in java/clickhouse-connector/src/main/java/com/epam/deltix/timebase/connector/clickhouse/algos/UnboundTableWriter.java [588:626]
private void encode(RawMessage message, MemoryDataInput dataInput, Codec codec)
throws SQLException
{
dataInput.setBytes(message.data, message.offset, message.length);
UnboundDecoder messageDecoder = codec.getUnboundDecoder();
Map<String, BiConsumer<TimebaseContext, FieldContext>> fieldCodecs = codec.getFieldCodecs();
messageDecoder.beginRead(dataInput);
ClickhouseContext clickhouseContext = codec.getClickhouseContext();
Set<String> availableColumnNames = clickhouseContext.map.keySet();
int parameterIndex = 1;
long millisTime = message.getTimeStampMs(); // TODO: switch to nanos
CharSequence symbol = message.getSymbol();
// TODO: heavily allocates
PreparedStatement statement = codec.getInsertStatement();
if (availableColumnNames.contains(SchemaProcessor.PARTITION_COLUMN_NAME)) {
statement.setDate(parameterIndex++, new Date(millisTime));
}
statement.setTimestamp(parameterIndex++, new Timestamp(millisTime));
statement.setString(parameterIndex++, symbol.toString());
statement.setString(parameterIndex++, message.type.getName());
TimebaseContext timebaseContext = codec.getTimebaseContext();
timebaseContext.messageDecoder = messageDecoder;
clickhouseContext.statement = statement;
//fieldContext.columnDeclaration.getStatementIndex() = parameterIndex;
while (messageDecoder.nextField()) {
String columnName = timebaseContext.getColumnName(message.type.getName(), messageDecoder.getField().getName());
if (availableColumnNames.contains(columnName)) {
fieldCodecs.get(columnName).accept(timebaseContext,
new FieldContext(clickhouseContext, codec.clickhouseContext.getColumn(columnName)));
}
}
statement.addBatch();
}