bool TickCursorImpl::readNextRecord()

in src/dxapi/native/tickdb/http/tickcursor_http.cpp [695:818]


bool TickCursorImpl::readNextRecord()
{
    byte id = reader_->getByte();
    isIdle_ = false;

    switch (id) {
    case TYPE_BLOCK_ID:
        readMessageType();
        break;

    case INSTRUMENT_BLOCK_ID:
        readEntity();
        break;

    case STREAM_BLOCK_ID:
        readStreamKey();
        break;

    case MESSAGE_BLOCK_ID:
        isWithinMessageBlock_ = true;
        break;

    case ERROR_BLOCK_ID:
    {
        int8 code = reader_->getByte();
        string text;
        reader_->getUTF8(text);
        DBGLOGERR((string *)NULL, LOGHDR ".readNextRecord(): Server-side error %d: %s", ID, (int)code, text.c_str());
        throw TickCursorServerError((int)code, text.c_str());
    }

    case TERMINATOR_BLOCK_ID:
    {
        srw_write section(thisLock_);

        synchronizeCheck();
        SubscriptionState &s = *mainState_;

        if (options.live && !isInterrupted_) {
            DBGLOG(LOGHDR ".readNextRecord(): WARNING: Server just sent 'terminator block id' to live cursor!!!", ID);
        }

        if (s.isExecutingSubscriptions() && !isInterrupted_) {
            DBGLOG_VERBOSE(LOGHDR ": Terminator marker ignored", ID);
            break;
        }

        setState(CursorState::END); // We are under lock already
        finalStats();
        return false;
    }

    case PING_BLOCK_ID:
        DBGLOG_VERBOSE(LOGHDR ": Ping received. Stats: %s", ID, getStatsUpdate(false).c_str());
        isIdle_ = true;
        break;

    case CURSOR_BLOCK_ID:
    {
        int64 id = reader_->getInt64();
        if (cursorId_ != -1 && cursorId_ != id) {
            THROW_DBGLOG_EX(TickCursorError, LOGHDR ".readNextRecord(): Redundant cursor id record encountered with different id: %lld instead of %lld",
                ID, (longlong)id, (longlong)cursorId_);
        }

        cursorId_ = id;
        DBGLOG(LOGHDR ": Cursor id assigned by server: %lld", ID, (longlong)id);
    }
    break;

    case COMMAND_BLOCK_ID:
    {
        int64 cmdid = reader_->getInt64();
        DBGLOG_VERBOSE(LOGHDR ": cmdid: %lld", ID, (longlong)cmdid);

        {
            static const char* isLastStr[2] = { "", " - all commands done" };
            srw_write section(thisLock_);
            synchronizeCheck();

            SubscriptionState &s = *mainState_;
            assert(s.firstExpectedCommandId_ <= s.lastExpectedCommandId_);
            if (cmdid == s.firstExpectedCommandId_) {
                bool isLast = s.firstExpectedCommandId_ == s.lastExpectedCommandId_;

                do {
                    if (s.isResetInProgress_) {
                        if (cmdid == s.lastResetCommandId_) {
                            s.isResetInProgress_ = false;
                            s.lastResetCommandId_ = -1;
                            DBGLOG(LOGHDR ": Cursor reset completed%s Stats: %s", ID, isLastStr[isLast], getStatsUpdate(true).c_str());
                            break;
                        }
                        else {
                            if (!in_range(s.lastResetCommandId_, s.firstExpectedCommandId_, s.lastExpectedCommandId_ + 1)) {
                                THROW_DBGLOG_EX(TickCursorError, LOGHDR ".readNextRecord(): Logic error - invalid reset command id during reset operation: (%lld, [%lld, %lld])", ID, (longlong)s.lastResetCommandId_,
                                    (longlong)s.firstExpectedCommandId_, (longlong)s.lastExpectedCommandId_);
                            }
                        }
                    }

                    DBGLOG(LOGHDR ": Cursor subscription change completed%s", ID, isLastStr[isLast]);
                } while (0);


                ++s.firstExpectedCommandId_;
                if (isLast) {
                    s.firstExpectedCommandId_ = s.lastExpectedCommandId_ = -1;
                }
            }
            else {
                THROW_DBGLOG_EX(TickCursorError, LOGHDR ": Unexpected cursor subscription command id. Expected: %lld, Received: %lld ",
                    ID, (longlong)s.firstExpectedCommandId_, (longlong)cmdid);
            }
        }
    }
    break;

    default:
        THROW_DBGLOG_EX(TickCursorError, LOGHDR ".readNextRecord(): Invalid block marker = %d", ID, (int)id);
    }

    return true;
}