private Schema getPrimitiveTypeSchema()

in timebase-kafka-connector/src/main/java/deltix/kafka/connect/RawMessageDeserializer.java [279:340]


    /**
     * Builds the Connect {@link Schema} that corresponds to a primitive {@link DataType}.
     *
     * <p>Mapping applied, checked in this order:
     * <ul>
     *   <li>{@code IntegerDataType}: native size 1 gives INT8, 2 gives INT16,
     *       6 or more gives INT64, anything else gives INT32</li>
     *   <li>{@code FloatDataType}: FLOAT32 when {@code isFloat()} is true, otherwise FLOAT64</li>
     *   <li>{@code CharDataType}, {@code EnumDataType}, {@code VarcharDataType}: STRING</li>
     *   <li>{@code BooleanDataType}: BOOLEAN</li>
     *   <li>{@code DateTimeDataType}: INT64, named after {@code TIMESTAMP_MS}</li>
     *   <li>{@code TimeOfDayDataType}: INT32, named after {@code TIME_MS}</li>
     *   <li>{@code BinaryDataType}: BYTES</li>
     * </ul>
     *
     * <p>When the type is nullable the schema is marked optional and given an explicit
     * {@code null} default; no non-null default is ever supplied.
     *
     * @param type the data type to translate; asserted to be primitive
     * @return the built schema
     * @throws RuntimeException if {@code type} is none of the types listed above
     */
    private Schema getPrimitiveTypeSchema(DataType type) {
        assert type.isPrimitive() : "Not primitive type";

        Schema.Type connectType;
        LogicalType logical = null;

        if (type instanceof IntegerDataType) {
            final int nativeSize = ((IntegerDataType) type).getNativeTypeSize();
            switch (nativeSize) {
                case 1:
                    connectType = Schema.Type.INT8;
                    break;
                case 2:
                    connectType = Schema.Type.INT16;
                    break;
                default:
                    // Sizes of 6 and above are widened to 64 bits; the rest fit in 32 bits.
                    connectType = nativeSize >= 6 ? Schema.Type.INT64 : Schema.Type.INT32;
                    break;
            }
        } else if (type instanceof FloatDataType) {
            connectType = ((FloatDataType) type).isFloat()
                    ? Schema.Type.FLOAT32
                    : Schema.Type.FLOAT64;
        } else if (type instanceof CharDataType
                || type instanceof EnumDataType
                || type instanceof VarcharDataType) {
            connectType = Schema.Type.STRING;
        } else if (type instanceof BooleanDataType) {
            connectType = Schema.Type.BOOLEAN;
        } else if (type instanceof DateTimeDataType) {
            connectType = Schema.Type.INT64;
            logical = TIMESTAMP_MS;
        } else if (type instanceof TimeOfDayDataType) {
            connectType = Schema.Type.INT32;
            logical = TIME_MS;
        } else if (type instanceof BinaryDataType) {
            connectType = Schema.Type.BYTES;
        } else {
            throw new RuntimeException("Unrecognized dataType: " + type);
        }

        final SchemaBuilder builder = SchemaBuilder.type(connectType);

        if (type.isNullable()) {
            // Optional must be set before the null default, otherwise validation rejects it.
            builder.optional();
            builder.defaultValue(null);
        }

        if (logical != null) {
            builder.name(logical.getLogicalName());
        }

        return builder.build();
    }