void tryBuildOrderBook()

in java/runner/src/intTest/java/com/epam/deltix/data/connectors/IntegrationTest.java [171:243]


    /**
     * Validates the package headers written by a connector to its output stream and records
     * the supported-model report for that connector.
     *
     * <p>The method works in two passes over the stream returned by
     * {@code db.getStream(connector.stream())}:
     * <ol>
     *   <li>Validation pass: messages are read through {@code readWithTimeout} until it returns
     *       {@code false} or until {@code VALIDATION_READ_MESSAGES} package headers have been
     *       processed. Messages without a symbol and messages that are not
     *       {@link PackageHeaderInfo} are skipped. Each package header is sent to a validator
     *       that is created once per symbol. The stream is passed to {@code exportStream}
     *       when this pass ends, including when it ends with an exception.</li>
     *   <li>Report pass: the stream is re-read with {@code cursor.next()} until it is exhausted
     *       and every package header is passed to {@code updateSupportedModel}. The collected
     *       values are sorted, joined with {@code ","} and stored on the connector under
     *       {@code ReportGenerator.SUPPORTED_MODEL_REPORT}, including when this pass ends with
     *       an exception.</li>
     * </ol>
     *
     * <p>The test fails if the stream does not exist or if any validator callback was invoked
     * with {@code Severity.ERROR}.
     *
     * @param connector the connector under test; supplies the stream key and receives the
     *                  supported-model report message
     */
    void tryBuildOrderBook(final TestConnectorReports connector) {
        final int expectedNumOfMessages = VALIDATION_READ_MESSAGES;
        final int timeoutSeconds = READ_TIMEOUT_S;

        final DXTickStream stream = db.getStream(connector.stream());

        Assertions.assertNotNull(stream, "Connector " + connector + " not started as expected. " +
            "No output stream found.");

        // One validator per symbol, created lazily on the first package header seen for it.
        final Map<String, DataValidator> validators = new HashMap<>();
        final AtomicLong errors = new AtomicLong();

        try (TickCursor cursor = stream.select(TimeConstants.TIMESTAMP_UNKNOWN, new SelectionOptions(false, true))) {
            int messages = 0;
            while (readWithTimeout(timeoutSeconds, cursor, connector)) {
                final InstrumentMessage message = cursor.getMessage();

                // Only package headers that carry a symbol take part in validation.
                if (message.getSymbol() == null || !(message instanceof PackageHeaderInfo)) {
                    continue;
                }

                final PackageHeaderInfo packageHeader = (PackageHeaderInfo) message;
                final DataValidator validator = validators.computeIfAbsent(
                    message.getSymbol().toString(),
                    k -> createValidator(k, (sender, severity, exception, stringMessage) -> {
                            if (severity == Severity.ERROR) {
                                // Counted here, asserted once at the very end of the test.
                                errors.incrementAndGet();
                                LOG.severe(severity + " | " + stringMessage);
                            } else {
                                LOG.warning(severity + " | " + stringMessage);
                            }

                            if (exception != null) {
                                LOG.log(Level.SEVERE, "Exception", exception);
                            }
                        }, stream
                    )
                );

                validator.sendPackage(packageHeader);

                messages++;
                final boolean reachedLimit = messages == expectedNumOfMessages;

                // Log progress every 100 headers and once when the limit is hit; a single
                // condition avoids a duplicated line when the limit is a multiple of 100.
                if (reachedLimit || messages % 100 == 0) {
                    LOG.info("Processed " + messages + " package headers");
                }

                if (reachedLimit) {
                    break;
                }
            }
        } finally {
            exportStream(stream);
        }

        final Set<String> supportedModel = new HashSet<>();
        try (TickCursor cursor = stream.select(TimeConstants.TIMESTAMP_UNKNOWN, new SelectionOptions(false, false))) {
            while (cursor.next()) {
                final InstrumentMessage message = cursor.getMessage();
                if (message instanceof PackageHeaderInfo) {
                    updateSupportedModel((PackageHeaderInfo) message, supportedModel);
                }
            }
        } finally {
            // Report whatever was collected, even if reading the stream failed part-way.
            connector.addTestMessage(
                ReportGenerator.SUPPORTED_MODEL_REPORT,
                supportedModel.stream().sorted().collect(Collectors.joining(","))
            );
        }

        Assertions.assertEquals(0L, errors.get(),
            "Connector " + connector + " produced validation errors.");
    }