in src/dxapi/native/tickdb/session_handler.cpp [231:312]
/**
 * Handles an incoming STREAMS_DEFINITION message.
 *
 * Data consumed, in order: a uint64 request id, an int32 byte length, and then
 * that many bytes of XML. A zero length carries no XML and skips the parsing
 * and stream-update steps entirely.
 *
 * Each entry parsed from the XML is either turned into a new stream that is
 * added to db_ (when cache() has nothing under its key) or used to update the
 * options of the already cached stream. After all entries are applied, their
 * keys are registered in knownStreamDefs.
 *
 * nStreamDefRequestsReceived_ is incremented on the success path and in the
 * exception handler alike; a mismatch between the counter and the received
 * request id is only logged, it does not fail the call.
 *
 * @return OK when the message was processed.
 * @throws Any exception raised while reading, parsing or applying the
 *         definitions; it is rethrown unchanged after the counter is updated.
 */
SessionHandler::Result SessionHandler::receive_STREAMS_DEFINITION()
{
    // All-ones sentinel: this is the value the catch handler compares and logs
    // if reading the request id itself throws.
    uint64 requestId = static_cast<uint64>(-1);

    try {
        LoadStreamsRequest::Response newStreams;
        requestId = read<uint64>();
        const int32_t wireSize = read<int32_t>();
        XmlInput xml;
        LoadStreamsRequest req(this->db_);

        // Check the signed value directly instead of relying on a negative
        // length wrapping around to a huge size_t. A negative value means the
        // length is corrupt or does not fit into int32.
        if (wireSize < 0) {
            THROW_DBGLOG(LOGHDR ".receive_STREAMS_DEFINITION(): ERR: XML size is too big : %lld", ID, (longlong)wireSize);
        }

        const size_t sz = static_cast<size_t>(wireSize);

        if (0 != sz) {
            xml.resize(sz);
            ioStream_->read(&xml[0], sz, sz);
#ifdef _DEBUG
            DBGLOG_VERBOSE(LOGHDR ".receive_STREAMS_DEFINITION(): finished reading stream metadata: %lld, %08X", ID, (longlong)sz, (unsigned)crc32(xml));
#else
            DBGLOG_VERBOSE(LOGHDR ".receive_STREAMS_DEFINITION(): finished reading stream metadata: %lldb", ID, (longlong)sz);
#endif
            //printf("%d\n%s\n", (unsigned)sz, xml.c_str());

            if (!req.getStreams(newStreams, xml)) {
                THROW_DBGLOG(LOGHDR ".receive_STREAMS_DEFINITION(): ERR: unable to parse XML", ID);
            }
            else {
                DBGLOG_VERBOSE(LOGHDR ".receive_STREAMS_DEFINITION(): parsed stream metadata", ID);
            }

            // Apply every definition first: create unknown streams, refresh known ones.
            for (auto &def : newStreams) {
                auto cached = cache().get(def.key);
                if (nullptr == cached) {
                    db_.addStream(db_.allocateStream(def.key, def.options));
                }
                else {
                    impl(cached)->updateOptions(def.options);
                }
            }

            // Mark those streams as recently received. Kept as a separate pass so
            // nothing is marked unless all definitions were applied without throwing.
            for (auto &def : newStreams) {
                knownStreamDefs.add(def.key);
            }
        }

        // Sequence bookkeeping: the counter is expected to match the request id.
        if (++nStreamDefRequestsReceived_ != requestId) {
            DBGLOG(LOGHDR ".receive_STREAMS_DEFINITION(): ERR: #%llu received, #%llu expected!!!!!",
                ID, (ulonglong)requestId, (ulonglong)nStreamDefRequestsReceived_);
        }
        else if (0 == sz) {
            DBGLOG(LOGHDR ".receive_STREAMS_DEFINITION(): _empty_ #%llu received", ID, (ulonglong)nStreamDefRequestsReceived_);
        }
        else {
            DBGLOG_VERBOSE(LOGHDR ".receive_STREAMS_DEFINITION(): #%llu received", ID, (ulonglong)nStreamDefRequestsReceived_);
        }

        return OK;
    }
    catch (...) {
        // A failed message still counts as received, so the counter is advanced
        // here as well before the exception is passed on.
        if (++nStreamDefRequestsReceived_ != requestId) {
            DBGLOG(LOGHDR ".receive_STREAMS_DEFINITION(): ERR: #%llu received, #%llu expected!!!!!",
                ID, (ulonglong)requestId, (ulonglong)nStreamDefRequestsReceived_);
        }
        else {
            DBGLOG_VERBOSE(LOGHDR ".receive_STREAMS_DEFINITION(): #%llu received", ID, (ulonglong)nStreamDefRequestsReceived_);
        }
        throw;
    }
}