in src/dxapi/native/tickdb/http/tickcursor_http.cpp [555:667]
/**
 * Reads message headers from the current message block until one passes the
 * active subscription filters, or until the block terminator record is met.
 *
 * For every record the header is decoded into `msg` (length, timestamp,
 * entityId, typeId, streamId; typeIdLocal is always set to 0 here). Messages
 * rejected by the filters are counted in stats.nMessagesSkipped and skipped
 * via reader_->nextMessage(); the loop then continues with the next record.
 *
 * @param msg  Output header. Also receives cursorState (the value returned by
 *             setState(CursorState::READING)) when a message is accepted.
 * @return true  when `msg` describes an accepted message; nextMessage() is NOT
 *               called on the reader in this case.
 *         false when MESSAGE_BLOCK_TERMINATOR_RECORD was read; in that case
 *               isWithinMessageBlock_ is cleared.
 *
 * Errors are reported through THROW_DBGLOG_EX(TickCursorError, ...) when the
 * message size is out of range or when the entity / descriptor / stream index
 * is not known to the current subscription state.
 *
 * Precondition (asserted): isWithinMessageBlock_ is set.
 */
bool TickCursorImpl::readNextMessageInBlock(MessageHeader &msg)
{
    DataReaderBaseImpl &reader = *this->reader_;

    // Must only be entered while the cursor is positioned inside a message block
    assert(isWithinMessageBlock_);

    while (1) {
        const unsigned messageSize = reader.getUInt32();
        if ((unsigned)MESSAGE_BLOCK_TERMINATOR_RECORD == messageSize) {
            isWithinMessageBlock_ = false;
            return false;
        }

        msg.length = messageSize;

        // Validate the size that came off the wire BEFORE handing it to the reader,
        // so the reader is never configured with an out-of-range message size.
        // NOTE(review): the "+ 1" suggests in_range() treats the upper bound as exclusive - confirm.
        if (!in_range(messageSize, (unsigned)CURSOR_MESSAGE_HEADER_SIZE_EID32, (unsigned)MAX_MESSAGE_SIZE + 1)) {
            THROW_DBGLOG_EX(TickCursorError, LOGHDR ".readNextMessage(): Incorrect message size: %u", ID, messageSize);
        }
        reader.setMessageSize(messageSize);

        // read: timestamp instrument_index type_index body
        lastTimestamp_ = msg.timestamp = reader.getInt64();

        const unsigned currentEntityId = readEntityId<false>(reader);
        msg.entityId = currentEntityId;

        const unsigned currentDescriptorId = reader.getByte();
        msg.typeId = currentDescriptorId;

        // This will convert 255 to -1, all other byte values are left unchanged
        const intptr_t currentStreamId = (intptr_t)((reader.getByte() + 1U) & 0xFF) - 1;
        msg.streamId = (int32_t)currentStreamId;
        //DBGLOG("msg.entityId=%u", currentEntityId);

        // At this point the whole header is read from the input stream

        // If a new subscription state was published into the shadow copy,
        // switch to it under the write lock before validating/filtering this message.
        if (shadowCopyUpdated_) {
            srw_write section(thisLock_); // TODO: srw_read?
            assert(shadowState_ != mainState_);
            mainState_ = shadowState_;
            shadowCopyUpdated_ = false; // TODO: combined with flag?

            SubscriptionState &updated = *mainState_;
            if (updated.readingRestarted_) {
                updated.readingRestarted_ = false;
                setState(CursorState::STARTED);
            }
        }

        const SubscriptionState &s = *mainState_;

        if (!s.tables_.isRegisteredInstrumentId(currentEntityId)) {
            THROW_DBGLOG_EX(TickCursorError, LOGHDR ".readNextMessage(): unexpected EntityIndex = %u", ID, currentEntityId);
        }

        if (!s.tables_.isRegisteredMsgDescId(currentDescriptorId)) {
            THROW_DBGLOG_EX(TickCursorError, LOGHDR ".readNextMessage(): unexpected MessageDescriptorIndex = %u", ID, currentDescriptorId);
        }

        // Signed comparison on purpose: currentStreamId == -1 (wire value 255) is accepted here
        if (currentStreamId >= (intptr_t)s.tables_.streamKeys_.size()) {
            // Cast to unsigned so the argument type matches the %u conversion
            THROW_DBGLOG_EX(TickCursorError, LOGHDR ".readNextMessage(): unexpected StreamIndex = %u", ID, (unsigned)(byte)currentStreamId);
        }

        //unsigned currentUserTypeIndex = s.tables_.messageTypeRemote2Local[currentTypeId];
        //msg.typeIdLocal = currentUserTypeIndex;
        msg.typeIdLocal = 0;

        ++stats.nMessagesProcessed;

        // TODO: use arithmetic OR for speed
        // Everything is skipped during Reset.
        // System messages are always allowed, unless Reset.
        // The rest is filtered by instrument and type.
        const bool skipMessage =
            s.isResetInProgress_ ||
            (!isSystemMessage_[currentDescriptorId] &&
             (s.isMessageDescriptorSkipped_.get(currentDescriptorId) || s.isSymbolSkipped_.get(currentEntityId)));

        if (!skipMessage) {
#if defined(_DEBUG) && VERBOSE_CURSOR >= 1
            if (isSystemMessage_[currentDescriptorId]) {
                DBGLOG(LOGHDR ": %s system msg received. Stats: %s", ID, getMessageTypeName(currentDescriptorId)->c_str(), getStatsUpdate(true).c_str());
            }
#endif
#if VERBOSE_CURSOR >= 2
            dbg_dumpMessage(msg, "msg");
#endif
            msg.cursorState = setState(CursorState::READING); // TODO: Better solution later, no time
            return true;
        }

        // Filtered out: account for it, drop the rest of the message and try the next record
#if VERBOSE_CURSOR >= 2
        dbg_dumpMessage(msg, "skip");
#endif
//#if VERBOSE_CURSOR_FILTERS >= 2
//        DBGLOG(LOGHDR ": msg skipped by filter", ID);
//#endif
        ++stats.nMessagesSkipped;
        reader_->nextMessage();
    }
}