in src/dxapi/native/tickdb/session_handler.cpp [1066:1118]
// Requests stream definitions for `keys` and blocks the calling thread until
// the request is reported as completed.
//
// keys: stream keys to refresh. When empty, the whole knownStreamDefs cache is
//       cleared before the request is issued (presumably "refresh all
//       streams" - confirm against requestGetStreams()).
//
// Does nothing when isWorking() returns false.
//
// NOTE(review): the wait below has no timeout and does not re-check
// isWorking(); if a request can remain uncompleted after the session stops,
// this call would spin forever - confirm streamDefRequestCompleted() semantics.
void SessionHandler::getStreamsSynchronous(const std::vector<std::string> &keys)
{
    if (!isWorking()) {
        return;
    }

    // Drop the cached definitions that are about to be re-requested.
    if (keys.empty()) {
        knownStreamDefs.clear();
    }
    else {
        for (const auto &key : keys) {
            knownStreamDefs.remove(key);
        }
    }

    const uint64_t requestId = requestGetStreams(keys);
    DBGLOG_VERBOSE(LOGHDR ".getStreamsSync(): waiting for #%llu", ID, (ulonglong)requestId);

    // Spin, yielding the time slice, until the request is marked as completed.
    while (!streamDefRequestCompleted(requestId)) {
        this_thread::yield();
    }
    DBGLOG_VERBOSE(LOGHDR ".getStreamsSync(): wait for #%llu done", ID, (ulonglong)requestId);

#if 0
    // Disabled alternative wait strategy, kept for reference: poll
    // knownStreamDefs until every requested key shows up or the timeout expires.
    // The bookkeeping set lives inside this region because nothing else reads it.
    concurrent_obj_set<string> requestedLocal; // TODO: No need for concurrency actually. Will correct later
    for (auto &i : keys) {
        requestedLocal.add(i);
    }

    // Wait till all requested streams are removed from the request queue.
    // TODO: race condition: someone may re-add those streams later and this is beyond our control
    auto start = time_ns();
    while (time_ns() - start < GET_STREAMS_TIMEOUT_NS) {
        if (0 == keys.size()) {
            if (0 != knownStreamDefs.size()) {
                break;
                // TODO: This won't work well with empty timebase.
            }
        }
        else {
            for (auto &i : keys) {
                if (NULL != requestedLocal.find(i)) {
                    if (NULL != knownStreamDefs.find(i)) {
                        requestedLocal.remove(i);
                    }
                }
            }
            if (0 == requestedLocal.size())
                break;
        }
        this_thread::yield();
    }
#endif
}