public void concurrentTest()

in java/clickhouse-connector/src/integration-test/java/com/epam/deltix/timebase/connector/clickhouse/ConcurrentStreamReplicatorTests.java [423:501]


    public void concurrentTest() throws InterruptedException {
        try {
            int expectedMessageCount = MESSAGE_COUNT;

            List<DXTickStream> tickStreams = new ArrayList<DXTickStream>();

            tickStreams.add(loadData(BestBidOfferTestMessage.class, MESSAGE_COUNT, value -> generateBBO(value)));
            tickStreams.add(loadData(TradeTestMessage.class, MESSAGE_COUNT, value -> generateTrades(value)));
            tickStreams.add(loadData(BestBidOfferTestMessage.class, MESSAGE_COUNT, value -> generateBBO(value)));
            tickStreams.add(loadData(TradeTestMessage.class, MESSAGE_COUNT, value -> generateTrades(value)));

            LOG.info().append("Data streams created.").commit();
            final ReplicationProperties replicationProperties = new ReplicationProperties();
            final List<StreamRequest> streams = tickStreams.stream().map(t -> {
                StreamRequest streamRequest = new StreamRequest();
                streamRequest.setStream(t.getKey());
                streamRequest.setKey(t.getKey());
                return streamRequest;
            }).collect(Collectors.toList());

            replicationProperties.setStreams(streams);
            replicationProperties.setFlushMessageCount(10_000);
            replicationProperties.setFlushTimeoutMs(20_000);

            final ReplicatorService service = new ReplicatorService(clickhouseClient, tickDB, clickhouseProperties, replicationProperties);
            service.startReplicators();

            LOG.info().append("Starting replication.").commit();

            List<TableDeclaration> tableDeclarations = tickStreams.stream()
                    .map(stream -> Util.getTableDeclaration(stream, ColumnNamingScheme.NAME_AND_DATATYPE))
                    .collect(Collectors.toList());

            final List<TableDeclaration> tableDeclarations2 = new ArrayList<>(tableDeclarations);
            Map<String, Boolean> existsTables = new HashMap<>(tableDeclarations.size());
            while (tableDeclarations.size() > 0) {
                for (int i = tableDeclarations.size(); i-- > 0; ) {
                    final TableDeclaration tableDeclaration = tableDeclarations.get(i);
                    final TableIdentity tableIdentity = tableDeclaration.getTableIdentity();
                    Boolean isTableExists = existsTables.get(tableIdentity.getTableName());
                    if (isTableExists == null || !isTableExists) {
                        existsTables.put(tableIdentity.getTableName(), clickhouseClient.existsTable(tableIdentity));
                    } else if (expectedMessageCount == getCount(tableIdentity)){
                        tableDeclarations.remove(i);
                    }
                }
                Thread.sleep(5000);
            }
            service.destroy();
            LOG.info().append("Replication finished.").commit();

            int executed = 0;
            final int size = tableDeclarations2.size();
            final int poolSize = 4;
            ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
            for (int i = 0; i < size; i++) {
                final TableDeclaration tableDeclaration = tableDeclarations2.get(i);
                final DXTickStream tickStream = tickStreams.get(i);
                if (tableDeclaration.getTableIdentity().getTableName().contains("BestBidOfferTestMessage")) {
                    final String sequenceNumber = getClickhouseColumn(tickStream, BestBidOfferTestMessage.class, BestBidOfferTestMessage::getSequenceNumber).getDbColumnName();
                    executor.execute(() -> checkAllValues(tableDeclaration, sequenceNumber, tickStream,
                            pair -> verifyBboValues(pair.getLeft(), pair.getMiddle(), pair.getRight())));
                    executed++;
                } else if (tableDeclaration.getTableIdentity().getTableName().contains("TradeTestMessage")) {
                    final String sequenceNumber = getClickhouseColumn(tickStream, TradeTestMessage.class, TradeTestMessage::getSequenceNumber).getDbColumnName();
                    executor.execute(() -> checkAllValues(tableDeclaration, sequenceNumber, tickStream,
                            pair -> verifyTradeValues(pair.getLeft(), pair.getMiddle(), pair.getRight())));
                    executed++;
                }
            }
            while( executor.getCompletedTaskCount() != executed) {
                Thread.sleep(5000);
            }
            LOG.info().append("Concurrent test finished.").commit();
        } catch (InterruptedException e) {
            e.printStackTrace();
            LOG.error().append("Concurrent test failed.").append(e).commit();
        }
    }