in java/connectors/kucoin/src/main/java/com/epam/deltix/data/connectors/kucoin/KucoinFeed.java [82:140]
/**
 * Consumes one chunk of a WebSocket text message and, once the message is complete,
 * dispatches it by its {@code subject} field.
 *
 * <p>Every chunk is passed to {@code jsonParser}. Nothing else happens until {@code last}
 * is {@code true}; at that point the parsed message is routed:
 * <ul>
 *   <li>{@code trade.l2update} - buffered and applied to the book in sequence order;</li>
 *   <li>{@code trade.l3match} - forwarded to the processor as a trade;</li>
 *   <li>anything else (or a message without a {@code data} object) - ignored.</li>
 * </ul>
 * After every complete message a ping is written to {@code jsonWriter} if more than
 * {@code pingTimeout} milliseconds have elapsed since the previous ping.
 *
 * @param data       the current chunk of the incoming JSON text
 * @param last       {@code true} when {@code data} is the final chunk of the message
 * @param jsonWriter writer used to send the ping request back to the server
 */
protected void onWsJson(final CharSequence data, final boolean last, final JsonWriter jsonWriter) {
    jsonParser.parse(data);
    if (!last) {
        // The message is still incomplete; wait for the remaining chunks.
        return;
    }

    final JsonValue jsonValue = jsonParser.eoj();
    final JsonObject message = jsonValue == null ? null : jsonValue.asObject();
    if (message != null) {
        final String subject = message.getString("subject");
        final JsonObject jsonData = message.getObject("data");
        // Messages without a data payload carry nothing to process here.
        if (jsonData != null) {
            if ("trade.l2update".equals(subject)) {
                handleLevel2Update(jsonData);
            } else if ("trade.l3match".equals(subject)) {
                handleMatch(jsonData);
            }
        }
    }

    sendPingIfDue(jsonWriter);
}

/**
 * Buffers a level-2 update and, if a snapshot sequence is known for its symbol, drains the
 * buffer into the book while checking sequence continuity.
 */
private void handleLevel2Update(final JsonObject jsonData) {
    final String rawSymbol = jsonData.getString("symbol");
    if (rawSymbol == null) {
        return;
    }
    // NOTE(review): default-locale lower-casing is kept so the key matches however the
    // maps are populated elsewhere in this class - confirm and consider Locale.ROOT.
    final String symbol = rawSymbol.toLowerCase();

    final Queue<JsonObject> buffer = updatesBufferMap.get(symbol);
    if (buffer == null) {
        // No buffer registered for this symbol; there is no book to apply the update to.
        return;
    }
    buffer.add(jsonData);

    if (!sequenceIdMap.containsKey(symbol)) {
        // No snapshot sequence yet: keep buffering until one is recorded.
        return;
    }

    long lastSequence = sequenceIdMap.get(symbol);
    while (!buffer.isEmpty()) {
        final JsonObject update = buffer.poll();
        final long sequenceStart = update.getLong("sequenceStart");
        final long sequenceEnd = update.getLong("sequenceEnd");

        if (sequenceEnd <= lastSequence) {
            // Entirely covered by what has already been applied.
            continue;
        }
        if (sequenceStart > lastSequence + 1) {
            // Gap in the stream: drop the local state and request a fresh snapshot.
            sequenceIdMap.remove(symbol);
            buffer.clear();
            initBookSnapshots(symbol);
            break;
        }

        // Contiguous with, or overlapping, the last applied sequence:
        // sequenceStart <= lastSequence + 1 and sequenceEnd > lastSequence.
        processBookUpdate(update);
        lastSequence = sequenceEnd;
        sequenceIdMap.put(symbol, sequenceEnd);
    }
}

/**
 * Converts a match message into a trade notification for the processor.
 */
private void handleMatch(final JsonObject jsonData) {
    final String rawSymbol = jsonData.getString("symbol");
    if (rawSymbol == null) {
        return;
    }
    final String symbol = rawSymbol.toLowerCase();
    // NOTE(review): KuCoin appears to publish match "time" in nanoseconds - verify the
    // unit expected by processor().onTrade.
    final long timestamp = jsonData.getLong("time");
    final long price = jsonData.getDecimal64Required("price");
    final long size = jsonData.getDecimal64Required("size");
    final String tradeDirection = jsonData.getString("side");
    // Any side other than "buy" (case-insensitive) is reported as SELL.
    final AggressorSide side = "buy".equalsIgnoreCase(tradeDirection) ? AggressorSide.BUY : AggressorSide.SELL;
    processor().onTrade(symbol, timestamp, price, size, side);
}

/**
 * Writes a ping request when more than {@code pingTimeout} ms passed since the last one.
 */
private void sendPingIfDue(final JsonWriter jsonWriter) {
    final long now = System.currentTimeMillis();
    if (now - lastPingTime > pingTimeout) {
        final JsonValue pingJson = JsonValue.newObject();
        final JsonObject pingBody = pingJson.asObject();
        pingBody.putString("id", "1");
        pingBody.putString("type", "ping");
        lastPingTime = now;
        pingJson.toJsonAndEoj(jsonWriter);
    }
}