protected void onWsJson()

in java/connectors/binance-futures/src/main/java/com/epam/deltix/data/connectors/binance/futures/BinanceFuturesFeed.java [68:131]


    protected void onWsJson(final CharSequence data, final boolean last, final JsonWriter jsonWriter) {
        jsonParser.parse(data);

        if (!last) {
            return;
        }

        JsonValue jsonValue = jsonParser.eoj();
        JsonObject object = jsonValue.asObject();
        JsonObject wsData = object.getObject("data");
        if (wsData != null) {
            if ("depthUpdate".equals(wsData.getString("e"))) {
                String symbol = wsData.getString("s").toLowerCase();

                Queue<JsonObject> buffer = updatesBufferMap.get(symbol);
                buffer.add(wsData);

                if (snapshotIdMap.containsKey((symbol))) {
                    if (firstBookUpdate.get(symbol)) {
                        while (buffer.size() > 0) {
                            JsonObject updateItem = buffer.poll();

                            long U = updateItem.getLong("U");
                            long u = updateItem.getLong("u");
                            long lastUpdateId = snapshotIdMap.get(symbol);
                            if (lastUpdateId >= U && lastUpdateId <= u) {
                                processBookUpdate(updateItem);
                                firstBookUpdate.put(symbol, false);
                                lastUpdateIdMap.put(symbol, u);
                                break;
                            } else if (U > lastUpdateId) {
                                firstBookUpdate.put(symbol, false);
                                snapshotIdMap.remove(symbol);
                                buffer.clear();
                                initBookSnapshots(Arrays.asList(symbol));
                            }
                        }
                    } else {
                        while (buffer.size() > 0) {
                            JsonObject updateItem = buffer.poll();
                            if (lastUpdateIdMap.get(symbol) == updateItem.getLong("pu")) {
                                processBookUpdate(updateItem);
                                lastUpdateIdMap.put(symbol, updateItem.getLong("u"));
                            } else {
                                firstBookUpdate.put(symbol, true);
                                snapshotIdMap.remove(symbol);
                                buffer.clear();
                                initBookSnapshots(Arrays.asList(symbol));
                            }
                        }
                    }
                }
            } else if ("trade".equals(wsData.getString("e"))) {
                long timestamp = wsData.getLong("E");

                long price = wsData.getDecimal64Required("p");
                long size = wsData.getDecimal64Required("q");
                boolean buyerMarketMaker = wsData.getBooleanRequired("m");
                AggressorSide side = buyerMarketMaker ? AggressorSide.SELL : AggressorSide.BUY;

                processor().onTrade(wsData.getString("s").toLowerCase(), timestamp, price, size, side);
            }
        }
    }