in java/connectors/binance-spot/src/main/java/com/epam/deltix/data/connectors/binance/spot/BinanceSpotFeed.java [68:130]
protected void onWsJson(final CharSequence data, final boolean last, final JsonWriter jsonWriter) {
    // Handles one websocket text fragment.
    //   data       - the fragment, fed to jsonParser
    //   last       - false while more fragments of the same message are expected;
    //                nothing is dispatched until the final fragment has been parsed
    //   jsonWriter - not used by this handler
    // Once the message is complete it is dispatched by its "e" (event type) field:
    // "depthUpdate" events maintain the order book, "trade" events are forwarded
    // to the processor. Any other event type is ignored.
    jsonParser.parse(data);
    if (!last) {
        return;
    }

    final JsonValue jsonValue = jsonParser.eoj();
    final JsonObject object = jsonValue.asObject();
    final String eventType = object.getString("e");

    if ("depthUpdate".equals(eventType)) {
        applyDepthUpdateEvent(object);
    } else if ("trade".equals(eventType)) {
        applyTradeEvent(object);
    }
}

/**
 * Buffers a depth update and, if a snapshot id is already recorded for its symbol,
 * applies whatever buffered updates can be applied.
 *
 * @param event the parsed "depthUpdate" message
 */
private void applyDepthUpdateEvent(final JsonObject event) {
    final String symbol = event.getString("s").toLowerCase();
    final Queue<JsonObject> buffer = updatesBufferMap.get(symbol);
    buffer.add(event);

    // No snapshot id recorded for this symbol yet: the event just stays in the buffer.
    if (!snapshotIdMap.containsKey(symbol)) {
        return;
    }

    if (firstBookUpdate.get(symbol)) {
        applyFirstUpdateAfterSnapshot(symbol, buffer);
    } else {
        applySequentialUpdates(symbol, buffer);
    }
}

/**
 * Finds the first buffered update whose id range [U; u] contains the snapshot id,
 * applies it and switches the symbol to sequential processing.
 *
 * <p>Updates that end before the snapshot id are dropped. If an update starts after
 * the snapshot id, the snapshot is too old to be joined with the stream and the
 * synchronisation is restarted.
 *
 * @param symbol lower-cased symbol the buffer belongs to
 * @param buffer pending depth updates for the symbol, oldest first
 */
private void applyFirstUpdateAfterSnapshot(final String symbol, final Queue<JsonObject> buffer) {
    final long snapshotId = snapshotIdMap.get(symbol);

    while (!buffer.isEmpty()) {
        final JsonObject update = buffer.poll();
        final long firstId = update.getLong("U");
        final long finalId = update.getLong("u");

        if (firstId <= snapshotId && snapshotId <= finalId) {
            processBookUpdate(update);
            firstBookUpdate.put(symbol, false);
            lastUpdateIdMap.put(symbol, finalId);
            // Updates still in the buffer are handled by the sequential path
            // when the next depth update for this symbol arrives.
            break;
        } else if (firstId > snapshotId) {
            restartBookSynchronisation(symbol, buffer);
            break;
        }
        // Otherwise finalId < snapshotId: the update predates the snapshot and is dropped.
    }
}

/**
 * Applies buffered updates as long as each one starts exactly one id after the
 * previously applied update; on the first gap the synchronisation is restarted.
 *
 * @param symbol lower-cased symbol the buffer belongs to
 * @param buffer pending depth updates for the symbol, oldest first
 */
private void applySequentialUpdates(final String symbol, final Queue<JsonObject> buffer) {
    while (!buffer.isEmpty()) {
        final JsonObject update = buffer.poll();

        if (lastUpdateIdMap.get(symbol) + 1 == update.getLong("U")) {
            processBookUpdate(update);
            lastUpdateIdMap.put(symbol, update.getLong("u"));
        } else {
            restartBookSynchronisation(symbol, buffer);
            break;
        }
    }
}

/**
 * Discards the current synchronisation state of a symbol and requests a new snapshot.
 *
 * <p>The first-update flag is always set back to {@code true} so that the next
 * snapshot is aligned with the buffered updates again instead of being compared
 * against the last applied update id, which no longer matches the new snapshot.
 *
 * @param symbol lower-cased symbol to resynchronise
 * @param buffer pending depth updates for the symbol; cleared by this call
 */
private void restartBookSynchronisation(final String symbol, final Queue<JsonObject> buffer) {
    firstBookUpdate.put(symbol, true);
    snapshotIdMap.remove(symbol);
    buffer.clear();
    initBookSnapshots(Arrays.asList(symbol));
}

/**
 * Forwards a trade message to the processor.
 *
 * @param event the parsed "trade" message
 */
private void applyTradeEvent(final JsonObject event) {
    final long timestamp = event.getLong("E");
    final long price = event.getDecimal64Required("p");
    final long size = event.getDecimal64Required("q");
    final boolean buyerMarketMaker = event.getBooleanRequired("m");
    // "m" == true is reported as a SELL aggressor, otherwise as a BUY aggressor.
    final AggressorSide side = buyerMarketMaker ? AggressorSide.SELL : AggressorSide.BUY;
    processor().onTrade(event.getString("s").toLowerCase(), timestamp, price, size, side);
}