in java/ws-server/src/main/java/com/epam/deltix/tbwg/webapp/websockets/WSHandler.java [506:535]
private void changeSubscription(SetSubscriptionMessage setSubscriptionMessage, TickCursor cursor, PumpTask task) {
synchronized (object) {
task.buffer.clear();
if (setSubscriptionMessage.types != null && !setSubscriptionMessage.types.isEmpty()) {
cursor.setTypes(setSubscriptionMessage.types.toArray(new String[]{}));
} else {
cursor.subscribeToAllTypes();
}
if (setSubscriptionMessage.symbols != null && !setSubscriptionMessage.symbols.isEmpty()) {
HashSet<IdentityKey> instruments = new HashSet<>();
for (DXTickStream stream : task.selection)
Collections.addAll(instruments, match(stream, setSubscriptionMessage.symbols));
cursor.clearAllEntities();
cursor.addEntities(collect(instruments), 0, instruments.size());
} else {
cursor.subscribeToAllEntities();
}
Instant instant = setSubscriptionMessage.from;
if (instant != null) {
long timestamp = instant.toEpochMilli();
if (timestamp != Long.MIN_VALUE) {
cursor.reset(timestamp);
}
}
}
LOGGER.info().append("Changed subscription: ").append(setSubscriptionMessage.toString()).commit();
}