in java/ws-server/src/main/java/com/epam/deltix/tbwg/webapp/websockets/WSHandler.java [456:504]
private void changeSubscription(SubscribeMessage subscribeMessage, TickCursor cursor, PumpTask task) {
synchronized (object) {
task.buffer.clear();
// working with symbols
if (subscribeMessage.symbols != null) {
if (subscribeMessage.symbols.subscribeToAll) {
cursor.subscribeToAllEntities();
} else if (!subscribeMessage.symbols.isEmpty()) {
if (subscribeMessage.symbols.add != null && !subscribeMessage.symbols.add.isEmpty()) {
HashSet<IdentityKey> instruments = new HashSet<>();
for (DXTickStream stream : task.selection)
Collections.addAll(instruments, match(stream, subscribeMessage.symbols.add));
cursor.addEntities(collect(instruments), 0, instruments.size());
}
if (subscribeMessage.symbols.remove != null && !subscribeMessage.symbols.remove.isEmpty()) {
HashSet<IdentityKey> instruments = new HashSet<>();
for (DXTickStream stream : task.selection)
Collections.addAll(instruments, match(stream, subscribeMessage.symbols.remove));
cursor.removeEntities(collect(instruments), 0, instruments.size());
}
}
}
// working with types
if (subscribeMessage.types != null) {
if (subscribeMessage.types.subscribeToAll) {
cursor.subscribeToAllTypes();
} else if (!subscribeMessage.types.isEmpty()) {
if (subscribeMessage.types.add != null && !subscribeMessage.types.add.isEmpty()) {
cursor.addTypes(subscribeMessage.types.add.toArray(new String[]{}));
}
if (subscribeMessage.types.remove != null && !subscribeMessage.types.remove.isEmpty()) {
cursor.removeTypes(subscribeMessage.types.remove.toArray(new String[]{}));
}
}
}
// working with from timestamp
Instant instant = subscribeMessage.from;
if (instant != null) {
long timestamp = subscribeMessage.from.toEpochMilli();
if (timestamp != Long.MIN_VALUE) {
cursor.reset(timestamp);
}
}
}
LOGGER.info().append("Changed subscription: ").append(subscribeMessage.toString()).commit();
}