protected void onJson()

in java/connectors/deribit/src/main/java/com/epam/deltix/data/connectors/deribit/DeribitFeed.java [82:148]


    protected void onJson(final CharSequence data, final boolean last, final JsonWriter jsonWriter) {
        jsonParser.parse(data);

        if (!last) {
            return;
        }

        JsonValue jsonValue = jsonParser.eoj();
        JsonObject object = jsonValue.asObject();
        JsonObject params = object.getObject("params");

        if (params != null) {
            String channel = params.getString("channel");

            if (channel != null) {
                if (channel.contains("book.")) {
                    JsonObject jsonData = params.getObject("data");
                    String instrument = jsonData.getString("instrument_name").toLowerCase();
                    long timestamp = jsonData.getLong("timestamp");
                    String type = jsonData.getString("type");
                    Long changeId = jsonData.getLong("change_id");

                    if ("snapshot".equals(type)) {
                        QuoteSequenceProcessor quotesListener = processor().onBookSnapshot(instrument, timestamp);

                        processSnapshotSide(quotesListener, jsonData.getArray("bids"), false);
                        processSnapshotSide(quotesListener, jsonData.getArray("asks"), true);

                        quotesListener.onFinish();
                        changeIdMap.put(instrument, changeId);

                    } else if ("change".equals(type)) {
                        long previousChangeId = jsonData.getLong("prev_change_id");

                        if (previousChangeId == changeIdMap.get(instrument)) {
                            changeIdMap.put(instrument, changeId);

                            QuoteSequenceProcessor quotesListenerUpdate = processor().onBookUpdate(instrument, timestamp);
                            processChanges(quotesListenerUpdate, jsonData.getArray("bids"), false);
                            processChanges(quotesListenerUpdate, jsonData.getArray("asks"), true);

                            quotesListenerUpdate.onFinish();
                        } else {
                            unsubscribe(jsonWriter, instrument);
                            changeIdMap.remove(instrument);

                            subscribe(jsonWriter, instrument);
                        }
                    }
                } else if (channel.contains("trade.")) {
                    JsonArray dataArray = object.getArrayRequired("data");
                    for (int i = 0; i < dataArray.size(); ++i) {
                        JsonObject trade = dataArray.getObjectRequired(i);
                        long timestamp = trade.getLong("timestamp");
                        long price = trade.getDecimal64Required("price");
                        long size = trade.getDecimal64Required("amount");
                        String instrument = trade.getString("instrument_name");
                        String tradeDirection = trade.getString("direction");

                        AggressorSide side = "buy".equalsIgnoreCase(tradeDirection) ? AggressorSide.BUY : AggressorSide.SELL;

                        processor().onTrade(instrument, timestamp, price, size, side);
                    }
                }
            }
        }
    }