in java/commons/src/main/java/com/epam/deltix/data/connectors/commons/SingleWsRestFeed.java [136:286]
/**
 * Starts the feed asynchronously by submitting the start-up sequence to {@code mgmtService}.
 *
 * <p>The submitted task does nothing unless the feed is in {@code INITIAL_STATE}. Otherwise it
 * switches the feed to {@code STARTED_STATE}, builds an {@link HttpClient} on top of
 * {@code wsRestExecutorService}, resolves the WebSocket URL (through {@code authenticate(wsUrl)}
 * when {@code isAuthRequired} is set) and opens the WebSocket with a 10 second connect timeout,
 * waiting inside the submitted task for the connection attempt to complete.
 *
 * <p>Failures raised while connecting, subscribing, processing inbound frames or running the
 * periodical JSON task are reported through {@code onError(Throwable)} of this feed.
 */
public void start() {
    mgmtService.execute(() -> {
        // Only the first call actually starts the feed.
        if (state != INITIAL_STATE) {
            return;
        }
        state = STARTED_STATE;

        // Reports a TimeoutException when nothing has refreshed lastReceiveTime for at least
        // idleTimeoutMillis; null when the idle timeout is disabled (idleTimeoutMillis <= 0).
        final Runnable idleTimeoutWatchdog =
                idleTimeoutMillis > 0 ?
                        () -> {
                            if (state != STARTED_STATE) {
                                return;
                            }
                            // lastReceiveTime is taken from System.nanoTime(), hence the ms -> ns conversion.
                            if (System.nanoTime() - lastReceiveTime >= idleTimeoutMillis * 1_000_000L) {
                                SingleWsRestFeed.this.onError(new TimeoutException("Idle timeout reached"));
                            }
                        } : null;

        final WebSocket.Listener wsListener = new WebSocket.Listener() {
            private final ZlibAsciiTextDecompressor decompressor = new ZlibAsciiTextDecompressor(skipGzipHeader); // TODO: configurable decoder?

            /**
             * Creates the JSON sender for the opened socket, subscribes to the configured
             * symbols and schedules the optional idle watchdog and periodical JSON task.
             */
            @Override
            public void onOpen(final WebSocket webSocket) {
                final Logger logger = logger();
                if (logger.isDebugEnabled()) {
                    logger.debug("WebSocket onOpen");
                }

                lastReceiveTime = System.nanoTime();
                jsonSender = new WsJsonFrameSender(webSocket);

                try {
                    subscribe(jsonSender, symbols);
                } catch (final Throwable t) {
                    SingleWsRestFeed.this.onError(t);
                }

                // The default implementation requests the first message from the socket.
                WebSocket.Listener.super.onOpen(webSocket);

                if (idleTimeoutWatchdog != null) {
                    mgmtService.scheduleWithFixedDelay(
                            idleTimeoutWatchdog,
                            idleTimeoutMillis,
                            idleTimeoutMillis,
                            TimeUnit.MILLISECONDS);
                }

                if (periodicalJsonTask != null) {
                    mgmtService.scheduleWithFixedDelay(
                            () -> {
                                if (state != STARTED_STATE) {
                                    return;
                                }
                                // An exception escaping a scheduleWithFixedDelay task silently
                                // suppresses every later run, so report the failure through
                                // onError like the other callbacks of this listener do.
                                try {
                                    periodicalJsonTask.execute(jsonSender);
                                } catch (final Throwable t) {
                                    SingleWsRestFeed.this.onError(t);
                                }
                            },
                            periodicalJsonTask.delayMillis(),
                            periodicalJsonTask.delayMillis(),
                            TimeUnit.MILLISECONDS);
                }
            }

            /** Passes a text frame to {@code onWsJson} and refreshes the idle timer. */
            @Override
            public CompletionStage<?> onText(final WebSocket webSocket, final CharSequence data, final boolean last) {
                final Logger logger = logger();
                if (logger.isDebugEnabled()) {
                    logger.debug("WebSocket onText (last=" + last + ") " + data);
                }

                lastReceiveTime = System.nanoTime();

                try {
                    onWsJson(data, last, jsonSender);
                } catch (final Throwable t) {
                    SingleWsRestFeed.this.onError(t);
                }

                return WebSocket.Listener.super.onText(webSocket, data, last);
            }

            /**
             * Decompresses a binary frame and, when the decompressor returns text, passes it
             * to {@code onWsJson}; a {@code null} result is skipped.
             */
            @Override
            public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean last) {
                lastReceiveTime = System.nanoTime();

                try {
                    final CharSequence json = decompressor.decompress(data);
                    if (json != null) {
                        final Logger logger = logger();
                        if (logger.isDebugEnabled()) {
                            logger.debug("WebSocket onBinary (last=" + last + ") [DECODED] " + json);
                        }
                        onWsJson(json, last, jsonSender);
                    }
                } catch (final Throwable t) {
                    SingleWsRestFeed.this.onError(t);
                }

                return WebSocket.Listener.super.onBinary(webSocket, data, last);
            }

            /** Treats an incoming ping as activity for the idle timer. */
            @Override
            public CompletionStage<?> onPing(final WebSocket webSocket, final ByteBuffer message) {
                final Logger logger = logger();
                if (logger.isDebugEnabled()) {
                    logger.debug("WebSocket onPing " + message);
                }

                lastReceiveTime = System.nanoTime();

                return WebSocket.Listener.super.onPing(webSocket, message);
            }

            /** Forwards a WebSocket error to the feed's own error handler. */
            @Override
            public void onError(final WebSocket webSocket, final Throwable error) {
                final Logger logger = logger();
                if (logger.isDebugEnabled()) {
                    logger.debug("WebSocket onError " + error.getLocalizedMessage());
                }

                SingleWsRestFeed.this.onError(error);
            }

            /** Counts down {@code waitForWsClose} once the close message has been received. */
            @Override
            public CompletionStage<?> onClose(final WebSocket webSocket, final int statusCode, final String reason) {
                final Logger logger = logger();
                if (logger.isDebugEnabled()) {
                    logger.debug("WebSocket onClose [" + statusCode + "] " + reason);
                }

                // NOTE(review): presumably awaited by the shutdown path - confirm against close logic.
                waitForWsClose.countDown();

                return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
            }
        };

        try {
            httpClient = HttpClient.newBuilder().
                    executor(wsRestExecutorService).build();

            String websocketUrl = wsUrl;
            if (isAuthRequired) {
                websocketUrl = authenticate(wsUrl);
            }

            // join() waits here until the connection attempt completes or fails.
            webSocket = httpClient.
                    newWebSocketBuilder().
                    connectTimeout(Duration.ofSeconds(10)).
                    buildAsync(URI.create(websocketUrl),
                            wsListener).join();
        } catch (final Throwable t) {
            SingleWsRestFeed.this.onError(t);
        }
    });
}