protected void onJson()

in java/connectors/bitfinex/src/main/java/com/epam/deltix/data/connectors/bitfinex/BitfinexSpotFeed.java [121:200]


    protected void onJson(final CharSequence data, final boolean last, final JsonWriter jsonWriter) {
        jsonParser.parse(data);

        if (!last) {
            return;
        }

        JsonValue jsonValue = jsonParser.eoj();

        JsonObject object = jsonValue.asObject();
        JsonArray array = jsonValue.asArray();
        if (object != null) {
            if ("subscribed".equalsIgnoreCase(object.getString("event"))) {
                String channel = object.getString("channel");
                String symbol = object.getString("symbol");
                long channelId = object.getLong("chanId");
                ChannelType type;
                if ("book".equalsIgnoreCase(channel)) {
                    type = ChannelType.BOOK;
                } else if ("trades".equalsIgnoreCase(channel)) {
                    type = ChannelType.TRADES;
                } else {
                    return;
                }

                if (channels.get(channelId, null) == null) {
                    channels.put(channelId, new Channel(type, symbol));
                }
            }
        } else if (array != null) {
            if (array.size() < 2) {
                return;
            }

            long channelId = array.getLong(0);
            Channel channel = channels.get(channelId, null);
            if (channel != null) {
                if (channel.type() == ChannelType.BOOK) {
                    if (array.getArray(1) == null) {
                        return;
                    }

                    if (!channel.snapshotReceived()) {
                        QuoteSequenceProcessor quotesListener = processor().onBookSnapshot(channel.symbol);
                        processQuotes(quotesListener, array.getArray(1), true);
                        quotesListener.onFinish();
                    } else {
                        QuoteSequenceProcessor quotesListener = processor().onBookUpdate(channel.symbol);
                        processQuotes(quotesListener, array.getArray(1), false);
                        quotesListener.onFinish();
                    }
                } else if (channel.type() == ChannelType.TRADES) {
                    if (channel.snapshotReceived()) {
                        if (array.size() < 3) {
                            return;
                        }

                        if ("tu".equalsIgnoreCase(array.getString(1))) {
                            JsonArray trade = array.getArray(2);
                            if (trade != null) {
                                if (trade.size() < 4) {
                                    throw new RuntimeException("Invalid trade size: " + trade.size());
                                }

                                AggressorSide side =  trade.getDouble(2) > 0 ? AggressorSide.BUY : AggressorSide.SELL;

                                processor().onTrade(
                                    channel.symbol(),
                                    trade.getLong(1),
                                    trade.getDecimal64Required(3),
                                    Decimal64Utils.abs(trade.getDecimal64Required(2)),
                                    side
                                );
                            }
                        }
                    }
                }
            }
        }
    }