protected void onJson()

in java/connectors/okcoin/src/main/java/com/epam/deltix/data/connectors/okcoin/OkcoinSpotFeed.java [60:115]


    /**
     * Feeds a fragment of an incoming JSON message to the parser and, once the final fragment
     * has arrived, dispatches the parsed message to the market-data processor.
     *
     * <p>Handling of a complete message:
     * <ul>
     *   <li>{@code event} equal to {@code "error"} (case-insensitive): the {@code msg} field is
     *       logged as a warning and the message is dropped;</li>
     *   <li>no {@code arg} object, or a missing/empty {@code data} array: the message is ignored;</li>
     *   <li>channel {@code books}: the first {@code data} element is applied as a book snapshot
     *       or a book update, depending on the top-level {@code action} field; any other action
     *       is ignored;</li>
     *   <li>channel {@code trades}: every {@code data} element is reported through
     *       {@code processor().onTrade(...)};</li>
     *   <li>any other channel is ignored.</li>
     * </ul>
     *
     * @param data       next fragment of the JSON text
     * @param last       {@code true} when {@code data} is the final fragment of the message
     * @param jsonWriter not used by this method
     */
    protected void onJson(final CharSequence data, final boolean last, final JsonWriter jsonWriter) {
        jsonParser.parse(data);

        if (!last) {
            // The message is still incomplete; wait for the remaining fragments.
            return;
        }

        final JsonObject object = jsonParser.eoj().asObject();

        if ("error".equalsIgnoreCase(object.getString("event"))) {
            logger().warning("Feed error: " + object.getString("msg"));
            return;
        }

        final JsonObject arg = object.getObject("arg");
        if (arg == null) {
            // A message that carries no subscription argument has nothing to dispatch.
            // NOTE(review): assumes getObject() returns null for an absent field, the same way
            // getArray() is treated below; without this guard such a message would fail with
            // a NullPointerException on the next line.
            return;
        }

        final String channel = arg.getString("channel");
        final String instrument = arg.getStringRequired("instId");

        final JsonArray arrayData = object.getArray("data");
        if (arrayData == null || arrayData.size() < 1) {
            // No payload to process (missing or empty "data").
            return;
        }

        if ("books".equalsIgnoreCase(channel)) {
            final String action = object.getStringRequired("action");
            // Only the first element of "data" is consumed for order book messages.
            final JsonObject book = arrayData.getObject(0);
            final long timestamp = getTimestamp(book.getString("ts"));

            if ("snapshot".equalsIgnoreCase(action)) {
                final QuoteSequenceProcessor quotesListener = processor().onBookSnapshot(instrument, timestamp);
                processSnapshotSide(quotesListener, book.getArray("bids"), false);
                processSnapshotSide(quotesListener, book.getArray("asks"), true);
                quotesListener.onFinish();
            } else if ("update".equalsIgnoreCase(action)) {
                final QuoteSequenceProcessor quotesListener = processor().onBookUpdate(instrument, timestamp);
                processChanges(quotesListener, book.getArray("bids"), false);
                processChanges(quotesListener, book.getArray("asks"), true);
                quotesListener.onFinish();
            }
        } else if ("trades".equalsIgnoreCase(channel)) {
            // Reuse the already validated "data" array instead of looking it up a second time.
            final int tradeCount = arrayData.size();
            for (int i = 0; i < tradeCount; ++i) {
                final JsonObject trade = arrayData.getObjectRequired(i);
                final long timestamp = getTimestamp(trade.getString("ts"));
                final long price = trade.getDecimal64Required("px");
                final long size = trade.getDecimal64Required("sz");

                // Any side other than "buy" (including an absent one) is reported as SELL.
                final AggressorSide side = "buy".equalsIgnoreCase(trade.getString("side"))
                        ? AggressorSide.BUY
                        : AggressorSide.SELL;

                processor().onTrade(instrument, timestamp, price, size, side);
            }
        }
    }