public void start()

in java/commons/src/main/java/com/epam/deltix/data/connectors/commons/SingleWsFeed.java [117:261]


    public void start() {
        mgmtService.execute(() -> {
            if (state != INITIAL_STATE) {
                return;
            }
            state = STARTED_STATE;

            final Runnable idleTimeoutWatchdog =
                    idleTimeoutMillis > 0 ?
                            () -> {
                                if (state != STARTED_STATE) {
                                    return;
                                }
                                if (System.nanoTime() - lastReceiveTime >= idleTimeoutMillis * 1_000_000L) {
                                    SingleWsFeed.this.onError(new TimeoutException("Idle timeout reached"));
                                }
                            } : null;

            final WebSocket.Listener wsListener = new WebSocket.Listener() {
                private final ZlibAsciiTextDecompressor decompressor = new ZlibAsciiTextDecompressor(skipGzipHeader); // TODO: configurable decoder?
                private WsJsonFrameSender jsonSender;

                @Override
                public void onOpen(final WebSocket webSocket) {
                    final Logger logger = logger();
                    if (logger.isDebugEnabled()) {
                        logger.debug("WebSocket onOpen");
                    }

                    lastReceiveTime = System.nanoTime();
                    jsonSender = new WsJsonFrameSender(webSocket);

                    try {
                        subscribe(jsonSender, symbols);
                    } catch (final Throwable t) {
                        SingleWsFeed.this.onError(t);
                    }
                    WebSocket.Listener.super.onOpen(webSocket);

                    if (idleTimeoutWatchdog != null) {
                        mgmtService.scheduleWithFixedDelay(
                                idleTimeoutWatchdog,
                                idleTimeoutMillis,
                                idleTimeoutMillis,
                                TimeUnit.MILLISECONDS);
                    }

                    if (periodicalJsonTask != null) {
                        mgmtService.scheduleWithFixedDelay(
                            () -> {
                                if (state != STARTED_STATE) {
                                    return;
                                }
                                periodicalJsonTask.execute(jsonSender);
                            },
                            periodicalJsonTask.delayMillis(),
                            periodicalJsonTask.delayMillis(),
                            TimeUnit.MILLISECONDS);
                    }
                }

                @Override
                public CompletionStage<?> onText(final WebSocket webSocket, final CharSequence data, final boolean last) {
                    final Logger logger = logger();
                    if (logger.isDebugEnabled()) {
                        logger.debug("WebSocket onText (last=" + last + ") " + data);
                    }

                    lastReceiveTime = System.nanoTime();

                    try {
                        onJson(data, last, jsonSender);
                    } catch (final Throwable t) {
                        SingleWsFeed.this.onError(t);
                    }
                    return WebSocket.Listener.super.onText(webSocket, data, last);
                }


                @Override
                public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean last) {
                    lastReceiveTime = System.nanoTime();

                    try {
                        final CharSequence json = decompressor.decompress(data);
                        if (json != null) {
                            final Logger logger = logger();
                            if (logger.isDebugEnabled()) {
                                logger.debug("WebSocket onBinary (last=" + last + ") [DECODED] " + json);
                            }

                            onJson(json, last, jsonSender);
                        }
                    } catch (final Throwable t) {
                        SingleWsFeed.this.onError(t);
                    }
                    return WebSocket.Listener.super.onBinary(webSocket, data, last);
                }

                @Override
                public CompletionStage<?> onPing(final WebSocket webSocket, final ByteBuffer message) {
                    final Logger logger = logger();
                    if (logger.isDebugEnabled()) {
                        logger.debug("WebSocket onPing " + message);
                    }

                    lastReceiveTime = System.nanoTime();

                    return WebSocket.Listener.super.onPing(webSocket, message);
                }

                @Override
                public void onError(final WebSocket webSocket, final Throwable error) {
                    final Logger logger = logger();
                    if (logger.isDebugEnabled()) {
                        logger.debug("WebSocket onError " + error.getLocalizedMessage());
                    }

                    SingleWsFeed.this.onError(error);
                }

                @Override
                public CompletionStage<?> onClose(final WebSocket webSocket, final int statusCode, final String reason) {
                    final Logger logger = logger();
                    if (logger.isDebugEnabled()) {
                        logger.debug("WebSocket onClose [" + statusCode + "] " + reason);
                    }

                    waitForWsClose.countDown();
                    return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
                }
            };
            try {
                webSocket = HttpClient.newBuilder().
                        executor(wsExecutorService).
                        build().
                        newWebSocketBuilder().
                        connectTimeout(Duration.ofSeconds(10)).
                        buildAsync(URI.create(uri),
                                wsListener).join();
            } catch (final Throwable t) {
                SingleWsFeed.this.onError(t);
            }
        });
    }