void SessionHandler::threadProcImpl()

in src/dxapi/native/tickdb/session_handler.cpp [588:654]


/**
 * Main loop of the session handler thread.
 *
 * Repeatedly calls processInput() until either shouldStop_ becomes true or
 * processInput() returns FINISHED. On every pass it also drains
 * neededStreamDefs and issues a single requestGetStreams() call for the keys
 * collected. When the loop ends the session is closed via close().
 */
void SessionHandler::threadProcImpl()
{
    DBGLOG(LOGHDR ": sessionHandler started!", ID);

    static_assert(PING_PERIOD_NS > IDLE_YIELD_DELAY_NS, "Ping period must be greater than thread yield/sleep period");

    // All activity timestamps start at the moment the thread enters the loop.
    auto lastDataTimestamp = static_cast<int64_t>(time_ns());
    auto lastLoggedTimestamp = lastDataTimestamp;
    auto lastPingTimestamp = lastDataTimestamp;
    // Minimum distance between two "idle" log lines
    // (5 seconds, assuming time_ns() reports nanoseconds as its name suggests).
    const int64_t logPeriod = INT64_C(5000000000);
    // Only read by the ping code that is currently compiled out below.
    const int64_t pingPeriod = PING_PERIOD_NS;

    while (!shouldStop_) {
        auto result = processInput();
        if (FINISHED == result)
            break;

        // One clock reading per pass: it is used both to stamp activity and
        // to evaluate the idle checks, so all of them refer to the same instant.
        const int64_t now = static_cast<int64_t>(time_ns());
        if (OK == result) {
            // Input was processed: restart the idle, log and ping timers.
            lastPingTimestamp = lastDataTimestamp = lastLoggedTimestamp = now;
        }
        else {
            /*if (now - lastDataTimestamp > IDLE_UNLOCK_TIMEOUT_NS && knownStreamDefs.size() != 0) {
            knownStreamDefs.clear();
            }*/

            // No data for longer than the yield delay: give up the time slice
            // instead of spinning at full speed.
            if (now - lastDataTimestamp > IDLE_YIELD_DELAY_NS) {
                if (now - lastLoggedTimestamp > logPeriod) {
                    // Advance by a whole period so idle messages keep a fixed cadence.
                    lastLoggedTimestamp += logPeriod;
                    DBGLOG_VERBOSE(LOGHDR ": idle", ID);
                }

                // Ping disabled for now
#if 0
                if (now - lastPingTimestamp > pingPeriod) {
                    lastPingTimestamp += pingPeriod;
                    requestGetStream("__PING_u5u7clwetyjui__");
                }
#endif

                this_thread::yield();
            }

        }

        // Stream definitions that were requested elsewhere: collect their keys,
        // remove them from the pending set and ask for all of them in one request.
        if (0 != neededStreamDefs.size()) {
            concurrent_obj_set_iter<string> iter(neededStreamDefs);
            string key;
            vector<string> keys;
            string logStr;  // space-separated key list, used only for the log line below
            DBGLOG_VERBOSE(LOGHDR ": %u stream(s) need to retrieve content", ID, (unsigned)neededStreamDefs.size());

            while (iter.next(key)) {
                keys.push_back(key);
                logStr.append(key).append(" ");
                neededStreamDefs.remove(key);
            }

            DBGLOG(LOGHDR ": requesting stream contents for %u streams: %s", ID, (unsigned)keys.size(), logStr.c_str());
            requestGetStreams(keys);
        }
    }

    DBGLOG_VERBOSE(LOGHDR ": sessionHandler read loop stopped", ID);
    close();
    DBGLOG(LOGHDR ": sessionHandler thread finished!", ID);
}