in java/connectors/poloniex/src/main/java/com/epam/deltix/data/connectors/poloniex/PoloniexFeed.java [65:135]
/**
 * Consumes one fragment of an incoming JSON message and, once the message is complete,
 * dispatches it to the market-data processor.
 *
 * <p>Every fragment is fed to the parser; nothing else happens until the fragment flagged
 * as {@code last} arrives. The completed message is then routed by its {@code channel}
 * field: {@code book_lv2} messages are turned into book snapshots/updates and
 * {@code trades} messages into trade events. Messages whose {@code event} field equals
 * {@code "subscribe"} (presumably subscription acknowledgements) carry no data to process
 * and are skipped.
 *
 * <p>After each completed message a keep-alive ping is written if one is due, regardless
 * of which channel the message belonged to.
 *
 * @param data       the received text fragment
 * @param last       {@code true} when {@code data} is the final fragment of the message
 * @param jsonWriter writer used to send the keep-alive ping back to the server
 */
protected void onJson(final CharSequence data, final boolean last, final JsonWriter jsonWriter) {
    jsonParser.parse(data);
    if (!last) {
        return;
    }

    final JsonObject message = jsonParser.eoj().asObject();
    final String channel = message.getString("channel");
    final boolean subscribeEvent = "subscribe".equals(message.getString("event"));

    if (!subscribeEvent) {
        if ("book_lv2".equals(channel)) {
            processBookMessage(message);
        } else if ("trades".equals(channel)) {
            processTradesMessage(message);
        }
    }

    // The keep-alive must not depend on which channel happens to deliver data, otherwise
    // a connection that only receives trades would never ping the server.
    sendPingIfDue(jsonWriter);
}

/**
 * Applies a {@code book_lv2} message to the order book.
 *
 * <p>Messages without an {@code action}, with an action other than {@code "snapshot"} or
 * {@code "update"}, or without a {@code data} array are ignored. Every element of
 * {@code data} is processed in order.
 *
 * @param message the parsed {@code book_lv2} message
 */
private void processBookMessage(final JsonObject message) {
    final String action = message.getString("action");
    final boolean snapshot = "snapshot".equals(action);
    if (!snapshot && !"update".equals(action)) {
        return;
    }

    final JsonArray entries = message.getArray("data");
    if (entries == null) {
        return;
    }

    // Indexed access avoids materialising a List per message just to read its elements.
    for (int i = 0; i < entries.size(); ++i) {
        final JsonObject entry = entries.getObjectRequired(i);
        final String symbol = entry.getString("symbol");
        final long timestamp = entry.getLong("createTime");

        final QuoteSequenceProcessor quotes;
        if (snapshot) {
            quotes = processor().onBookSnapshot(symbol, timestamp);
            processSnapshotSide(quotes, entry.getArray("asks"), true);
            processSnapshotSide(quotes, entry.getArray("bids"), false);
        } else {
            quotes = processor().onBookUpdate(symbol, timestamp);
            processChanges(quotes, entry.getArray("bids"), false);
            processChanges(quotes, entry.getArray("asks"), true);
        }
        quotes.onFinish();
    }
}

/**
 * Reports every trade contained in a {@code trades} message to the processor.
 *
 * <p>A {@code takerSide} of {@code "buy"} (case-insensitive) maps to
 * {@link AggressorSide#BUY}; any other value, including a missing one, maps to
 * {@link AggressorSide#SELL}.
 *
 * @param message the parsed {@code trades} message; its {@code data} array is required
 */
private void processTradesMessage(final JsonObject message) {
    final JsonArray trades = message.getArrayRequired("data");
    for (int i = 0; i < trades.size(); ++i) {
        final JsonObject trade = trades.getObjectRequired(i);
        final long timestamp = trade.getLong("ts");
        final String symbol = trade.getString("symbol");
        final long price = trade.getDecimal64Required("price");
        final long size = trade.getDecimal64Required("quantity");
        final AggressorSide side = "buy".equalsIgnoreCase(trade.getString("takerSide"))
                ? AggressorSide.BUY
                : AggressorSide.SELL;
        processor().onTrade(symbol, timestamp, price, size, side);
    }
}

/**
 * Writes an {@code {"event":"ping"}} message when more than {@code pingTimeout}
 * milliseconds have elapsed since the previous ping, and records the new ping time.
 *
 * @param jsonWriter writer the ping message is sent through
 */
private void sendPingIfDue(final JsonWriter jsonWriter) {
    final long now = System.currentTimeMillis();
    if (now - lastPingTime > pingTimeout) {
        lastPingTime = now;
        final JsonValue ping = JsonValue.newObject();
        ping.asObject().putString("event", "ping");
        ping.toJsonAndEoj(jsonWriter);
    }
}