SessionHandler::Result SessionHandler::receive_STREAM_PROPERTY()

in src/dxapi/native/tickdb/session_handler.cpp [333:483]


SessionHandler::Result SessionHandler::receive_STREAM_PROPERTY()
{
    string key, param;
    uint64 requestId = -1;

    try {
        requestId = read<uint64>();
        readUTF8(key);
        auto p = readPropertyId(); // may throw?

        //TickStreamPropertyMask mask = 0;
        StreamOptions opt;

        auto streamExt = cache().get(key);
        auto stream = impl(streamExt);
        if (NULL == streamExt) {
            DBGLOG(LOGHDR ".receive_STREAM_PROPERTY(%s): ERROR: stream '%s' not found in cache!", ID, p.toString(), key.c_str());
            stream = NULL; // Just to be safe, already NULL for the curent implementation
        }
        else {
            opt = stream->options();
        }

        puts("WARNING: STREAM_PROPERTY event in unexpected at this time! ");
        DBGLOG(LOGHDR ".receive_STREAM_PROPERTY(%s.%s): ERR: not supported (protocol update is pending)!", ID, key.c_str(), p.toString());

        // 8 properties currently sent by protocol
        switch (p) {
        case TickStreamPropertyId::UNKNOWN:
            goto def;

#define CASE(X) break; case TickStreamPropertyId::X:

            CASE(NAME) {
                readUTF8(param);
                opt.name = param;
            }

            CASE(DESCRIPTION) {
                readUTF8(param);
                opt.description = param;
            }

            CASE(PERIODICITY) {
                readUTF8(param);
                opt.periodicity = param;
            }

            CASE(SCHEMA)  {
                // TODO:
                bool isPolymorphic = 0 != read<byte>();
                // TODO: How length is encoded?
                // NOTE: This does not work, XML is written without length (verified)
                readUTF8(param);
            }

            CASE(ENTITIES) {
                // TODO:
                // ??????
            }

            CASE(TIME_RANGE) {
                // TODO:
                // ??????
            }

            CASE(VERSIONING) {
                // TODO:
                // ??????
            }

            CASE(KEY) {
                readUTF8(param);
                // TODO:
            }

            CASE(BG_PROCESS) {
                // TODO: 
            }

            CASE(OWNER)  {
                readUTF8(param);
                opt.owner = param;
            }

            // These are not currently sent
#if 0
            CASE(HIGH_AVAILABILITY) {
                // ??
            }

            CASE(UNIQUE) {
                // ??
            }

            CASE(BUFFER_OPTIONS) {
                // ??
            }

            CASE(DATA_VERSION) {
                // ??
            }

            CASE(REPLICA_VERSION) {
                // ??
            }

            CASE(WRITER_CREATED) {
                // ???
            }

            CASE(WRITER_CLOSED) {
                // ???
            }

            CASE(SCOPE) {
                // ??
            }

            CASE(DF) {
                // ??
            }

#endif
            break;

        def:
        default:
            ;
        }

        if (++nPropertyRequestsReceived_ != requestId) {
            DBGLOG(LOGHDR ".receive_STREAM_PROPERTY(): ERR: #%llu received, #%llu expected!!!!!",
                ID, (ulonglong)requestId, (ulonglong)nPropertyRequestsReceived_);
        }
        else {
            DBGLOG_VERBOSE(LOGHDR ".receive_STREAM_PROPERTY(): #%llu received", ID, (ulonglong)requestId);
        }
    }
    catch (...) {
        if (++nPropertyRequestsReceived_ != requestId) {
            DBGLOG(LOGHDR ".receive_STREAM_PROPERTY(): ERR: #%llu received, #%llu expected!!!!!",
                ID, (ulonglong)requestId, (ulonglong)nPropertyRequestsReceived_);
        }
        else {
            DBGLOG_VERBOSE(LOGHDR ".receive_STREAM_PROPERTY(): #%llu received", ID, (ulonglong)requestId);
        }
    }

    return OK;
}