in src/dxapi/native/tickdb/http/tickcursor_http.cpp [925:1002]
/**
 * Tries to read the next message from the cursor into `*msg`.
 *
 * @param msg          Destination header. Its `cursorState` field is updated
 *                     on the paths that change or report the cursor state.
 *                     Dereferenced unconditionally, so it must not be null.
 * @param nonBlocking  When true, returns false immediately if neither the
 *                     reader nor the active input stream reports available
 *                     bytes; otherwise the read is delegated to
 *                     readNextMessageNonBlock() instead of readNextMessage().
 * @return The result of readNextMessage()/readNextMessageNonBlock(), or false
 *         when the cursor is at/after END with nothing left to read, when a
 *         non-blocking call finds no data, or when the server disconnected.
 * @throws Rethrows IOStreamException, std::exception and any other exception
 *         after moving the cursor to CursorState::END.
 *
 * `isInNext_` marks that a call is in progress (concurrent calls are not
 * supported, see the assert below) and is cleared on every exit path.
 */
INLINE bool TickCursorImpl::next(MessageHeader* const msg, bool nonBlocking)
{
    assert(!isInNext_ && "C++ - level TickCursor doesn't support concurrent calls");
    isInNext_ = true;
    try {
        assert(NULL != reader_);
        if (state >= CursorState::END) {
            if (isInterrupted_) {
                THROW_DBGLOG_EX(TickCursorInterruptedException, LOGHDR ".readNext(): interrupted", ID);
                // NOTE(review): if THROW_DBGLOG_EX always throws (as its name
                // suggests), the three statements below are never executed - confirm.
                msg->cursorState = state;
                readRemainderAndClose(true);
                return isInNext_ = false;
            }
            // Stay ended when the state is past END, or when no subscription
            // is being executed and neither input source has pending bytes.
            if (state > CursorState::END || (!mainState_->isExecutingSubscriptions() && 0 == reader_->nBytesAvailable() && 0 == activeInputStream_->nBytesAvailable())) {
                msg->cursorState = state;
                return isInNext_ = false; // clears the flag and returns false
            }
            // Otherwise the cursor is switched back to STARTED and reading continues.
            msg->cursorState = setStateLocked(CursorState::STARTED);
        }
        if (nonBlocking) {
            // Inside a message block, advance the reader once; readMessageDone_
            // prevents repeating this on subsequent calls that find no data.
            if (isWithinMessageBlock_ && !readMessageDone_) {
                reader_->nextMessage();
                readMessageDone_ = true;
            }
            // Nothing buffered anywhere: report "no message" without blocking.
            if (reader_->nBytesAvailableInStream() <= 0 && activeInputStream_->nBytesAvailable() <= 0) {
                return isInNext_ = false;
            }
            readMessageDone_ = false;
        }
        assert(state < CursorState::END);
        bool result = nonBlocking ? readNextMessageNonBlock(*msg) : readNextMessage(*msg);
        isInNext_ = false;
        return result;
    }
    catch (const IOStreamDisconnectException&) {
        // A disconnect is reported as a normal end of the cursor, not as an error.
        DBGLOG(LOGHDR ".next(): Server disconnected", ID);
        msg->cursorState = setState(CursorState::END);
        doClose();
        isInNext_ = false;
        finalStats();
        return false;
    }
    catch (const IOStreamException& e) {
        DBGLOG(LOGHDR ".next(): I/O exception was unexpected: %s", ID, e.what());
        msg->cursorState = setState(CursorState::END);
        doClose();
        isInNext_ = false;
        throw;
    }
    catch (const std::exception& e) {
        DBGLOG(LOGHDR ".next(): std::exception: %s", ID, e.what());
        msg->cursorState = setState(CursorState::END);
        doClose();
        isInNext_ = false;
        throw;
    }
    catch (...) {
        // NOTE(review): unlike the handlers above, doClose() is not called
        // here - confirm whether that is intentional.
        msg->cursorState = setState(CursorState::END);
        isInNext_ = false;
        throw;
    }
    // Unreachable: every path through the try block and its handlers either
    // returns or rethrows. Kept as a defensive tail.
    dx_assert(false, "This code should be unreachable!");
    msg->cursorState = setState(CursorState::END);
    isInNext_ = false;
    // Explicit return so that control can never flow off the end of a
    // non-void function (undefined behaviour) if this tail is ever reached.
    return false;
}