void SessionHandler::getStreamsSynchronous()

in src/dxapi/native/tickdb/session_handler.cpp [1066:1118]


void SessionHandler::getStreamsSynchronous(const std::vector<std::string> &keys)
{
    if (isWorking()) {
        concurrent_obj_set<string> requestedLocal; // TODO: No need for concurrency actually. Will correct later

        if (0 == keys.size()) {
            knownStreamDefs.clear();
        }
        else {
            for (auto &i : keys) {
                knownStreamDefs.remove(i);
                requestedLocal.add(i);
            }
        }

        uint64_t requestId = requestGetStreams(keys);

        DBGLOG_VERBOSE(LOGHDR ".getStreamsSync(): waiting for #%llu", ID, (ulonglong)requestId);
        while (!streamDefRequestCompleted(requestId)) {
            this_thread::yield();
        }

        DBGLOG_VERBOSE(LOGHDR ".getStreamsSync(): wait for #%llu done", ID, (ulonglong)requestId);

#if 0
        // Wait till all requested streams are removed from the request queue.
        // TODO: race condition: someone may re-add those streams later and this is beyond our control
        auto start = time_ns();
        while (time_ns() - start < GET_STREAMS_TIMEOUT_NS) {
            if (0 == keys.size()) {
                if (0 != knownStreamDefs.size()) {
                    break;
                    // TODO: This wont work well with empty timebase.
                }
            }
            else {
                for (auto &i : keys) {
                    if (NULL != requestedLocal.find(i)) {
                        if (NULL != knownStreamDefs.find(i)) {
                            requestedLocal.remove(i);
                        }
                    }
                }

                if (0 == requestedLocal.size())
                    break;
            }

            this_thread::yield();
        }
#endif
    }
}