in java/connectors/ascendex/src/main/java/com/epam/deltix/data/connectors/ascendex/AscendexFeed.java [70:118]
/**
 * Consumes one chunk of an incoming JSON message and, once the message is complete,
 * dispatches it according to the value of its {@code "m"} field.
 *
 * <p>Every chunk is fed to {@code jsonParser}; nothing else happens until {@code last}
 * is {@code true}. The completed message is then handled as follows:
 * <ul>
 *   <li>{@code "ping"} - an object {@code {"op":"pong"}} is written to {@code jsonWriter};</li>
 *   <li>{@code "depth-snapshot"} - a book snapshot is started for the symbol, the
 *       {@code "bids"} and {@code "asks"} arrays are processed, and the symbol is recorded
 *       in {@code snapshotInitialization};</li>
 *   <li>{@code "depth"} - a book update is emitted, but only for symbols already recorded
 *       in {@code snapshotInitialization}; otherwise the message is dropped;</li>
 *   <li>{@code "trades"} - one {@code onTrade} call is made per element of the
 *       {@code "data"} array.</li>
 * </ul>
 * Any other message type is ignored.
 *
 * @param data       the next chunk of JSON text
 * @param last       {@code true} when {@code data} is the final chunk of the current message
 * @param jsonWriter the writer used to send the pong reply
 */
protected void onJson(final CharSequence data, final boolean last, final JsonWriter jsonWriter) {
    jsonParser.parse(data);
    if (!last) {
        return;
    }

    final JsonValue parsed = jsonParser.eoj();
    final JsonObject message = parsed.asObject();
    // Looked up eagerly for every message; only the depth branches dereference it.
    final JsonObject payload = message.getObject("data");
    final String messageType = message.getString("m");
    final String instrument = message.getString("symbol");

    if ("ping".equals(messageType)) {
        final JsonValue pong = JsonValue.newObject();
        final JsonObject pongFields = pong.asObject();
        pongFields.putString("op", "pong");
        pong.toJsonAndEoj(jsonWriter);
        return;
    }

    if ("depth-snapshot".equals(messageType)) {
        final long snapshotTime = payload.getLong("ts");
        final QuoteSequenceProcessor snapshot = processor().onBookSnapshot(instrument, snapshotTime);
        // NOTE(review): the boolean flag presumably selects the ask side - confirm against processSnapshotSide.
        processSnapshotSide(snapshot, payload.getArray("bids"), false);
        processSnapshotSide(snapshot, payload.getArray("asks"), true);
        snapshot.onFinish();
        snapshotInitialization.addCharSequence(instrument);
        return;
    }

    if ("depth".equals(messageType)) {
        // Incremental updates are dropped until a snapshot has been seen for this symbol.
        if (snapshotInitialization.containsCharSequence(instrument)) {
            final long updateTime = payload.getLong("ts");
            final QuoteSequenceProcessor update = processor().onBookUpdate(instrument, updateTime);
            processChanges(update, payload.getArray("bids"), false);
            processChanges(update, payload.getArray("asks"), true);
            update.onFinish();
        }
        return;
    }

    if ("trades".equals(messageType)) {
        // For trade messages "data" is read as an array rather than an object.
        final JsonArray trades = message.getArrayRequired("data");
        final int tradeCount = trades.size();
        for (int index = 0; index < tradeCount; index++) {
            final JsonObject entry = trades.getObjectRequired(index);
            final long tradeTime = getTimestamp(entry.getString("ts"));
            final long tradePrice = entry.getDecimal64Required("p");
            final long tradeSize = entry.getDecimal64Required("q");
            processor().onTrade(instrument, tradeTime, tradePrice, tradeSize);
        }
    }
}