void TickCursorImpl::modifyStreams()

in src/dxapi/native/tickdb/http/tickcursor_subscriptions.cpp [338:427]


void TickCursorImpl::modifyStreams(const TickStream * const streams[], size_t numStreams, SubscrChangeAction action)
{
    assert(cursorId_ >= 0);

    // modifyStreams retains remove+add logic for 'set' operation
    // Will need additional work when this is changed
    if (SubscrChangeAction::SET == action) {
        modifyStreams(NULL, 0, SubscrChangeAction::REMOVE);
        action = SubscrChangeAction::ADD;
        if (0 == numStreams) {
            return;
        }
    }

#if VERBOSE_CURSOR_FILTERS >= 1
    {
        DBGLOG_VERBOSE(LOGHDR ".modifyStreams(%s: %c %s)", ID, TS_CURSOR2STR(subscriptionTimeMs_), action_str[(uintptr)action], subscriptionToString(streams, numStreams).c_str());
    }
#endif

    bool isRemoving = SubscrChangeAction::REMOVE == action;
    srw_write section(thisLock_);
    SubscriptionState &s = getWriteShadowState();

    DBGLOG(LOGHDR ".modifyStreams(): preparing to change subscription state", ID);

    std::vector <const DxApi::TickStream *> newStreamList;
    newStreamList.push_back(NULL);

    if (NULL == streams) {
        s.isStreamSkipped_.setDefaultValue(SubscrChangeAction::ADD != action).clear();
        s.isUnregisteredStreamSkipped_.clear();
    }

    if (NULL != streams) {
        forn(i, numStreams) {
            const TickStream *stream = streams[i];
            const string &key = impl(stream)->key();

            auto streamId = s.tables_.findStreamId(key);
            if (DxApi::emptyStreamId != streamId) {
                s.isStreamSkipped_.set((unsigned)streamId, isRemoving);
            }
            else if (s.isStreamSkipped_.defaultValue() != (uint8_t)isRemoving) {
                // Save this stream key for later (we have yet to encounter a message from this stream)
                s.isUnregisteredStreamSkipped_[key] = isRemoving;
            }
            else {
                // Do nothing, this case is covered by the default filter setting
            }

            // Update main stream list
            bool shouldAdd = !isRemoving;
            for (auto &j : s.tickdbStreams_) {
                if (NULL != j) {
                    //if (impl(*i).key() == key) {
                    if (j == stream) {
                        if (isRemoving) {
                            j = NULL;
                        }

                        shouldAdd = false;
                        break;
                    }
                }
            }

            if (shouldAdd) {
                s.tickdbStreams_.push_back(stream);
            }
        }
    }

    // Re-add all non-null streams
    for (auto i : s.tickdbStreams_) {
        if (NULL != i) {
            newStreamList.push_back(i);
        }
    }

    // Set the new stream list
    s.tickdbStreams_ = newStreamList;

#if !(DBG_SKIP_SUBSCRIPTION_CHANGE_REQUESTS > 0)
    ModifyStreamsRequest req(db_, cursorId_, subscriptionTimeMs_, action, streams, numStreams);
    addSubscriptionCommand(s, req.execute(), "changeStreams");
#endif

    // DBGLOG(LOGHDR ".modifyStreams(): changeStreams sent and applied", ID);
}