in java/connectors/bitmart/src/main/java/com/epam/deltix/data/connectors/bitmart/BitmartSpotFeed.java [73:125]
protected void onJson(final CharSequence data, final boolean last, final JsonWriter jsonWriter) {
    // Consumes one chunk of an incoming JSON message. Chunks are fed to jsonParser;
    // nothing else happens until the final chunk (last == true) has been parsed.
    // The completed message is then dispatched on the second "/"-separated segment
    // of its "table" field:
    //   "depth..." -> every element of "data" is published as an order book snapshot;
    //   "trade..." -> every element of "data" is published as a trade, except that the
    //                 first trade message seen for a symbol is dropped entirely.
    // Messages without a "table" field, or with a table name that has no second
    // segment, are ignored. The jsonWriter argument is not used here.
    jsonParser.parse(data);
    if (!last) {
        return;
    }

    final JsonObject message = jsonParser.eoj().asObject();
    final String table = message.getString("table");
    if (table == null) {
        return;
    }

    final String[] tableParts = table.split("/");
    if (tableParts.length < 2) {
        return;
    }
    final String channel = tableParts[1];

    if (channel.startsWith("depth")) {
        final JsonArray books = message.getArray("data");
        for (int index = 0; index < books.size(); index++) {
            final JsonObject book = books.getObject(index);
            final String bookSymbol = book.getStringRequired("symbol");
            final long bookTime = book.getLong("ms_t");

            final QuoteSequenceProcessor quotes = processor().onBookSnapshot(bookSymbol, bookTime);
            // The boolean flag is false for bids and true for asks
            // (presumably an "is ask" marker; verify against processSnapshotSide).
            processSnapshotSide(quotes, book.getArray("bids"), false);
            processSnapshotSide(quotes, book.getArray("asks"), true);
            quotes.onFinish();
        }
        return;
    }

    if (!channel.startsWith("trade")) {
        return;
    }

    final JsonArray trades = message.getArray("data");
    for (int index = 0; index < trades.size(); index++) {
        final JsonObject trade = trades.getObject(index);
        final String symbol = trade.getString("symbol");

        if (!skipSnapshotsSet.contains(symbol)) {
            // First trade message for this symbol is treated as a trade snapshot:
            // remember the symbol and abandon the rest of this message.
            skipSnapshotsSet.add(symbol);
            return;
        }

        // Anything other than "buy" (case-insensitive), including a missing side, maps to SELL.
        final AggressorSide aggressor;
        if ("buy".equalsIgnoreCase(trade.getString("side"))) {
            aggressor = AggressorSide.BUY;
        } else {
            aggressor = AggressorSide.SELL;
        }

        // NOTE(review): depth entries are stamped from "ms_t" while trades use "s_t";
        // confirm both are in the time unit the processor expects.
        processor().onTrade(
                symbol,
                trade.getLong("s_t"),
                trade.getDecimal64Required("price"),
                trade.getDecimal64Required("size"),
                aggressor);
    }
}