protected void onJson()

in java/connectors/kraken-spot/src/main/java/com/epam/deltix/data/connectors/kraken/KrakenSpotFeed.java [84:157]


    protected void onJson(final CharSequence data, final boolean last, final JsonWriter jsonWriter) {
        jsonParser.parse(data);

        if (!last) {
            return;
        }

        JsonValue jsonValue = jsonParser.eoj();

        JsonArray array = jsonValue.asArray();
        if (array == null) {
            return;
        }

        String type = array.getString(array.size() - 2);
        if (type == null) {
            return;
        }

        String instrument = array.getString(array.size() - 1);
        if (instrument == null) {
            return;
        }

        if (type.startsWith("book-")) {
            JsonObject values = array.getObject(1);
            JsonArray bs = values.getArray("bs");
            JsonArray as = values.getArray("as");
            if (bs != null || as != null) {
                QuoteSequenceProcessor quotesListener = processor().onBookSnapshot(instrument,
                    computeTimestamp(bs, computeTimestamp(as, TimeConstants.TIMESTAMP_UNKNOWN))
                );
                processSnapshotSide(quotesListener, bs, false);
                processSnapshotSide(quotesListener, as, true);
                quotesListener.onFinish();
            } else {
                JsonArray b = values.getArray("b");
                JsonArray a = values.getArray("a");
                JsonObject values2 = array.getObject(2);
                if (values2 != null) {
                    if (b == null) {
                        b = values2.getArray("b");
                    }
                    if (a == null) {
                        a = values2.getArray("a");
                    }
                }

                if (b != null || a != null) {
                    QuoteSequenceProcessor quotesListener = processor().onBookUpdate(instrument,
                        computeTimestamp(a, computeTimestamp(b, TimeConstants.TIMESTAMP_UNKNOWN))
                    );
                    processChanges(quotesListener, b, false);
                    processChanges(quotesListener, a, true);
                    quotesListener.onFinish();
                }
            }
        } else if (type.startsWith("trade")) {
            JsonArray trades = array.getArray(1);
            if (trades != null) {
                for (int i = 0; i < trades.size(); ++i) {
                    JsonArray trade = trades.getArray(i);
                    long price = trade.getDecimal64Required(0);
                    long size = trade.getDecimal64Required(1);
                    long timestamp = Util.parseTime(trade.getString(2));
                    String tradeDirection = trade.getString(3);

                    AggressorSide side = "b".equalsIgnoreCase(tradeDirection) ? AggressorSide.BUY : AggressorSide.SELL;

                    processor().onTrade(instrument, timestamp, price, size, side);
                }
            }
        }
    }