in timebase-kafka-connector/src/main/java/deltix/kafka/connect/RawMessageDeserializer.java [193:239]
/**
 * Builds a Connect {@link Struct} for {@code schema} from a map of raw field values.
 *
 * <p>For every field of the schema the raw value is looked up in {@code values} and
 * converted to the representation expected by the field's schema before it is put
 * into the struct. Fields with no value are left unset when optional and rejected
 * when required.
 *
 * @param parentSchema schema of the enclosing struct, or {@code null} when building the
 *                     top-level struct; field renaming through {@code fieldMap} is applied
 *                     only when this is {@code null}
 * @param schema       schema of the struct being built
 * @param values       raw values keyed by (source) field name
 * @return the populated struct
 * @throws DataException      if a required field has no value in {@code values}
 * @throws ClassCastException if a raw value does not have the Java type that the
 *                            conversion for its schema type casts it to
 */
private Struct buildStruct(Schema parentSchema, Schema schema, Map<String, Object> values) {
    Struct struct = new Struct(schema);
    for (Field field : schema.fields()) {
        String fieldName = field.name();
        // renaming is supported for the top level fields only
        if (parentSchema == null) {
            fieldName = fieldMap.getSource(field.name());
        }
        Object fieldValue = values.get(fieldName);
        if (fieldValue != null) {
            // The struct is always keyed by the schema (target) field name, even when the
            // value was looked up under a renamed source name.
            struct.put(field.name(), convertRawValue(schema, field.schema(), fieldValue));
        } else if (!field.schema().isOptional()) {
            throw new DataException(String.format("Field '%s' is required but no value was set.", field.name()));
        }
    }
    return struct;
}

/**
 * Converts a single non-null raw value to the representation required by {@code valueSchema}.
 *
 * <p>Array elements are converted recursively with the same rules as plain field values, so
 * nested structs, character sequences and logical time values inside arrays end up in the
 * same form as they would as direct struct fields.
 *
 * @param parentSchema schema of the struct that (directly or through arrays) contains the value
 * @param valueSchema  schema the converted value must conform to
 * @param value        non-null raw value
 * @return the converted value, or {@code value} itself when no conversion applies
 */
private Object convertRawValue(Schema parentSchema, Schema valueSchema, Object value) {
    switch (valueSchema.type()) {
        case STRUCT: {
            @SuppressWarnings("unchecked") // nested objects are expected to arrive as name -> value maps
            Map<String, Object> nestedValues = (Map<String, Object>) value;
            return buildStruct(parentSchema, valueSchema, nestedValues);
        }
        case ARRAY: {
            Object[] rawElements = (Object[]) value;
            // Convert into a fresh array so the caller's raw array is never modified.
            Object[] converted = new Object[rawElements.length];
            for (int i = 0; i < rawElements.length; i++) {
                Object element = rawElements[i];
                // Null elements are passed through; whether they are allowed is decided by
                // the element schema when the list is put into the struct.
                converted[i] = element == null
                        ? null
                        : convertRawValue(parentSchema, valueSchema.valueSchema(), element);
            }
            return Arrays.asList(converted);
        }
        case STRING:
            // convert to String in case the value is Character or CharSequence
            return value.toString();
        case INT64:
            if (TIMESTAMP_MS.getLogicalName().equals(valueSchema.name())) {
                return Timestamp.toLogical(valueSchema, (Long) value);
            }
            return value;
        case INT32:
            if (TIME_MS.getLogicalName().equals(valueSchema.name())) {
                return Time.toLogical(valueSchema, (Integer) value);
            }
            return value;
        default:
            // every other type is stored as-is
            return value;
    }
}