in timebase-kafka-connector/src/main/java/deltix/kafka/connect/RawMessageDeserializer.java [279:340]
private Schema getPrimitiveTypeSchema(DataType type) {
assert type.isPrimitive() : "Not primitive type";
Schema.Type schemaType;
Object defaultValue = null;
LogicalType logicalType = null;
if (type instanceof IntegerDataType) {
int size = ((IntegerDataType) type).getNativeTypeSize();
if (size >= 6) {
schemaType = Schema.Type.INT64;
//defaultValue = 0L;
} else if (size == 1) {
schemaType = Schema.Type.INT8;
//defaultValue = (byte) 0;
} else if (size == 2) {
schemaType = Schema.Type.INT16;
//defaultValue = (short) 0;
} else {
schemaType = Schema.Type.INT32;
//defaultValue = 0;
}
} else if (type instanceof FloatDataType) {
if (((FloatDataType) type).isFloat()) {
schemaType = Schema.Type.FLOAT32;
//defaultValue = 0.0F;
} else {
schemaType = Schema.Type.FLOAT64;
//defaultValue = 0.0D;
}
} else if (type instanceof CharDataType ||
type instanceof EnumDataType ||
type instanceof VarcharDataType) {
schemaType = Schema.Type.STRING;
} else if (type instanceof BooleanDataType) {
schemaType = Schema.Type.BOOLEAN;
//defaultValue = false;
} else if (type instanceof DateTimeDataType) {
schemaType = Schema.Type.INT64;
logicalType = TIMESTAMP_MS;
//defaultValue = 0L;
} else if (type instanceof TimeOfDayDataType) {
schemaType = Schema.Type.INT32;
logicalType = TIME_MS;
//defaultValue = 0;
} else if (type instanceof BinaryDataType) {
schemaType = Schema.Type.BYTES;
} else {
throw new RuntimeException("Unrecognized dataType: " + type);
}
SchemaBuilder schemaBuilder = SchemaBuilder.type(schemaType);
if (type.isNullable()) {
schemaBuilder.optional().defaultValue(defaultValue);
}
if (logicalType != null) {
schemaBuilder.name(logicalType.getLogicalName());
}
return schemaBuilder.build();
}