in java/connectors/binance-futures/src/main/java/com/epam/deltix/data/connectors/binance/futures/BinanceFuturesFeed.java [68:131]
/**
 * Consumes one fragment of a WebSocket JSON message and, once the message is complete,
 * dispatches its {@code "data"} payload by event type (field {@code "e"}):
 * {@code "depthUpdate"} events are buffered and applied to the order book,
 * {@code "trade"} events are forwarded to the processor. Messages without a
 * {@code "data"} object and events of any other type are ignored.
 *
 * @param data       the next fragment of the message text; always fed to the JSON parser
 * @param last       {@code true} when {@code data} is the final fragment of the message;
 *                   nothing is dispatched until then
 * @param jsonWriter not used by this method
 */
protected void onWsJson(final CharSequence data, final boolean last, final JsonWriter jsonWriter) {
    jsonParser.parse(data);
    if (!last) {
        return;
    }

    final JsonValue jsonValue = jsonParser.eoj();
    final JsonObject object = jsonValue.asObject();
    final JsonObject wsData = object.getObject("data");
    if (wsData == null) {
        return;
    }

    final String eventType = wsData.getString("e");
    if ("depthUpdate".equals(eventType)) {
        bufferAndApplyDepthUpdate(wsData);
    } else if ("trade".equals(eventType)) {
        publishTrade(wsData);
    }
}

/**
 * Appends a depth update to its symbol's buffer and, if a snapshot id is known for the symbol,
 * drains the buffer into the order book.
 */
private void bufferAndApplyDepthUpdate(final JsonObject depthUpdate) {
    final String symbol = depthUpdate.getString("s").toLowerCase();
    final Queue<JsonObject> buffer = updatesBufferMap.get(symbol);
    buffer.add(depthUpdate);

    if (!snapshotIdMap.containsKey(symbol)) {
        // No snapshot id recorded for this symbol yet: keep buffering.
        return;
    }

    if (firstBookUpdate.get(symbol)) {
        applyFirstUpdateAfterSnapshot(symbol, buffer);
    } else {
        applySequentialUpdates(symbol, buffer);
    }
}

/**
 * Finds the first buffered update whose id range [U, u] contains the snapshot id and applies it.
 * Updates that end before the snapshot id are dropped; an update that starts after it means
 * the snapshot cannot be bridged, so synchronization is restarted.
 * NOTE(review): "U"/"u" are presumably the first/final update ids of the event as described in
 * Binance's diff. depth stream documentation - confirm.
 */
private void applyFirstUpdateAfterSnapshot(final String symbol, final Queue<JsonObject> buffer) {
    final long lastUpdateId = snapshotIdMap.get(symbol);
    while (!buffer.isEmpty()) {
        final JsonObject updateItem = buffer.poll();
        final long firstUpdateId = updateItem.getLong("U");
        final long finalUpdateId = updateItem.getLong("u");

        if (firstUpdateId <= lastUpdateId && lastUpdateId <= finalUpdateId) {
            processBookUpdate(updateItem);
            firstBookUpdate.put(symbol, false);
            lastUpdateIdMap.put(symbol, finalUpdateId);
            // Anything still buffered is applied sequentially on the next depth update.
            return;
        }
        if (firstUpdateId > lastUpdateId) {
            restartBookSync(symbol, buffer);
            return;
        }
        // Otherwise the update lies entirely before the snapshot id: drop it and keep looking.
    }
}

/**
 * Applies buffered updates in order, requiring each update's "pu" to equal the "u" of the
 * previously applied update; on the first mismatch synchronization is restarted.
 */
private void applySequentialUpdates(final String symbol, final Queue<JsonObject> buffer) {
    while (!buffer.isEmpty()) {
        final JsonObject updateItem = buffer.poll();
        // Compare as primitives so the check is always a numeric comparison.
        final long expectedPreviousId = lastUpdateIdMap.get(symbol);
        final long previousFinalId = updateItem.getLong("pu");

        if (expectedPreviousId != previousFinalId) {
            restartBookSync(symbol, buffer);
            return;
        }
        processBookUpdate(updateItem);
        lastUpdateIdMap.put(symbol, updateItem.getLong("u"));
    }
}

/**
 * Discards the current snapshot id and all buffered updates for the symbol and requests a new
 * snapshot. The first-update flag is set to {@code true} so that the first update applied after
 * the new snapshot is validated against the snapshot id again rather than against the id of
 * the last update applied before the restart.
 */
private void restartBookSync(final String symbol, final Queue<JsonObject> buffer) {
    firstBookUpdate.put(symbol, true);
    snapshotIdMap.remove(symbol);
    buffer.clear();
    initBookSnapshots(Arrays.asList(symbol));
}

/**
 * Forwards a trade event to the processor; when the buyer is the market maker ("m" is true)
 * the aggressor side is SELL, otherwise BUY.
 */
private void publishTrade(final JsonObject trade) {
    final long timestamp = trade.getLong("E");
    final long price = trade.getDecimal64Required("p");
    final long size = trade.getDecimal64Required("q");
    final boolean buyerMarketMaker = trade.getBooleanRequired("m");
    final AggressorSide side = buyerMarketMaker ? AggressorSide.SELL : AggressorSide.BUY;
    processor().onTrade(trade.getString("s").toLowerCase(), timestamp, price, size, side);
}