protected void onJson()

in java/connectors/poloniex/src/main/java/com/epam/deltix/data/connectors/poloniex/PoloniexFeed.java [65:135]


    protected void onJson(final CharSequence data, final boolean last, final JsonWriter jsonWriter) {
        jsonParser.parse(data);

        if (!last) {
            return;
        }

        JsonValue jsonValue = jsonParser.eoj();
        JsonObject object = jsonValue.asObject();
        String event = object.getString("event");
        String channel = object.getString("channel");

        if ("book_lv2".equals(channel) && !"subscribe".equals(event)) {
            String action = object.getString("action");
            if (action != null) {
                JsonObject jasonData = object.getArray("data").
                        items().collect(Collectors.toList()).get(0).asObject();

                if ("snapshot".equals(action)) {
                    String symbol = jasonData.getString("symbol");
                    long timestamp = jasonData.getLong("createTime");

                    QuoteSequenceProcessor quotesListener = processor().onBookSnapshot(symbol, timestamp);

                    processSnapshotSide(quotesListener, jasonData.getArray("asks"), true);
                    processSnapshotSide(quotesListener, jasonData.getArray("bids"), false);

                    quotesListener.onFinish();
                } else if ("update".equals(action)) {
                    String symbol = jasonData.getString("symbol");
                    long timestamp = jasonData.getLong("createTime");

                    QuoteSequenceProcessor quotesListener = processor().onBookUpdate(symbol, timestamp);

                    processChanges(quotesListener, jasonData.getArray("bids"), false);
                    processChanges(quotesListener, jasonData.getArray("asks"), true);

                    quotesListener.onFinish();
                }

                //ping server
                long now = System.currentTimeMillis();
                long timeDelta = now - lastPingTime;

                if (timeDelta > pingTimeout) {
                    lastPingTime = now;
                    JsonValue pingJson = JsonValue.newObject();
                    JsonObject pingBody = pingJson.asObject();

                    pingBody.putString("event", "ping");

                    pingJson.toJsonAndEoj(jsonWriter);
                }
            }
        } else if ("trades".equals(channel) && !"subscribe".equals(event)) {
            JsonArray dataArray = object.getArrayRequired("data");
            for (int i = 0; i < dataArray.size(); ++i) {
                JsonObject trade = dataArray.getObjectRequired(i);

                long timestamp = trade.getLong("ts");
                String symbol = trade.getString("symbol");
                long price = trade.getDecimal64Required("price");
                long size = trade.getDecimal64Required("quantity");
                String tradeDirection = trade.getString("takerSide");

                AggressorSide side = "buy".equalsIgnoreCase(tradeDirection) ? AggressorSide.BUY : AggressorSide.SELL;

                processor().onTrade(symbol, timestamp, price, size, side);
            }
        }
    }