in timescaledb-connector/src/main/java/com/epam/deltix/timebase/connector/service/timescale/TimescaleDataService.java [63:102]
public void insertBatch(String query, List<RawMessage> messages, TimescaleSchema schema) {
List<TimescaleColumn> columns = schema.getColumns()
.stream()
.filter(column -> !column.getName().equals("Id"))
.collect(Collectors.toList());
jdbcTemplate.batchUpdate(query, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
RawMessage message = messages.get(i);
Map<String, Object> expandedValues = rawMessageService.expandValues(message);
Integer position = 1;
for (TimescaleColumn column : columns) {
String name = column.getName();
TimescaleColumn.TimescaleDataType dataType = column.getDataType();
if (name.equals("EventTime")) {
ps.setTimestamp(position, Timestamp.from(Instant.ofEpochMilli(message.getTimeStampMs())));
} else if (name.equals("Symbol")) {
ps.setString(position,message.getSymbol().toString());
}/* else if (name.equals("InstrumentType")) {
ps.setString(position, message.getInstrumentType().toString());
}*/ else {
Object value = expandedValues.get(name);
//System.out.println("Start preparing column: " + name);
prepareValue(ps, dataType, value, position);
}
position++;
}
}
@Override
public int getBatchSize() {
return messages.size();
}
});
}