in java/connectors/huobi-futures/src/main/java/com/epam/deltix/data/connectors/huobi/HuobiFuturesFeed.java [81:152]
protected void onJson(final CharSequence data, final boolean last, final JsonWriter jsonWriter) {
    // Feeds one chunk of an incoming JSON message into the parser and, once the
    // final chunk has arrived, dispatches the completed message:
    //   - a non-zero "ping" value is answered with {"pong": <same value>} via jsonWriter;
    //   - "depth" channels are forwarded to the processor as book snapshots or updates;
    //   - "trade"/"detail" channels are forwarded to the processor as individual trades.
    // Anything else is silently ignored.
    jsonParser.parse(data);
    if (!last) {
        // More chunks of the same message are still to come.
        return;
    }

    final JsonValue parsed = jsonParser.eoj();
    final JsonObject message = parsed.asObject();
    if (message == null) {
        return;
    }

    // Heartbeat: echo the ping value back as a pong and stop.
    final long pingValue = message.getLong("ping");
    if (pingValue != 0L) {
        jsonWriter.startObject();
        jsonWriter.objectMember("pong");
        jsonWriter.numberValue(pingValue);
        jsonWriter.endObject();
        jsonWriter.eoj();
        return;
    }

    final String channel = message.getString("ch");
    if (channel == null) {
        return;
    }

    // Only dot-separated channel names with exactly 4 or 5 parts are handled;
    // part [1] is used as the instrument and part [2] as the channel kind.
    final String[] channelParts = channel.split("\\.");
    final int partCount = channelParts.length;
    if (!(partCount == 4 || partCount == 5)) {
        return;
    }

    final long timestamp = message.getLong("ts");
    final String instrument = channelParts[1];
    final String channelKind = channelParts[2];

    if ("depth".equalsIgnoreCase(channelKind)) {
        final JsonObject depthTick = message.getObject("tick");
        if (depthTick == null) {
            return;
        }

        // NOTE(review): "event" is read with the *Required accessor, so a depth tick
        // without that field presumably fails here - confirm this is intended.
        final String eventType = depthTick.getStringRequired("event");
        if ("snapshot".equalsIgnoreCase(eventType)) {
            final QuoteSequenceProcessor snapshotListener = processor().onBookSnapshot(instrument, timestamp);
            processSnapshotSide(snapshotListener, depthTick.getArray("bids"), false);
            processSnapshotSide(snapshotListener, depthTick.getArray("asks"), true);
            snapshotListener.onFinish();
        } else if ("update".equalsIgnoreCase(eventType)) {
            final QuoteSequenceProcessor updateListener = processor().onBookUpdate(instrument, timestamp);
            processChanges(updateListener, depthTick.getArray("bids"), false);
            processChanges(updateListener, depthTick.getArray("asks"), true);
            updateListener.onFinish();
        }
        return;
    }

    final boolean tradeDetailChannel =
            "trade".equalsIgnoreCase(channelKind) && "detail".equalsIgnoreCase(channelParts[3]);
    if (!tradeDetailChannel) {
        return;
    }

    final JsonObject tradeTick = message.getObject("tick");
    if (tradeTick == null) {
        return;
    }
    final JsonArray trades = tradeTick.getArray("data");
    if (trades == null) {
        return;
    }

    // Every trade is reported with the message-level "ts" value as its timestamp.
    for (int index = 0; index < trades.size(); ++index) {
        final JsonObject tradeJson = trades.getObject(index);
        final long tradePrice = tradeJson.getDecimal64Required("price");
        final long tradeSize = tradeJson.getDecimal64Required("amount");
        final String direction = tradeJson.getString("direction");

        // Any direction other than "buy" (case-insensitive) is treated as SELL.
        final AggressorSide aggressor;
        if ("buy".equalsIgnoreCase(direction)) {
            aggressor = AggressorSide.BUY;
        } else {
            aggressor = AggressorSide.SELL;
        }

        processor().onTrade(instrument, timestamp, tradePrice, tradeSize, aggressor);
    }
}