in src/dxapi/native/tickdb/session_handler.cpp [333:483]
/**
 * Handles an incoming STREAM_PROPERTY message.
 *
 * Reads the request id, the stream key and the property id, then consumes the
 * payload of the properties whose case below performs a read. Parsed values
 * are stored only in a local StreamOptions copy that goes out of scope at the
 * end of the try block, so nothing is written back to the cached stream here
 * (the handler itself logs that this message is not supported yet).
 *
 * Any exception thrown while reading is logged and swallowed. On both the
 * normal and the failure path nPropertyRequestsReceived_ is incremented once
 * and compared with the request id that was read; a mismatch is logged.
 *
 * @return always OK.
 */
SessionHandler::Result SessionHandler::receive_STREAM_PROPERTY()
{
    string key, param;
    // Converts to the largest uint64 value; stays that way if the id cannot be read.
    uint64 requestId = -1;

    try {
        requestId = read<uint64>();
        readUTF8(key);
        auto p = readPropertyId(); // may throw?

        // Start from the options of the cached stream (if any), otherwise from defaults.
        StreamOptions opt;
        auto streamExt = cache().get(key);
        auto stream = impl(streamExt);
        if (nullptr == streamExt) {
            DBGLOG(LOGHDR ".receive_STREAM_PROPERTY(%s): ERROR: stream '%s' not found in cache!", ID, p.toString(), key.c_str());
        }
        else {
            opt = stream->options();
        }

        puts("WARNING: STREAM_PROPERTY event in unexpected at this time! ");
        DBGLOG(LOGHDR ".receive_STREAM_PROPERTY(%s.%s): ERR: not supported (protocol update is pending)!", ID, key.c_str(), p.toString());

        // 8 properties currently sent by protocol
        switch (p) {
        case TickStreamPropertyId::UNKNOWN:
            // Nothing to read; the `break` emitted by the next CASE leaves the switch.

// Every CASE first closes the previous case with `break`, then opens the next label.
// NOTE(review): CASE is not #undef'd after the switch; confirm whether later code relies on it.
#define CASE(X) break; case TickStreamPropertyId::X:

        CASE(NAME) {
            readUTF8(param);
            opt.name = param;
        }

        CASE(DESCRIPTION) {
            readUTF8(param);
            opt.description = param;
        }

        CASE(PERIODICITY) {
            readUTF8(param);
            opt.periodicity = param;
        }

        CASE(SCHEMA) {
            // TODO:
            // One byte precedes the schema text (a "polymorphic" flag); it is consumed and ignored.
            static_cast<void>(read<byte>());
            // TODO: How length is encoded?
            // NOTE: This does not work, XML is written without length (verified)
            readUTF8(param);
        }

        CASE(ENTITIES) {
            // TODO: nothing is read for this property yet
        }

        CASE(TIME_RANGE) {
            // TODO: nothing is read for this property yet
        }

        CASE(VERSIONING) {
            // TODO: nothing is read for this property yet
        }

        CASE(KEY) {
            readUTF8(param);
            // TODO: the value is read but not used
        }

        CASE(BG_PROCESS) {
            // TODO: nothing is read for this property yet
        }

        CASE(OWNER) {
            readUTF8(param);
            opt.owner = param;
        }

        // These are not currently sent
#if 0
        CASE(HIGH_AVAILABILITY) {
            // ??
        }

        CASE(UNIQUE) {
            // ??
        }

        CASE(BUFFER_OPTIONS) {
            // ??
        }

        CASE(DATA_VERSION) {
            // ??
        }

        CASE(REPLICA_VERSION) {
            // ??
        }

        CASE(WRITER_CREATED) {
            // ???
        }

        CASE(WRITER_CLOSED) {
            // ???
        }

        CASE(SCOPE) {
            // ??
        }

        CASE(DF) {
            // ??
        }
#endif
            break;

        default:
            break;
        }
    }
    catch (...) {
        // Deliberately best-effort: the failure is reported but not propagated,
        // and the request is still counted below.
        DBGLOG(LOGHDR ".receive_STREAM_PROPERTY(): ERR: exception while reading property update for stream '%s'", ID, key.c_str());
    }

    // Count this request exactly once, whether or not reading succeeded, and
    // check that it carries the expected sequence number.
    if (++nPropertyRequestsReceived_ != requestId) {
        DBGLOG(LOGHDR ".receive_STREAM_PROPERTY(): ERR: #%llu received, #%llu expected!!!!!",
            ID, (ulonglong)requestId, (ulonglong)nPropertyRequestsReceived_);
    }
    else {
        DBGLOG_VERBOSE(LOGHDR ".receive_STREAM_PROPERTY(): #%llu received", ID, (ulonglong)requestId);
    }

    return OK;
}