in java/connectors/coinbase/src/main/java/com/epam/deltix/data/connectors/coinbase/CoinbaseFeed.java [112:168]
protected void onJson(
        final CharSequence data,
        final boolean last,
        final JsonWriter jsonWriter) {
    // Feeds one fragment of an incoming JSON message to the parser and, once the
    // final fragment has arrived, dispatches the completed message by its "type" field:
    //   "ticker"   -> reported to the processor as a trade,
    //   "snapshot" -> reported as a full book snapshot (bids, then asks),
    //   "l2update" -> reported as an incremental book update.
    // Messages without a "type", or with any other type, are ignored.
    // The jsonWriter parameter is not used by this method.
    // NOTE(review): the "...Required" getters presumably throw when the field is
    // absent - confirm against the JSON helper classes.

    jsonParser.parse(data);
    if (!last) {
        // The message is still incomplete; wait for the remaining fragments.
        return;
    }

    final JsonObject message = jsonParser.eoj().asObjectRequired();
    final String type = message.getString("type");

    // Constant-first equals() is null-safe, so a missing "type" matches no branch.
    if ("ticker".equals(type)) {
        final String symbol = message.getStringRequired("product_id");
        final String rawSide = message.getString("side");

        // Anything other than "buy" (case-insensitive), including a missing side, maps to SELL.
        final AggressorSide aggressor;
        if ("buy".equalsIgnoreCase(rawSide)) {
            aggressor = AggressorSide.BUY;
        } else {
            aggressor = AggressorSide.SELL;
        }

        // Trades are reported with an unknown timestamp.
        processor().onTrade(
                symbol,
                TimeStampedMessage.TIMESTAMP_UNKNOWN,
                message.getDecimal64Required("price"),
                message.getDecimal64Required("last_size"),
                aggressor);
    } else if ("snapshot".equals(type)) {
        final String symbol = message.getStringRequired("product_id");

        // Snapshots are reported with an unknown timestamp.
        final QuoteSequenceProcessor book =
                processor().onBookSnapshot(symbol, TimeStampedMessage.TIMESTAMP_UNKNOWN);

        // NOTE(review): the boolean flag presumably marks the ask side - confirm in processSnapshotSide.
        processSnapshotSide(book, message.getArray("bids"), false);
        processSnapshotSide(book, message.getArray("asks"), true);
        book.onFinish();
    } else if ("l2update".equals(type)) {
        final String symbol = message.getStringRequired("product_id");

        // Load the message's "time" field into the shared date-time parser before reading millis().
        dtParser.set(message.getStringRequired("time"));
        final QuoteSequenceProcessor book = processor().onBookUpdate(symbol, dtParser.millis());

        processChanges(book, message.getArrayRequired("changes"));
        book.onFinish();
    }
}