SessionHandler::Result SessionHandler::receive_STREAMS_DEFINITION()

in src/dxapi/native/tickdb/session_handler.cpp [231:312]


SessionHandler::Result SessionHandler::receive_STREAMS_DEFINITION()
{
    uint64 requestId = -1;
    try {
        LoadStreamsRequest::Response newStreams;
        requestId = read<uint64>();
        size_t sz = read<int32_t>();

        XmlInput xml;
        LoadStreamsRequest req(this->db_);

        if (0 == sz)
            goto nothing;

        if (sz > INT32_MAX) {
            THROW_DBGLOG(LOGHDR ".receive_STREAMS_DEFINITION(): ERR: XML size is too big : %lld", ID, (longlong)sz);
        }

        
        xml.resize(sz);
        ioStream_->read(&xml[0], sz, sz);

#ifdef _DEBUG
        DBGLOG_VERBOSE(LOGHDR ".receive_STREAMS_DEFINITION(): finished reading stream metadata: %lld, %08X", ID, (longlong)sz, (unsigned)crc32(xml));
#else
        DBGLOG_VERBOSE(LOGHDR ".receive_STREAMS_DEFINITION(): finished reading stream metadata: %lldb", ID, (longlong)sz);
#endif

        //printf("%d\n%s\n", (unsigned)sz, xml.c_str());

        
        if (!req.getStreams(newStreams, xml)) {
            THROW_DBGLOG(LOGHDR ".receive_STREAMS_DEFINITION(): ERR: unable to parse XML", ID);
        }
        else {
            DBGLOG_VERBOSE(LOGHDR ".receive_STREAMS_DEFINITION(): parsed stream metadata", ID);
        }

        for (auto &i : newStreams) {
            auto s = cache().get(i.key);

            if (NULL == s) {
                db_.addStream(db_.allocateStream(i.key, i.options));
            }
            else {
                impl(s)->updateOptions(i.options);
            }
        }

        // Mark those streams as recently received
        for (auto &i : newStreams) {
            knownStreamDefs.add(i.key);
        }

nothing:
        if (++nStreamDefRequestsReceived_ != requestId) {
            DBGLOG(LOGHDR ".receive_STREAMS_DEFINITION(): ERR: #%llu received, #%llu expected!!!!!",
                ID, (ulonglong)requestId, (ulonglong)nStreamDefRequestsReceived_);
        }
        else {
            if (0 == sz) {
                DBGLOG(LOGHDR ".receive_STREAMS_DEFINITION(): _empty_ #%llu received", ID, (ulonglong)nStreamDefRequestsReceived_);
            }
            else {
                DBGLOG_VERBOSE(LOGHDR ".receive_STREAMS_DEFINITION(): #%llu received", ID, (ulonglong)nStreamDefRequestsReceived_);
            }
        }

        return OK;
    }
    catch (...) {
        if (++nStreamDefRequestsReceived_ != requestId) {
            DBGLOG(LOGHDR ".receive_STREAMS_DEFINITION(): ERR: #%llu received, #%llu expected!!!!!",
                ID, (ulonglong)requestId, (ulonglong)nStreamDefRequestsReceived_);
        }
        else {
            DBGLOG_VERBOSE(LOGHDR ".receive_STREAMS_DEFINITION(): #%llu received", ID, (ulonglong)nStreamDefRequestsReceived_);
        }

        throw;
    }
}