in src/dxapi/native/tickdb/session_handler.cpp [588:654]
/**
 * Body of the session handler thread.
 *
 * Repeatedly calls processInput() until it returns FINISHED or shouldStop_
 * becomes true. While no input is being processed the loop periodically emits
 * a verbose "idle" log line and yields the CPU. After every iteration, any
 * stream keys queued in neededStreamDefs are drained and passed to
 * requestGetStreams(). When the loop ends, close() is called.
 */
void SessionHandler::threadProcImpl()
{
    DBGLOG(LOGHDR ": sessionHandler started!", ID);
    static_assert(PING_PERIOD_NS > IDLE_YIELD_DELAY_NS, "Ping period must be greater than thread yield/sleep period");

    auto lastDataTimestamp = static_cast<int64_t>(time_ns());
    auto lastLoggedTimestamp = lastDataTimestamp;
    auto lastPingTimestamp = lastDataTimestamp;
    // Minimum interval between two "idle" log lines (5 s, assuming time_ns() counts nanoseconds).
    const int64_t logPeriod = INT64_C(5000000000);
    // Only read by the ping code below, which is currently compiled out.
    const int64_t pingPeriod = PING_PERIOD_NS;

    while (!shouldStop_) {
        auto result = processInput();
        if (FINISHED == result)
            break;

        int64 t = time_ns();
        if (OK == result) {
            // Input was processed: restart all idle timers from the sample just
            // taken instead of reading the clock a second time.
            lastPingTimestamp = lastDataTimestamp = lastLoggedTimestamp = t;
        }
        else {
            /*if (t - lastDataTimestamp > IDLE_UNLOCK_TIMEOUT_NS && knownStreamDefs.size() != 0) {
                knownStreamDefs.clear();
            }*/
            // Start yielding only after the idle time exceeds IDLE_YIELD_DELAY_NS;
            // until then the loop keeps polling processInput() without yielding.
            if (t - lastDataTimestamp > IDLE_YIELD_DELAY_NS) {
                if (t - lastLoggedTimestamp > logPeriod) {
                    // Advance by a whole period so "idle" is logged at most once per logPeriod.
                    lastLoggedTimestamp += logPeriod;
                    DBGLOG_VERBOSE(LOGHDR ": idle", ID);
                }
                // Ping disabled for now
#if 0
                if (t - lastPingTimestamp > pingPeriod) {
                    lastPingTimestamp += pingPeriod;
                    requestGetStream("__PING_u5u7clwetyjui__");
                }
#endif
                this_thread::yield();
            }
        }

        // Drain the set of streams whose definitions were requested and ask for
        // all of them in a single request.
        if (0 != neededStreamDefs.size()) {
            concurrent_obj_set_iter<string> iter(neededStreamDefs);
            string key;
            vector<string> keys;
            string logStr;
            DBGLOG_VERBOSE(LOGHDR ": %u stream(s) need to retrieve content", ID, (unsigned)neededStreamDefs.size());
            // NOTE(review): keys are removed from the set while the iterator is live;
            // presumably concurrent_obj_set_iter tolerates this - confirm.
            while (iter.next(key)) {
                keys.push_back(key);
                logStr.append(key).append(" ");
                neededStreamDefs.remove(key);
            }
            DBGLOG(LOGHDR ": requesting stream contents for %u streams: %s", ID, (unsigned)keys.size(), logStr.c_str());
            requestGetStreams(keys);
        }
    }

    DBGLOG_VERBOSE(LOGHDR ": sessionHandler read loop stopped", ID);
    close();
    DBGLOG(LOGHDR ": sessionHandler thread finished!", ID);
}