in java/clickhouse-connector/src/integration-test/java/com/epam/deltix/timebase/connector/clickhouse/ConcurrentStreamReplicatorTests.java [423:501]
/**
 * Replicates four TimeBase streams (two {@code BestBidOfferTestMessage} and two
 * {@code TradeTestMessage} streams, {@code MESSAGE_COUNT} messages each) to ClickHouse
 * through a single {@link ReplicatorService}, waits until every target table exists and
 * holds the expected number of rows, and then verifies the replicated values of all
 * tables in parallel on a pool of four threads.
 *
 * <p>Any failure raised by a verification task is rethrown from this method so that it
 * fails the test instead of being lost on a worker thread.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for
 *                              replication or for the verification tasks to complete
 */
public void concurrentTest() throws InterruptedException {
    final int expectedMessageCount = MESSAGE_COUNT;
    try {
        final List<DXTickStream> tickStreams = new ArrayList<>();
        tickStreams.add(loadData(BestBidOfferTestMessage.class, MESSAGE_COUNT, value -> generateBBO(value)));
        tickStreams.add(loadData(TradeTestMessage.class, MESSAGE_COUNT, value -> generateTrades(value)));
        tickStreams.add(loadData(BestBidOfferTestMessage.class, MESSAGE_COUNT, value -> generateBBO(value)));
        tickStreams.add(loadData(TradeTestMessage.class, MESSAGE_COUNT, value -> generateTrades(value)));
        LOG.info().append("Data streams created.").commit();

        // One replication request per stream; the stream key is used for both fields.
        final List<StreamRequest> streams = tickStreams.stream().map(t -> {
            StreamRequest streamRequest = new StreamRequest();
            streamRequest.setStream(t.getKey());
            streamRequest.setKey(t.getKey());
            return streamRequest;
        }).collect(Collectors.toList());

        final ReplicationProperties replicationProperties = new ReplicationProperties();
        replicationProperties.setStreams(streams);
        replicationProperties.setFlushMessageCount(10_000);
        replicationProperties.setFlushTimeoutMs(20_000);

        // Index i of this list corresponds to index i of tickStreams.
        final List<TableDeclaration> tableDeclarations = tickStreams.stream()
                .map(stream -> Util.getTableDeclaration(stream, ColumnNamingScheme.NAME_AND_DATATYPE))
                .collect(Collectors.toList());

        final ReplicatorService service = new ReplicatorService(clickhouseClient, tickDB, clickhouseProperties, replicationProperties);
        service.startReplicators();
        LOG.info().append("Starting replication.").commit();
        try {
            // Poll on a private mutable copy: the list returned by Collectors.toList()
            // is not guaranteed to support removal.
            final List<TableDeclaration> pending = new ArrayList<>(tableDeclarations);
            final Map<String, Boolean> knownTables = new HashMap<>(pending.size());
            while (!pending.isEmpty()) {
                // Iterate backwards so that removing by index does not skip elements.
                for (int i = pending.size(); i-- > 0; ) {
                    final TableIdentity tableIdentity = pending.get(i).getTableIdentity();
                    final String tableName = tableIdentity.getTableName();
                    if (!Boolean.TRUE.equals(knownTables.get(tableName))) {
                        // Row count is only queried once the table is known to exist.
                        knownTables.put(tableName, clickhouseClient.existsTable(tableIdentity));
                    } else if (expectedMessageCount == getCount(tableIdentity)) {
                        pending.remove(i);
                    }
                }
                if (!pending.isEmpty()) {
                    Thread.sleep(5000);
                }
            }
        } finally {
            // Stop the replicators even if waiting failed or was interrupted.
            service.destroy();
        }
        LOG.info().append("Replication finished.").commit();

        final int poolSize = 4;
        final ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        try {
            final List<java.util.concurrent.Future<?>> verifications = new ArrayList<>();
            for (int i = 0; i < tableDeclarations.size(); i++) {
                final TableDeclaration tableDeclaration = tableDeclarations.get(i);
                final DXTickStream tickStream = tickStreams.get(i);
                final String tableName = tableDeclaration.getTableIdentity().getTableName();
                final Runnable verification;
                if (tableName.contains("BestBidOfferTestMessage")) {
                    final String sequenceNumber = getClickhouseColumn(tickStream, BestBidOfferTestMessage.class, BestBidOfferTestMessage::getSequenceNumber).getDbColumnName();
                    verification = () -> checkAllValues(tableDeclaration, sequenceNumber, tickStream,
                            pair -> verifyBboValues(pair.getLeft(), pair.getMiddle(), pair.getRight()));
                } else if (tableName.contains("TradeTestMessage")) {
                    final String sequenceNumber = getClickhouseColumn(tickStream, TradeTestMessage.class, TradeTestMessage::getSequenceNumber).getDbColumnName();
                    verification = () -> checkAllValues(tableDeclaration, sequenceNumber, tickStream,
                            pair -> verifyTradeValues(pair.getLeft(), pair.getMiddle(), pair.getRight()));
                } else {
                    // Tables of any other message type are not verified.
                    continue;
                }
                verifications.add(executor.submit(verification));
            }
            executor.shutdown();

            // Future.get() surfaces failures that execute() would have dropped on the worker thread.
            for (java.util.concurrent.Future<?> verification : verifications) {
                try {
                    verification.get();
                } catch (java.util.concurrent.ExecutionException e) {
                    final Throwable cause = e.getCause();
                    if (cause instanceof Error) {
                        throw (Error) cause;
                    }
                    if (cause instanceof RuntimeException) {
                        throw (RuntimeException) cause;
                    }
                    throw new IllegalStateException("Verification task failed.", cause);
                }
            }
        } finally {
            // No-op after a clean run; cancels outstanding tasks and releases the
            // worker threads if a verification failed or the wait was interrupted.
            executor.shutdownNow();
        }
        LOG.info().append("Concurrent test finished.").commit();
    } catch (InterruptedException e) {
        LOG.error().append("Concurrent test failed.").append(e).commit();
        // Propagate so that an interrupted run is reported as a failure, not a pass.
        throw e;
    }
}