public void run()

in java/clickhouse-client/src/main/java/com/epam/deltix/clickhouse/writer/TableWriter.java [50:142]


        /**
         * Body of the flusher thread.
         *
         * <p>Repeatedly waits on {@code autoResetEvent} (for at most {@code flushIntervalMs}) and,
         * when a flush is due, drains the message queue into the insert statement in batches of
         * {@code flushSize}. The loop runs until the writer's {@code inClosing} flag is set; after
         * that one final drain attempt is made so that messages still queued at shutdown are
         * written instead of being silently dropped.
         *
         * <p>Any exception raised during a cycle is logged and the loop carries on; messages that
         * were already polled from the queue for the failed batch are not re-queued.
         */
        public void run() {
            while (!TableWriter.this.inClosing.get()) {
                try {
                    autoResetEvent.waitOne(TableWriter.this.flushIntervalMs);

                    int queueSize = TableWriter.this.messages.size();
                    if (!isFlushDue(queueSize))
                        continue; // nothing to flush, wait next cycle

                    flushQueuedMessages(queueSize);
                } catch (Exception e) {
                    logFlusherFailure(e);
                }
            }

            // The closing flag can be raised after the "is flush due" check of the last cycle but
            // before the loop condition is re-evaluated (or the last flush may have failed), which
            // would leave messages in the queue. Make one bounded attempt to write them.
            try {
                int remaining = TableWriter.this.messages.size();
                if (remaining > 0)
                    flushQueuedMessages(remaining);
            } catch (Exception e) {
                logFlusherFailure(e);
            }

            TableWriter.LOG
                    .info()
                    .append("Flusher thread finished.")
                    .commit();
        }

        /**
         * Decides whether the queue has to be flushed in the current cycle.
         *
         * @param queueSize number of messages observed in the queue at the start of the cycle
         * @return {@code true} when a full package is available, or when the queue is not empty
         *         and either the flush interval has elapsed since the last flush or the writer
         *         is closing
         */
        private boolean isFlushDue(int queueSize) {
            if (queueSize >= TableWriter.this.flushSize)
                return true; // package size is reached

            if (queueSize == 0)
                return false; // neither the timer nor closing matters for an empty queue

            return System.currentTimeMillis() - lastExecuteTimestamp >= TableWriter.this.flushIntervalMs // time interval is reached
                    || TableWriter.this.inClosing.get(); // we are closing
        }

        /**
         * Drains the queue into the insert statement, executing a batch every {@code flushSize}
         * messages and once more for the trailing partial package.
         *
         * <p>Declared with {@code throws Exception} because the checked exceptions of the
         * encoder are not visible here; the caller logs whatever is thrown.
         *
         * @param queueSize queue size observed by the caller, used for logging only
         * @throws Exception if obtaining the connection, encoding a message or executing a batch fails
         */
        private void flushQueuedMessages(int queueSize) throws Exception {
            TableWriter.LOG.debug()
                    .append("Start flushing procedure. ")
                    .append(queueSize)
                    .append(" messages in queue.")
                    .commit();

            try (Connection connection = clickhouseClient.getConnection();
                 PreparedStatement statement = connection.prepareStatement(insertIntoQuery)) {

                int batchMsgCount = 0;

                while (true) {
                    T message = messages.poll();

                    if (message == null) { // queue empty
                        if (batchMsgCount > 0) { // write last chunk
                            logPackageFlush(batchMsgCount);
                            statement.executeBatch();
                        }
                        break;
                    }

                    encoder.encode(message, statement);
                    statement.addBatch();
                    batchMsgCount++;

                    if (batchMsgCount == TableWriter.this.flushSize) {
                        logPackageFlush(batchMsgCount);
                        statement.executeBatch();

                        // continue to write only if we are closing or have a complete package to write
                        // otherwise wait for new data to come
                        if (!TableWriter.this.inClosing.get() &&
                                TableWriter.this.messages.size() < TableWriter.this.flushSize)
                            break;

                        batchMsgCount = 0;
                    }
                }

                TableWriter.LOG.debug()
                        .append("Finish flushing procedure.")
                        .commit();

                // Only a successfully completed flush restarts the interval timer.
                lastExecuteTimestamp = System.currentTimeMillis();
            }
        }

        /**
         * Logs that a package of the given size is about to be executed.
         *
         * @param batchMsgCount number of messages added to the current batch
         */
        private void logPackageFlush(int batchMsgCount) {
            TableWriter.LOG.debug()
                    .append("Flushing package of ")
                    .append(batchMsgCount)
                    .append(" messages.")
                    .commit();
        }

        /**
         * Logs an exception raised inside the flusher thread.
         *
         * @param e the exception caught by the flusher loop
         */
        private void logFlusherFailure(Exception e) {
            TableWriter.LOG
                    .error()
                    .append("Exception in flusher thread.")
                    .append(System.lineSeparator())
                    .append(e)
                    .commit();
        }