in java/clickhouse-client/src/main/java/com/epam/deltix/clickhouse/writer/TableWriter.java [50:142]
/**
 * Body of the flusher thread.
 *
 * <p>Loops until {@code inClosing} is set. Each cycle calls
 * {@code autoResetEvent.waitOne(flushIntervalMs)} (presumably a timed wait that
 * can be signalled early -- confirm against the event implementation), then
 * decides whether the queued messages should be written out.
 *
 * <p>A flush is started when any of the following holds:
 * <ul>
 *   <li>the queue holds at least {@code flushSize} messages;</li>
 *   <li>the queue is non-empty and at least {@code flushIntervalMs} have passed
 *       since {@code lastExecuteTimestamp};</li>
 *   <li>the queue is non-empty and {@code inClosing} is set.</li>
 * </ul>
 *
 * <p>Any exception raised during a cycle is logged and the loop carries on.
 * Nothing in this method puts messages back once they have been polled, so
 * messages polled before a failure are not retried here.
 */
public void run() {
    while (!TableWriter.this.inClosing.get()) {
        try {
            autoResetEvent.waitOne(TableWriter.this.flushIntervalMs);

            final int pending = TableWriter.this.messages.size();
            final boolean flushDue =
                    pending >= TableWriter.this.flushSize // package size is reached
                    || (pending != 0 // time interval is reached and queue is not empty
                        && System.currentTimeMillis() - lastExecuteTimestamp >= TableWriter.this.flushIntervalMs)
                    || (pending != 0 && TableWriter.this.inClosing.get()); // closing and queue is not empty
            if (!flushDue) {
                // Nothing to flush yet, wait for the next cycle.
                continue;
            }

            TableWriter.LOG.debug().append("Start flushing procedure. ").append(pending).append(" messages in queue.").commit();

            // Resources are closed in reverse order: statement first, then connection.
            try (Connection connection = clickhouseClient.getConnection();
                 PreparedStatement statement = connection.prepareStatement(insertIntoQuery)) {

                int batched = 0;
                T message;
                while ((message = messages.poll()) != null) {
                    encoder.encode(message, statement);
                    statement.addBatch();
                    batched++;

                    if (batched != TableWriter.this.flushSize) {
                        continue;
                    }

                    // A complete package has been accumulated: send it.
                    TableWriter.LOG.debug().append("Flushing package of ").append(batched).append(" messages.").commit();
                    statement.executeBatch();
                    batched = 0;

                    // Keep writing only if we are closing or another complete package
                    // is already queued; otherwise stop and wait for new data.
                    if (!TableWriter.this.inClosing.get()
                            && TableWriter.this.messages.size() < TableWriter.this.flushSize) {
                        break;
                    }
                }

                // Non-zero only when the queue ran dry in the middle of a package.
                if (batched > 0) {
                    TableWriter.LOG.debug().append("Flushing package of ").append(batched).append(" messages.").commit();
                    statement.executeBatch();
                }

                TableWriter.LOG.debug().append("Finish flushing procedure.").commit();
                lastExecuteTimestamp = System.currentTimeMillis();
            }
        } catch (Exception e) {
            // Log and keep the flusher alive; the next cycle will try again.
            TableWriter.LOG.error()
                    .append("Exception in flusher thread.")
                    .append(System.lineSeparator())
                    .append(e)
                    .commit();
        }
    }

    TableWriter.LOG.info().append("Flusher thread finished.").commit();
}