in java/connectors/cryptofacilities/src/main/java/com/epam/deltix/data/connectors/cryptofacilities/CryptofacilitiesFeed.java [77:123]
protected void onJson(CharSequence data, boolean last, JsonWriter jsonWriter) {
jsonParser.parse(data);
if (!last) {
return;
}
JsonValue jsonValue = jsonParser.eoj();
JsonObject object = jsonValue.asObjectRequired();
String feed = object.getString("feed");
String instrument = object.getString("product_id");
if (instrument == null) {
return;
}
long timestamp = object.getLong("timestamp");
if ("book_snapshot".equalsIgnoreCase(feed)) {
QuoteSequenceProcessor quotesListener = processor().onBookSnapshot(instrument, timestamp);
processSnapshotSide(quotesListener, object.getArray("bids"), false);
processSnapshotSide(quotesListener, object.getArray("asks"), true);
quotesListener.onFinish();
} else if ("book".equalsIgnoreCase(feed)) {
QuoteSequenceProcessor quotesListener = processor().onBookUpdate(instrument, timestamp);
long size = object.getDecimal64Required("qty");
if (Decimal64Utils.isZero(size)) {
size = TypeConstants.DECIMAL_NULL; // means delete the price
}
quotesListener.onQuote(
object.getDecimal64Required("price"),
size,
"sell".equals(object.getStringRequired("side"))
);
quotesListener.onFinish();
} else if ("trade".equalsIgnoreCase(feed)) {
String tradeDirection = object.getString("side");
AggressorSide side = "buy".equalsIgnoreCase(tradeDirection) ? AggressorSide.BUY : AggressorSide.SELL;
processor().onTrade(
instrument,
timestamp,
object.getDecimal64Required("price"),
object.getDecimal64Required("qty"),
side
);
}
}