in orderbook-it/src/main/java/com/epam/deltix/orderbook/it/OrderBookIT.java [50:108]
private static double readStream(final DXTickStream stream,
final String symbol,
final long startReadTime,
final long endReadTime,
final Consumer<MarketMessageInfo> consumer) {
LOGGER.info().append("Start reading stream ")
.append(stream.getKey())
.append("; Symbol: ")
.append(symbol)
.append("; Start time: ")
.append(startReadTime == Long.MIN_VALUE ? "min" : Instant.ofEpochMilli(startReadTime))
.append("; End time: ")
.append(endReadTime == Long.MAX_VALUE ? "max" : Instant.ofEpochMilli(endReadTime)).commit();
try (TickCursor cursor =
stream.select(startReadTime, new SelectionOptions(), null, new CharSequence[]{symbol})) {
if (!cursor.next()) {
LOGGER.info().append("Empty stream").commit();
return -1;
}
long msgCount = 0;
long entriesCount = 0;
final long startTime = System.currentTimeMillis();
final long streamTime = cursor.getMessage().getTimeStampMs();
long prevStreamTime = streamTime;
while (cursor.next()) {
final InstrumentMessage message = cursor.getMessage();
if (message.getTimeStampMs() >= endReadTime) {
break;
}
if (message.getTimeStampMs() - prevStreamTime > 30_000) {
LOGGER.info("Empty interval: " + message.getTimeString() + "; " + message.getTimeStampMs());
}
if (message instanceof MarketMessageInfo) {
msgCount++;
if (message instanceof PackageHeaderInfo) {
final PackageHeaderInfo packageHeader = (PackageHeaderInfo) message;
entriesCount += packageHeader.getEntries().size();
}
consumer.accept((MarketMessageInfo) message);
if (msgCount % 1000000 == 0) {
logTime(startTime, prevStreamTime - streamTime, msgCount, entriesCount);
}
}
prevStreamTime = message.getTimeStampMs();
}
logTime(startTime, prevStreamTime - startReadTime, msgCount, entriesCount);
final long timePass = System.currentTimeMillis() - startTime;
final double timeSeconds = (double) timePass / 1000.0;
return (msgCount / timeSeconds);
}
}