private static double readStream()

in orderbook-it/src/main/java/com/epam/deltix/orderbook/it/OrderBookIT.java [50:108]


    /**
     * Reads messages for a single symbol from the given stream, forwards every
     * {@link MarketMessageInfo} to {@code consumer}, and reports the achieved throughput.
     *
     * <p>Reading starts at {@code startReadTime} and stops at the first message whose
     * timestamp is greater than or equal to {@code endReadTime}, or when the cursor is
     * exhausted. Progress is reported through {@code logTime} every 1,000,000 market
     * messages and once more after the loop finishes.
     *
     * @param stream        stream to read from
     * @param symbol        the only symbol selected from the stream
     * @param startReadTime selection start time in epoch milliseconds;
     *                      {@code Long.MIN_VALUE} is logged as "min"
     * @param endReadTime   exclusive upper bound on message timestamps in epoch milliseconds;
     *                      {@code Long.MAX_VALUE} is logged as "max"
     * @param consumer      callback invoked for every market message that was read
     * @return market messages processed per wall-clock second, or {@code -1} when the
     *         cursor yields no messages at all; the ratio can be infinite or NaN when
     *         less than one millisecond of wall-clock time elapsed
     */
    private static double readStream(final DXTickStream stream,
                                     final String symbol,
                                     final long startReadTime,
                                     final long endReadTime,
                                     final Consumer<MarketMessageInfo> consumer) {
        LOGGER.info().append("Start reading stream ")
                .append(stream.getKey())
                .append("; Symbol: ")
                .append(symbol)
                .append("; Start time: ")
                .append(startReadTime == Long.MIN_VALUE ? "min" : Instant.ofEpochMilli(startReadTime))
                .append("; End time: ")
                .append(endReadTime == Long.MAX_VALUE ? "max" : Instant.ofEpochMilli(endReadTime)).commit();

        try (TickCursor cursor =
                     stream.select(startReadTime, new SelectionOptions(), null, new CharSequence[]{symbol})) {
            if (!cursor.next()) {
                LOGGER.info().append("Empty stream").commit();
                return -1;
            }

            long msgCount = 0;
            long entriesCount = 0;
            final long startTime = System.currentTimeMillis();
            // Timestamp of the first message; used as the origin for elapsed stream time.
            final long streamTime = cursor.getMessage().getTimeStampMs();
            long prevStreamTime = streamTime;

            // The cursor is already positioned on the first message by the next() call above,
            // so a do/while is used to process it instead of silently skipping it.
            do {
                final InstrumentMessage message = cursor.getMessage();
                final long messageTime = message.getTimeStampMs();
                if (messageTime >= endReadTime) {
                    break;
                }

                // Report gaps of more than 30 000 ms between consecutive messages.
                if (messageTime - prevStreamTime > 30_000) {
                    LOGGER.info("Empty interval: " + message.getTimeString() + "; " + messageTime);
                }

                if (message instanceof MarketMessageInfo) {
                    msgCount++;
                    if (message instanceof PackageHeaderInfo) {
                        final PackageHeaderInfo packageHeader = (PackageHeaderInfo) message;
                        entriesCount += packageHeader.getEntries().size();
                    }
                    consumer.accept((MarketMessageInfo) message);

                    if (msgCount % 1000000 == 0) {
                        logTime(startTime, prevStreamTime - streamTime, msgCount, entriesCount);
                    }
                }

                prevStreamTime = messageTime;
            } while (cursor.next());

            // Measure elapsed stream time from the first message, as the periodic report does.
            // Subtracting startReadTime would overflow when it is Long.MIN_VALUE.
            logTime(startTime, prevStreamTime - streamTime, msgCount, entriesCount);

            final long timePass = System.currentTimeMillis() - startTime;
            final double timeSeconds = (double) timePass / 1000.0;
            return (msgCount / timeSeconds);
        }
    }