in timebase-kafka-connector/src/main/java/deltix/kafka/connect/RawMessageSerializer.java [482:525]
private DataType getDataType(String fieldName, Schema fieldSchema) {
boolean nullable = fieldSchema.isOptional();
switch (fieldSchema.type()) {
case INT8:
return new IntegerDataType(ENCODING_INT8, nullable);
case INT16:
return new IntegerDataType(ENCODING_INT16, nullable);
case INT32:
LogicalType logicalIntType = LogicalType.getTypeByName(fieldSchema.name());
if (logicalIntType == TIME_MS) {
return new TimeOfDayDataType(nullable);
}
return new IntegerDataType(ENCODING_INT32, nullable);
case INT64:
LogicalType logicalLongType = LogicalType.getTypeByName(fieldSchema.name());
if (logicalLongType == TIMESTAMP_MS || (timestampFields != null && timestampFields.contains(fieldName))) {
return new DateTimeDataType(nullable);
}
return new IntegerDataType(ENCODING_INT64, nullable);
case STRING:
return new VarcharDataType(ENCODING_INLINE_VARSIZE, nullable, false);
case FLOAT32:
return new FloatDataType(ENCODING_FIXED_FLOAT, nullable);
case FLOAT64:
return new FloatDataType(ENCODING_FIXED_DOUBLE, nullable);
case BOOLEAN:
return new BooleanDataType(nullable);
case BYTES:
return new BinaryDataType(nullable, MIN_COMPRESSION);
case ARRAY:
DataType elementType = getDataType(fieldName, fieldSchema.valueSchema());
if (elementType == null || elementType instanceof VarcharDataType) {
LOG.warn("Ignoring Array of Strings field: " + fieldName);
return null; //TODO timebase allows arrays of alphanumeric varchar (length < 10)
}
return new ArrayDataType(nullable, elementType);
case STRUCT:
return new ClassDataType(nullable, buildFieldDescriptor(fieldSchema));
case MAP:
default:
throw new IllegalArgumentException("Unsupported field type " + fieldSchema.type());
}
}