in java/runner/src/intTest/java/com/epam/deltix/data/connectors/IntegrationTest.java [171:243]
/**
 * Validates the package headers written by a connector and records which data
 * model elements were observed in its output stream.
 *
 * <p>The method works in two passes over the connector's output stream:
 * <ol>
 *   <li>Messages are read via {@code readWithTimeout} until it returns {@code false}
 *       or {@code VALIDATION_READ_MESSAGES} package headers have been processed.
 *       Each {@link PackageHeaderInfo} is sent to a per-symbol {@link DataValidator};
 *       every event the validator reports with {@code Severity.ERROR} is counted.
 *       The stream is exported afterwards, even if this pass throws.</li>
 *   <li>The stream is read again with {@code cursor.next()} until exhausted and every
 *       {@link PackageHeaderInfo} is passed to {@code updateSupportedModel}. The collected
 *       entries are attached to the connector report (sorted, comma-separated) under
 *       {@code ReportGenerator.SUPPORTED_MODEL_REPORT}, even if this pass throws.</li>
 * </ol>
 *
 * <p>The test fails if the output stream does not exist or if at least one
 * validation error was counted during the first pass.
 *
 * @param connector the connector under test; supplies the output stream key and
 *                  receives the supported-model report entry
 */
void tryBuildOrderBook(final TestConnectorReports connector) {
    final int expectedNumOfMessages = VALIDATION_READ_MESSAGES;
    final int timeoutSeconds = READ_TIMEOUT_S;

    final DXTickStream stream = db.getStream(connector.stream());
    Assertions.assertNotNull(stream, "Connector " + connector + " not started as expected. " +
        "No output stream found.");

    // One validator per symbol, created lazily on the first package header of that symbol.
    final Map<String, DataValidator> validators = new HashMap<>();
    final AtomicLong errors = new AtomicLong();

    // Pass 1: feed package headers to the validators.
    // NOTE(review): SelectionOptions(false, true) presumably opens a live cursor, which would
    // explain the timeout-guarded read - confirm against the SelectionOptions constructor.
    try (TickCursor cursor = stream.select(TimeConstants.TIMESTAMP_UNKNOWN, new SelectionOptions(false, true))) {
        int packageHeaders = 0;
        while (readWithTimeout(timeoutSeconds, cursor, connector)) {
            final InstrumentMessage message = cursor.getMessage();
            if (message.getSymbol() == null) {
                // The symbol is the validator key; messages without one cannot be validated.
                continue;
            }
            if (!(message instanceof PackageHeaderInfo)) {
                continue;
            }

            final PackageHeaderInfo packageHeader = (PackageHeaderInfo) message;
            final DataValidator validator = validators.computeIfAbsent(
                message.getSymbol().toString(),
                symbol -> createValidator(symbol, (sender, severity, exception, stringMessage) -> {
                    if (severity == Severity.ERROR) {
                        // Only ERROR-level events make the test fail; the rest are just logged.
                        errors.incrementAndGet();
                        LOG.severe(severity + " | " + stringMessage);
                    } else {
                        LOG.warning(severity + " | " + stringMessage);
                    }
                    if (exception != null) {
                        LOG.log(Level.SEVERE, "Exception", exception);
                    }
                }, stream)
            );
            validator.sendPackage(packageHeader);

            packageHeaders++;
            final boolean limitReached = packageHeaders == expectedNumOfMessages;
            // Log progress every 100 headers and once more on completion, without emitting
            // the same line twice when the limit itself is a multiple of 100.
            if (limitReached || packageHeaders % 100 == 0) {
                LOG.info("Processed " + packageHeaders + " package headers");
            }
            if (limitReached) {
                break;
            }
        }
    } finally {
        exportStream(stream);
    }

    // Pass 2: scan the stream from the start to collect the supported model entries.
    final Set<String> supportedModel = new HashSet<>();
    try (TickCursor cursor = stream.select(TimeConstants.TIMESTAMP_UNKNOWN, new SelectionOptions(false, false))) {
        while (cursor.next()) {
            final InstrumentMessage message = cursor.getMessage();
            if (message instanceof PackageHeaderInfo) {
                updateSupportedModel((PackageHeaderInfo) message, supportedModel);
            }
        }
    } finally {
        // Report whatever was collected, even when the scan was interrupted by an exception.
        connector.addTestMessage(
            ReportGenerator.SUPPORTED_MODEL_REPORT,
            supportedModel.stream().sorted().collect(Collectors.joining(","))
        );
    }

    Assertions.assertEquals(0L, errors.get(),
        "Connector " + connector + " produced " + errors.get() + " validation error(s). See log for details.");
}