in java/connectors/hitbtc/src/main/java/com/epam/deltix/data/connectors/hitbtc/HitbtcFeed.java [73:123]
protected void onJson(final CharSequence data, final boolean last, final JsonWriter jsonWriter) {
    // Handles one fragment of an incoming JSON message.
    //  - Every fragment is pushed into the shared jsonParser.
    //  - Nothing else happens until the fragment flagged as `last` arrives; then the
    //    completed message is dispatched by the value of its "ch" field.
    //  - jsonWriter is not used by this handler.
    jsonParser.parse(data);
    if (!last) {
        return;
    }

    final JsonObject message = jsonParser.eoj().asObject();
    final String channel = message.getString("ch");

    // Literal-first comparison keeps a missing "ch" from blowing up: such messages are ignored.
    if ("orderbook/full".equalsIgnoreCase(channel)) {
        final JsonObject snapshots = message.getObject("snapshot");
        final JsonObject updates = message.getObject("update");

        if (snapshots != null) {
            // A snapshot wins: when both sections are present, "update" is not processed.
            snapshots.forEachObject((symbol, book) -> {
                final long time = book.getLong("t");
                final QuoteSequenceProcessor quotes = processor().onBookSnapshot(symbol, time);
                // "b" goes first with flag false, then "a" with flag true
                // (presumably bids and asks - confirm against processSnapshotSide).
                processSnapshotSide(quotes, book.getArray("b"), false);
                processSnapshotSide(quotes, book.getArray("a"), true);
                quotes.onFinish();
            });
            return;
        }

        if (updates != null) {
            updates.forEachObject((symbol, delta) -> {
                final long time = delta.getLong("t");
                final QuoteSequenceProcessor quotes = processor().onBookUpdate(symbol, time);
                // Same "b"/false then "a"/true ordering as for snapshots.
                processUpdates(quotes, delta.getArray("b"), false);
                processUpdates(quotes, delta.getArray("a"), true);
                quotes.onFinish();
            });
        }
        return;
    }

    if (!"trades".equalsIgnoreCase(channel)) {
        // Any other channel (or no channel at all) is silently skipped.
        return;
    }

    final JsonObject tradeUpdates = message.getObject("update");
    if (tradeUpdates == null) {
        return;
    }

    // Each entry of "update" is an array of trades for one instrument.
    tradeUpdates.forEachArray((symbol, tradeList) -> {
        for (int idx = 0; idx < tradeList.size(); ++idx) {
            final JsonObject entry = tradeList.getObject(idx);
            final String direction = entry.getString("s");

            // Only "buy" (case-insensitive) maps to BUY; everything else,
            // including a missing value, is reported as SELL.
            final AggressorSide side;
            if ("buy".equalsIgnoreCase(direction)) {
                side = AggressorSide.BUY;
            } else {
                side = AggressorSide.SELL;
            }

            processor().onTrade(
                    symbol,
                    entry.getLongRequired("t"),
                    entry.getDecimal64Required("p"),
                    entry.getDecimal64Required("q"),
                    side);
        }
    });
}