in java/connectors/deribit/src/main/java/com/epam/deltix/data/connectors/deribit/DeribitFeed.java [82:148]
/**
 * Consumes one fragment of an incoming JSON text message and, once the message is
 * complete, dispatches it by subscription channel.
 *
 * <p>Fragments are fed to {@code jsonParser} until {@code last} is {@code true}. The
 * completed document is then inspected for a {@code params} object carrying a
 * {@code channel} name: channels containing {@code "book."} are handled as order book
 * notifications, channels containing {@code "trades."} as trade notifications.
 * Documents without {@code params} or {@code channel}, and all other channels, are ignored.
 *
 * @param data       the text fragment to append to the message being parsed
 * @param last       {@code true} if {@code data} is the final fragment of the message
 * @param jsonWriter writer handed to {@code unsubscribe}/{@code subscribe} when an order
 *                   book has to be requested again after a sequence gap
 */
protected void onJson(final CharSequence data, final boolean last, final JsonWriter jsonWriter) {
    jsonParser.parse(data);
    if (!last) {
        return;
    }

    final JsonValue jsonValue = jsonParser.eoj();
    final JsonObject object = jsonValue.asObject();
    if (object == null) {
        // Not a JSON object: nothing to dispatch.
        return;
    }

    final JsonObject params = object.getObject("params");
    if (params == null) {
        return;
    }

    final String channel = params.getString("channel");
    if (channel == null) {
        return;
    }

    if (channel.contains("book.")) {
        handleBookNotification(params, jsonWriter);
    } else if (channel.contains("trades.")) {
        // Deribit names its public trade channels "trades.<instrument>.<interval>".
        handleTradesNotification(params);
    }
}

/**
 * Applies an order book notification: a full snapshot, or an incremental change that is
 * accepted only when it directly follows the last change id recorded for the instrument.
 *
 * @param params     the {@code params} object of the notification
 * @param jsonWriter writer used to re-subscribe when a sequence gap is detected
 */
private void handleBookNotification(final JsonObject params, final JsonWriter jsonWriter) {
    final JsonObject book = params.getObject("data");
    if (book == null) {
        return;
    }

    final String instrument = book.getString("instrument_name").toLowerCase();
    final long timestamp = book.getLong("timestamp");
    final String type = book.getString("type");
    final long changeId = book.getLong("change_id");

    if ("snapshot".equals(type)) {
        final QuoteSequenceProcessor snapshot = processor().onBookSnapshot(instrument, timestamp);
        processSnapshotSide(snapshot, book.getArray("bids"), false);
        processSnapshotSide(snapshot, book.getArray("asks"), true);
        snapshot.onFinish();
        // The snapshot becomes the baseline that the next change must reference.
        changeIdMap.put(instrument, changeId);
    } else if ("change".equals(type)) {
        final Long lastChangeId = changeIdMap.get(instrument);
        if (lastChangeId == null) {
            // No baseline yet (e.g. a re-subscription is in flight after a gap and stale
            // updates are still arriving). Comparing against the missing entry used to
            // throw a NullPointerException on unboxing; wait for the snapshot instead.
            return;
        }

        final long previousChangeId = book.getLong("prev_change_id");
        if (previousChangeId == lastChangeId.longValue()) {
            changeIdMap.put(instrument, changeId);
            final QuoteSequenceProcessor update = processor().onBookUpdate(instrument, timestamp);
            processChanges(update, book.getArray("bids"), false);
            processChanges(update, book.getArray("asks"), true);
            update.onFinish();
        } else {
            // Sequence gap: drop the baseline and request the book again.
            unsubscribe(jsonWriter, instrument);
            changeIdMap.remove(instrument);
            subscribe(jsonWriter, instrument);
        }
    }
}

/**
 * Forwards every trade contained in a trade notification to the processor.
 *
 * @param params the {@code params} object of the notification; its {@code data} member
 *               must be an array of trade objects
 */
private void handleTradesNotification(final JsonObject params) {
    // The payload lives under "params", next to "channel", not at the top level.
    final JsonArray trades = params.getArrayRequired("data");
    for (int i = 0; i < trades.size(); ++i) {
        final JsonObject trade = trades.getObjectRequired(i);
        final long timestamp = trade.getLong("timestamp");
        final long price = trade.getDecimal64Required("price");
        final long size = trade.getDecimal64Required("amount");
        // Lower-cased so trades use the same instrument key as the book notifications.
        final String instrument = trade.getString("instrument_name").toLowerCase();
        final String direction = trade.getString("direction");
        final AggressorSide side =
                "buy".equalsIgnoreCase(direction) ? AggressorSide.BUY : AggressorSide.SELL;
        processor().onTrade(instrument, timestamp, price, size, side);
    }
}