in src/dxapi/native/tickdb/http/tickcursor_http.cpp [695:818]
bool TickCursorImpl::readNextRecord()
{
byte id = reader_->getByte();
isIdle_ = false;
switch (id) {
case TYPE_BLOCK_ID:
readMessageType();
break;
case INSTRUMENT_BLOCK_ID:
readEntity();
break;
case STREAM_BLOCK_ID:
readStreamKey();
break;
case MESSAGE_BLOCK_ID:
isWithinMessageBlock_ = true;
break;
case ERROR_BLOCK_ID:
{
int8 code = reader_->getByte();
string text;
reader_->getUTF8(text);
DBGLOGERR((string *)NULL, LOGHDR ".readNextRecord(): Server-side error %d: %s", ID, (int)code, text.c_str());
throw TickCursorServerError((int)code, text.c_str());
}
case TERMINATOR_BLOCK_ID:
{
srw_write section(thisLock_);
synchronizeCheck();
SubscriptionState &s = *mainState_;
if (options.live && !isInterrupted_) {
DBGLOG(LOGHDR ".readNextRecord(): WARNING: Server just sent 'terminator block id' to live cursor!!!", ID);
}
if (s.isExecutingSubscriptions() && !isInterrupted_) {
DBGLOG_VERBOSE(LOGHDR ": Terminator marker ignored", ID);
break;
}
setState(CursorState::END); // We are under lock already
finalStats();
return false;
}
case PING_BLOCK_ID:
DBGLOG_VERBOSE(LOGHDR ": Ping received. Stats: %s", ID, getStatsUpdate(false).c_str());
isIdle_ = true;
break;
case CURSOR_BLOCK_ID:
{
int64 id = reader_->getInt64();
if (cursorId_ != -1 && cursorId_ != id) {
THROW_DBGLOG_EX(TickCursorError, LOGHDR ".readNextRecord(): Redundant cursor id record encountered with different id: %lld instead of %lld",
ID, (longlong)id, (longlong)cursorId_);
}
cursorId_ = id;
DBGLOG(LOGHDR ": Cursor id assigned by server: %lld", ID, (longlong)id);
}
break;
case COMMAND_BLOCK_ID:
{
int64 cmdid = reader_->getInt64();
DBGLOG_VERBOSE(LOGHDR ": cmdid: %lld", ID, (longlong)cmdid);
{
static const char* isLastStr[2] = { "", " - all commands done" };
srw_write section(thisLock_);
synchronizeCheck();
SubscriptionState &s = *mainState_;
assert(s.firstExpectedCommandId_ <= s.lastExpectedCommandId_);
if (cmdid == s.firstExpectedCommandId_) {
bool isLast = s.firstExpectedCommandId_ == s.lastExpectedCommandId_;
do {
if (s.isResetInProgress_) {
if (cmdid == s.lastResetCommandId_) {
s.isResetInProgress_ = false;
s.lastResetCommandId_ = -1;
DBGLOG(LOGHDR ": Cursor reset completed%s Stats: %s", ID, isLastStr[isLast], getStatsUpdate(true).c_str());
break;
}
else {
if (!in_range(s.lastResetCommandId_, s.firstExpectedCommandId_, s.lastExpectedCommandId_ + 1)) {
THROW_DBGLOG_EX(TickCursorError, LOGHDR ".readNextRecord(): Logic error - invalid reset command id during reset operation: (%lld, [%lld, %lld])", ID, (longlong)s.lastResetCommandId_,
(longlong)s.firstExpectedCommandId_, (longlong)s.lastExpectedCommandId_);
}
}
}
DBGLOG(LOGHDR ": Cursor subscription change completed%s", ID, isLastStr[isLast]);
} while (0);
++s.firstExpectedCommandId_;
if (isLast) {
s.firstExpectedCommandId_ = s.lastExpectedCommandId_ = -1;
}
}
else {
THROW_DBGLOG_EX(TickCursorError, LOGHDR ": Unexpected cursor subscription command id. Expected: %lld, Received: %lld ",
ID, (longlong)s.firstExpectedCommandId_, (longlong)cmdid);
}
}
}
break;
default:
THROW_DBGLOG_EX(TickCursorError, LOGHDR ".readNextRecord(): Invalid block marker = %d", ID, (int)id);
}
return true;
}