in java/connectors/okcoin/src/main/java/com/epam/deltix/data/connectors/okcoin/OkcoinSpotFeed.java [60:115]
protected void onJson(final CharSequence data, final boolean last, final JsonWriter jsonWriter) {
jsonParser.parse(data);
if (!last) {
return;
}
JsonValue jsonValue = jsonParser.eoj();
JsonObject object = jsonValue.asObject();
String event = object.getString("event");
if ("error".equalsIgnoreCase(event)) {
String errorMessage = object.getString("msg");
logger().warning("Feed error: " + errorMessage);
return;
}
JsonObject arg = object.getObject("arg");
String channel = arg.getString("channel");
String instrument = arg.getStringRequired("instId");
JsonArray arrayData = object.getArray("data");
if (arrayData == null || arrayData.size() < 1) {
return;
}
if ("books".equalsIgnoreCase(channel)) {
String action = object.getStringRequired("action");
JsonObject jsonData = arrayData.getObject(0);
long timestamp = getTimestamp(jsonData.getString("ts"));
if ("snapshot".equalsIgnoreCase(action)) {
QuoteSequenceProcessor quotesListener = processor().onBookSnapshot(instrument, timestamp);
processSnapshotSide(quotesListener, jsonData.getArray("bids"), false);
processSnapshotSide(quotesListener, jsonData.getArray("asks"), true);
quotesListener.onFinish();
} else if ("update".equalsIgnoreCase(action)) {
QuoteSequenceProcessor quotesListener = processor().onBookUpdate(instrument, timestamp);
processChanges(quotesListener, jsonData.getArray("bids"), false);
processChanges(quotesListener, jsonData.getArray("asks"), true);
quotesListener.onFinish();
}
} else if ("trades".equalsIgnoreCase(channel)) {
JsonArray jsonDataArray = object.getArrayRequired("data");
for (int i = 0; i < jsonDataArray.size(); ++i) {
JsonObject trade = jsonDataArray.getObjectRequired(i);
long timestamp = getTimestamp(trade.getString("ts"));
long price = trade.getDecimal64Required("px");
long size = trade.getDecimal64Required("sz");
String tradeDirection = trade.getString("side");
AggressorSide side = "buy".equalsIgnoreCase(tradeDirection) ? AggressorSide.BUY : AggressorSide.SELL;
processor().onTrade(instrument, timestamp, price, size, side);
}
}
}