bool TickCursorImpl::readNextMessageInBlock()

in src/dxapi/native/tickdb/http/tickcursor_http.cpp [555:667]


/**
 * Reads message headers from the current message block until one passes the
 * subscription filters, or until the block terminator record is read.
 *
 * For every record the header (size, timestamp, entity id, descriptor id,
 * stream id) is read from reader_ and stored into `msg`. Messages that are
 * filtered out are counted in stats.nMessagesSkipped and the reader is
 * advanced to the next message.
 *
 * @param msg  Receives length, timestamp, entityId, typeId, streamId,
 *             typeIdLocal (always 0 here) and, on success, cursorState.
 * @return true  when a message header was accepted (body is left for the caller);
 *         false when MESSAGE_BLOCK_TERMINATOR_RECORD was read, in which case
 *         isWithinMessageBlock_ is cleared.
 *
 * A TickCursorError is reported through THROW_DBGLOG_EX when the message size
 * is out of range or when the entity/descriptor/stream index is not known to
 * the current subscription state.
 *
 * Precondition (asserted): isWithinMessageBlock_ is set.
 */
bool TickCursorImpl::readNextMessageInBlock(MessageHeader &msg)
{
    DataReaderBaseImpl &reader = *this->reader_;

    // We should never ever enter readNextMessage() if reset mode is activated

    assert(isWithinMessageBlock_);
    for (;;) {
        const unsigned messageSize = reader.getUInt32();
        if ((unsigned)MESSAGE_BLOCK_TERMINATOR_RECORD == messageSize) {
            isWithinMessageBlock_ = false;
            return false;
        }

        // Validate the size received from the wire BEFORE it is stored or handed to the reader,
        // so that a corrupt value never reaches reader.setMessageSize().
        if (!in_range(messageSize, (unsigned)CURSOR_MESSAGE_HEADER_SIZE_EID32, (unsigned)MAX_MESSAGE_SIZE + 1)) {
            THROW_DBGLOG_EX(TickCursorError, LOGHDR ".readNextMessage(): Incorrect message size: %u", ID, messageSize);
        }

        msg.length = messageSize;
        reader.setMessageSize(messageSize);

        // read: timestamp instrument_index type_index body
        lastTimestamp_ = msg.timestamp = reader.getInt64();

        const unsigned currentEntityId = readEntityId<false>(reader);
        msg.entityId = currentEntityId;

        const unsigned currentDescriptorId = reader.getByte();
        msg.typeId = currentDescriptorId;

        // This will convert 255 to -1, all other byte values are left unchanged
        const intptr_t currentStreamId = (intptr_t)((reader.getByte() + 1U) & 0xFF) - 1;
        msg.streamId = (int32_t)currentStreamId;

        // At this point the whole header is read from the input stream

        // If a new subscription state was published, switch to it under the write lock
        if (shadowCopyUpdated_) {
            srw_write section(thisLock_); // TODO: srw_read?

            assert(shadowState_ != mainState_);
            mainState_ = shadowState_;
            shadowCopyUpdated_ = false; // TODO: combined with flag?

            SubscriptionState &updated = *mainState_;
            if (updated.readingRestarted_) {
                updated.readingRestarted_ = false;
                setState(CursorState::STARTED);
            }
        }

        const SubscriptionState &s = *mainState_;

        if (!s.tables_.isRegisteredInstrumentId(currentEntityId)) {
            THROW_DBGLOG_EX(TickCursorError, LOGHDR ".readNextMessage(): unexpected EntityIndex = %u", ID, currentEntityId);
        }

        if (!s.tables_.isRegisteredMsgDescId(currentDescriptorId)) {
            THROW_DBGLOG_EX(TickCursorError, LOGHDR ".readNextMessage(): unexpected MessageDescriptorIndex = %u", ID, currentDescriptorId);
        }

        // Signed comparison on purpose: currentStreamId == -1 is accepted, only indices past the table are rejected
        if (currentStreamId >= (intptr_t)s.tables_.streamKeys_.size()) {
            THROW_DBGLOG_EX(TickCursorError, LOGHDR ".readNextMessage(): unexpected StreamIndex = %u", ID, (byte)currentStreamId);
        }

        // Remote-to-local type index mapping is currently disabled; typeIdLocal is always 0
        msg.typeIdLocal = 0;

        ++stats.nMessagesProcessed;
        // TODO: use arithmetic OR for speed

        // Everything is skipped during Reset.
        // System messages are always allowed, unless Reset.
        // The rest is filtered by instrument and type.
        const bool skipMessage =
            s.isResetInProgress_ ||
            (!isSystemMessage_[currentDescriptorId] &&
             (s.isMessageDescriptorSkipped_.get(currentDescriptorId) || s.isSymbolSkipped_.get(currentEntityId)));

        if (skipMessage) {
#if VERBOSE_CURSOR >= 2
            dbg_dumpMessage(msg, "skip");
#endif
            ++stats.nMessagesSkipped;
            reader.nextMessage();
            continue;
        }

#if defined(_DEBUG) && VERBOSE_CURSOR >= 1
        if (isSystemMessage_[currentDescriptorId]) {
            DBGLOG(LOGHDR ": %s system msg received. Stats: %s", ID, getMessageTypeName(currentDescriptorId)->c_str(), getStatsUpdate(true).c_str());
        }
#endif

#if VERBOSE_CURSOR >= 2
        dbg_dumpMessage(msg, "msg");
#endif

        msg.cursorState = setState(CursorState::READING); // TODO: Better solution later, no time

        return true;
    }
}