public void afterConnectionEstablished()

in java/ws-server/src/main/java/com/epam/deltix/tbwg/webapp/websockets/WSHandler.java [295:405]


    /**
     * Opens a cursor for a newly established WebSocket session and starts pumping its
     * messages to the client.
     *
     * <p>Streams are resolved from the {@code streamId} session attribute when it is set;
     * otherwise, when the {@code channel} field is {@code null}, from the {@code streams}
     * query parameter. If no stream can be resolved, the text {@code "Streams is not defined"}
     * is sent to the client and the method returns without opening a cursor.
     *
     * <p>Query parameters read from the session URI:
     * <ul>
     *   <li>{@code from}, {@code to} - time bounds, parsed with {@link Instant#parse};</li>
     *   <li>{@code depth} - parsed with {@code Interval.valueOf}; when {@code from} is absent,
     *       the start time is the end time ({@code to}, or {@code getEndTime(selection)})
     *       minus this interval;</li>
     *   <li>{@code symbols} - URL-decoded, then split on commas;</li>
     *   <li>{@code types} - split on commas (not URL-decoded);</li>
     *   <li>{@code live} - treated as a flag: present means live selection.</li>
     * </ul>
     * A {@code from}, {@code to} or {@code depth} key that carries no value is treated as absent.
     *
     * @param session the session that has just been opened
     * @throws IOException if the "Streams is not defined" message cannot be sent
     */
    public void afterConnectionEstablished(WebSocketSession session) throws IOException {

        metrics.endpointCounter(WebSocketConfig.SUBSCRIPTIONS_METRIC, endpoint()).increment();

        session.setTextMessageSizeLimit(MAX_BUFFER_SIZE);

        String streamId = (String) session.getAttributes().get("streamId");

        MultiValueMap<String, String> params =
                UriComponentsBuilder.fromUriString(session.getUri().toString()).build().getQueryParams();

        DXTickStream[] selection = null;

        if (streamId != null) {
            DXTickStream stream = timebase.getStream(streamId);
            if (stream != null) {
                selection = new DXTickStream[]{ stream };
            }
        } else if (channel == null) {
            List<String> streamKeys = params.get("streams");
            if (streamKeys != null) {
                selection = match(timebase, streamKeys.toArray(new String[0]));
            }
        }

        if (selection == null) {
            session.sendMessage(new TextMessage("Streams is not defined", true));
            return;
        }

        String fromValue = firstQueryValue(params, "from");
        Instant from = fromValue != null ? Instant.parse(fromValue) : null;

        String toValue = firstQueryValue(params, "to");
        Instant to = toValue != null ? Instant.parse(toValue) : null;

        // NOTE(review): a valueless "symbols" or "types" key still fails inside
        // splitCommaSeparated, as it did before; confirm the desired semantics before relaxing.
        HashSet<String> instruments = null;
        List<String> symbolValues = params.get("symbols");
        if (symbolValues != null) {
            instruments = new HashSet<>(splitCommaSeparated(symbolValues, true));
        }

        String[] selectedTypes = null;
        List<String> typeValues = params.get("types");
        if (typeValues != null) {
            selectedTypes = splitCommaSeparated(typeValues, false).toArray(new String[0]);
        }

        boolean live = params.get("live") != null;

        String depthValue = firstQueryValue(params, "depth");
        Interval depth = depthValue != null ? Interval.valueOf(depthValue) : null;

        long fromTimestamp = Long.MIN_VALUE;

        if (from != null) {
            fromTimestamp = from.toEpochMilli();
        } else if (depth != null) {
            long endTimestamp = to == null ? getEndTime(selection) : to.toEpochMilli();
            // Saturate instead of wrapping around, e.g. if the end time is a very negative
            // sentinel (presumably possible for empty streams - TODO confirm).
            fromTimestamp = saturatingSubtract(endTimestamp, depth.toMilliseconds());
        }

        TickCursor cursor = timebase.getConnection().select(
                fromTimestamp,
                new SelectionOptions(true, live),
                selectedTypes,
                collect(instruments, live),
                selection);

        long toTimestamp = to != null ? to.toEpochMilli() : Long.MAX_VALUE;

        LOGGER.log(LogLevel.INFO, " WS CURSOR [" + cursor.hashCode() + "]: SELECT " + (live ? "live " : "") + " * FROM " + Arrays.toString(selection) + " WHERE " +
                "TYPES = [" + Arrays.toString(selectedTypes) + "] AND ENTITIES = [" + Arrays.toString(collect(instruments, live)) +
                "] AND timestamp BETWEEN(" + fromTimestamp + " AND " + toTimestamp + ")");

        PumpTask pumpTask = new PumpTask (selection, cursor, toTimestamp, live, session, metrics, executor);
        onCreate(session, pumpTask);
        cursor.setAvailabilityListener(pumpTask.avlnr);
        pumpTask.submit();
    }

    /** Returns the first value of the query parameter, or {@code null} if the key is missing or has no value. */
    private static String firstQueryValue(MultiValueMap<String, String> params, String name) {
        List<String> values = params.get(name);
        if (values == null || values.isEmpty()) {
            return null;
        }
        return values.get(0);
    }

    /** Flattens query values into individual items by splitting each on commas, optionally URL-decoding it first. */
    private static List<String> splitCommaSeparated(List<String> values, boolean urlDecode) {
        List<String> items = new ArrayList<>();
        for (String value : values) {
            String text = urlDecode ? java.net.URLDecoder.decode(value, StandardCharsets.UTF_8) : value;
            // split() returns the whole string as a single element when it contains no comma,
            // so no separate "no comma" branch is needed.
            items.addAll(Arrays.asList(text.split(",")));
        }
        return items;
    }

    /** Computes {@code a - b}, clamping to {@code Long.MIN_VALUE} / {@code Long.MAX_VALUE} on overflow. */
    private static long saturatingSubtract(long a, long b) {
        long result = a - b;
        // Overflow happened iff the operands have different signs and the result's sign differs from a's.
        if (((a ^ b) & (a ^ result)) < 0) {
            return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
        return result;
    }