in java/ws-server/src/main/java/com/epam/deltix/tbwg/webapp/websockets/WSHandler.java [278:387]
/**
 * Opens a cursor for a newly connected WebSocket session and starts a {@code PumpTask} for it.
 *
 * <p>The streams to read are taken from the {@code streamId} session attribute when it is set;
 * otherwise, when no {@code channel} is configured, from the {@code streams} query parameter.
 * If no stream can be resolved, the text message {@code "Streams is not defined"} is sent to
 * the client and the method returns without opening a cursor.
 *
 * <p>Query parameters read from the session URI:
 * <ul>
 *   <li>{@code streams} - stream keys, passed to {@code match(timebase, ...)};</li>
 *   <li>{@code from}, {@code to} - time bounds, parsed with {@link Instant#parse};</li>
 *   <li>{@code symbols}, {@code types} - repeated and/or comma-separated lists;</li>
 *   <li>{@code live} - the mere presence of the key enables live mode;</li>
 *   <li>{@code depth} - parsed with {@code Interval.valueOf}; used only when {@code from}
 *       is absent, in which case reading starts {@code depth} before {@code to}
 *       (or before {@code TimebaseServiceImpl.getEndTime(selection)} when {@code to} is absent).</li>
 * </ul>
 *
 * <p>A key given without a value (for example a bare {@code ?from}) or with an empty value is
 * treated as absent for {@code from}, {@code to} and {@code depth}; value-less entries of
 * {@code symbols} and {@code types} are skipped.
 *
 * @param session the WebSocket session that has just been established
 * @throws IOException if sending the error message to the client fails
 */
public void afterConnectionEstablished(WebSocketSession session) throws IOException {
    session.setTextMessageSizeLimit(MAX_BUFFER_SIZE);

    String streamId = (String) session.getAttributes().get("streamId");

    // NOTE(review): values are used exactly as UriComponents returns them; presumably they are
    // not percent-decoded at this point - confirm whether clients send encoded values.
    MultiValueMap<String, String> params =
            UriComponentsBuilder.fromUriString(session.getUri().toString()).build().getQueryParams();

    DXTickStream[] selection = null;
    if (streamId == null && channel == null) {
        List<String> streamKeys = params.get("streams");
        if (streamKeys != null) {
            selection = match(timebase, streamKeys.toArray(new String[streamKeys.size()]));
        }
    } else if (streamId != null) {
        DXTickStream stream = timebase.getStream(streamId);
        if (stream != null) {
            selection = new DXTickStream[]{ stream };
        }
    }

    if (selection == null) {
        session.sendMessage(new TextMessage("Streams is not defined", true));
        return;
    }

    String fromValue = firstQueryValue(params, "from");
    Instant from = fromValue != null ? Instant.parse(fromValue) : null;

    String toValue = firstQueryValue(params, "to");
    Instant to = toValue != null ? Instant.parse(toValue) : null;

    // Declared as HashSet/ArrayList (not Set/List) because these values are handed to helper
    // overloads declared elsewhere in this class whose parameter types are not visible here.
    HashSet<IdentityKey> instruments = null;
    List<String> symbolValues = params.get("symbols");
    if (symbolValues != null) {
        instruments = new HashSet<>();
        ArrayList<String> symbols = splitCommaSeparated(symbolValues);
        for (DXTickStream stream : selection) {
            Collections.addAll(instruments, match(stream, symbols));
        }
    }

    ArrayList<String> types = null;
    List<String> typeValues = params.get("types");
    if (typeValues != null) {
        types = splitCommaSeparated(typeValues);
    }

    // Presence of the key is enough; its value is ignored.
    boolean live = params.get("live") != null;

    String depthValue = firstQueryValue(params, "depth");
    Interval depth = depthValue != null ? Interval.valueOf(depthValue) : null;

    // An explicit "from" wins; otherwise "depth" is counted back from "to" (or from the end of
    // the selected streams); with neither, reading starts from Long.MIN_VALUE.
    long fromTimestamp = Long.MIN_VALUE;
    if (from != null) {
        fromTimestamp = from.toEpochMilli();
    } else if (depth != null) {
        fromTimestamp = (to == null ? TimebaseServiceImpl.getEndTime(selection) : to.toEpochMilli()) - depth.toMilliseconds();
    }

    String[] selectedTypes = types != null ? types.toArray(new String[types.size()]) : null;

    TickCursor cursor = timebase.getConnection().select(
            fromTimestamp,
            new SelectionOptions(true, live),
            selectedTypes,
            collectCharSequence(collect(instruments, live)),
            selection);

    long toTimestamp = to != null ? to.toEpochMilli() : Long.MAX_VALUE;

    LOGGER.log(LogLevel.INFO, " WS CURSOR [" + cursor.hashCode() + "]: SELECT " + (live ? "live " : "") + " * FROM " + Arrays.toString(selection) + " WHERE " +
            "TYPES = [" + Arrays.toString(selectedTypes) + "] AND ENTITIES = [" + Arrays.toString(collect(instruments, live)) + "] AND timestamp <= " + toTimestamp );

    PumpTask pumpTask = new PumpTask (selection, cursor, toTimestamp, live, session, executor);
    onCreate(session, pumpTask);
    // The task is registered via onCreate before the listener is attached and the task submitted.
    cursor.setAvailabilityListener(pumpTask.avlnr);
    pumpTask.submit();
}

/** Returns the first value of query parameter {@code name}, or {@code null} when it is missing, value-less or empty. */
private static String firstQueryValue(MultiValueMap<String, String> params, String name) {
    String value = params.getFirst(name);
    return (value == null || value.isEmpty()) ? null : value;
}

/** Flattens repeated and/or comma-separated query values into a single list, skipping value-less ({@code null}) entries. */
private static ArrayList<String> splitCommaSeparated(List<String> values) {
    ArrayList<String> result = new ArrayList<>();
    for (String value : values) {
        if (value == null) {
            continue;
        }
        // For a value without a comma, split(",") yields just that value.
        Collections.addAll(result, value.split(","));
    }
    return result;
}