in src/dxapi/native/tickdb/http/tickcursor_subscriptions.cpp [338:427]
void TickCursorImpl::modifyStreams(const TickStream * const streams[], size_t numStreams, SubscrChangeAction action)
{
assert(cursorId_ >= 0);
// modifyStreams retains remove+add logic for 'set' operation
// Will need additional work when this is changed
if (SubscrChangeAction::SET == action) {
modifyStreams(NULL, 0, SubscrChangeAction::REMOVE);
action = SubscrChangeAction::ADD;
if (0 == numStreams) {
return;
}
}
#if VERBOSE_CURSOR_FILTERS >= 1
{
DBGLOG_VERBOSE(LOGHDR ".modifyStreams(%s: %c %s)", ID, TS_CURSOR2STR(subscriptionTimeMs_), action_str[(uintptr)action], subscriptionToString(streams, numStreams).c_str());
}
#endif
bool isRemoving = SubscrChangeAction::REMOVE == action;
srw_write section(thisLock_);
SubscriptionState &s = getWriteShadowState();
DBGLOG(LOGHDR ".modifyStreams(): preparing to change subscription state", ID);
std::vector <const DxApi::TickStream *> newStreamList;
newStreamList.push_back(NULL);
if (NULL == streams) {
s.isStreamSkipped_.setDefaultValue(SubscrChangeAction::ADD != action).clear();
s.isUnregisteredStreamSkipped_.clear();
}
if (NULL != streams) {
forn(i, numStreams) {
const TickStream *stream = streams[i];
const string &key = impl(stream)->key();
auto streamId = s.tables_.findStreamId(key);
if (DxApi::emptyStreamId != streamId) {
s.isStreamSkipped_.set((unsigned)streamId, isRemoving);
}
else if (s.isStreamSkipped_.defaultValue() != (uint8_t)isRemoving) {
// Save this stream key for later (we have yet to encounter a message from this stream)
s.isUnregisteredStreamSkipped_[key] = isRemoving;
}
else {
// Do nothing, this case is covered by the default filter setting
}
// Update main stream list
bool shouldAdd = !isRemoving;
for (auto &j : s.tickdbStreams_) {
if (NULL != j) {
//if (impl(*i).key() == key) {
if (j == stream) {
if (isRemoving) {
j = NULL;
}
shouldAdd = false;
break;
}
}
}
if (shouldAdd) {
s.tickdbStreams_.push_back(stream);
}
}
}
// Re-add all non-null streams
for (auto i : s.tickdbStreams_) {
if (NULL != i) {
newStreamList.push_back(i);
}
}
// Set the new stream list
s.tickdbStreams_ = newStreamList;
#if !(DBG_SKIP_SUBSCRIPTION_CHANGE_REQUESTS > 0)
ModifyStreamsRequest req(db_, cursorId_, subscriptionTimeMs_, action, streams, numStreams);
addSubscriptionCommand(s, req.execute(), "changeStreams");
#endif
// DBGLOG(LOGHDR ".modifyStreams(): changeStreams sent and applied", ID);
}