in timebase-kafka-connector/src/main/java/deltix/kafka/connect/RawMessageSerializer.java [170:253]
/**
 * Writes one message body to {@code encoder}, emitting exactly one value per field of
 * {@code descriptor}, in descriptor order.
 *
 * <p>When {@code record} is non-null, the fields whose names equal the configured
 * {@code tbOffsetField}, {@code tbKeyField} and {@code tbTombstoneField} are filled from the
 * record itself (its Kafka offset, its key rendered with {@link String#valueOf(Object)}, and
 * whether {@code value} is {@code null}) instead of from {@code value}.
 *
 * <p>Every other field is resolved through {@code fieldMap} to a source path inside
 * {@code value} and written according to the type of the source field's schema. Nested
 * structs are written recursively with a {@code null} record, so the record-derived fields
 * above are not applied inside them.
 *
 * @param value      the source struct; {@code null} is treated as a tombstone, in which case
 *                   every field not derived from the record is written via {@code writeNull}
 * @param record     the sink record supplying offset and key, or {@code null} when writing a
 *                   nested struct
 * @param descriptor the descriptor whose fields define what is written and in which order
 * @param encoder    the encoder receiving the field values
 * @throws RuntimeException         if the encoder runs out of fields before the descriptor does
 * @throws IllegalArgumentException if a source field has a schema type that is not supported
 *                                  (e.g. {@code MAP})
 */
private void writeStruct(Struct value, SinkRecord record, RecordClassDescriptor descriptor, UnboundEncoder encoder) {
    for (DataField dataField : descriptor.getFields()) {
        if (! encoder.nextField()) {
            throw new RuntimeException("Encoder does not match descriptor at field " + dataField.getName());
        }
        String fieldName = dataField.getName();

        // Fields populated from the record itself rather than from the value payload.
        if (record != null) {
            if (tbOffsetField != null && fieldName.equals(tbOffsetField)) {
                encoder.writeLong(record.kafkaOffset());
                continue;
            }
            else if (tbKeyField != null && fieldName.equals(tbKeyField)) {
                // expecting String keys
                encoder.writeString(String.valueOf(record.key()));
                continue;
            }
            else if (tbTombstoneField != null && fieldName.equals(tbTombstoneField)) {
                encoder.writeBoolean(value == null);
                continue;
            }
        }

        if (value == null) {
            // this is tombstone record
            writeNull(encoder, dataField.getType());
        }
        else {
            String[] sourcePath = fieldMap.getSourcePath(fieldName);
            Field field = getField(value.schema(), sourcePath);
            Object fieldValue = getFieldValue(value, sourcePath);
            if (fieldValue == null) {
                encoder.writeNull();
            } else {
                switch (field.schema().type()) {
                    case INT8:
                        encoder.writeInt((Byte) fieldValue);
                        break;
                    case INT16:
                        encoder.writeInt((Short) fieldValue);
                        break;
                    case INT32:
                        encoder.writeInt(toInt32(fieldValue, field.schema()));
                        break;
                    case INT64:
                        encoder.writeLong(toInt64(fieldValue, field.schema()));
                        break;
                    case STRING:
                        encoder.writeString((String) fieldValue);
                        break;
                    case FLOAT32:
                        encoder.writeFloat((Float) fieldValue);
                        break;
                    case FLOAT64:
                        encoder.writeDouble((Double) fieldValue);
                        break;
                    case BOOLEAN:
                        encoder.writeBoolean((Boolean) fieldValue);
                        break;
                    case BYTES: {
                        byte[] bytes;
                        if (fieldValue instanceof ByteBuffer) {
                            // ByteBuffer.array() would expose the whole backing array (ignoring
                            // position/limit/arrayOffset) and fails for direct or read-only
                            // buffers. Copy only the remaining bytes, working on a duplicate so
                            // the caller's buffer position is left untouched.
                            ByteBuffer buffer = ((ByteBuffer) fieldValue).duplicate();
                            bytes = new byte[buffer.remaining()];
                            buffer.get(bytes);
                        } else {
                            bytes = (byte[]) fieldValue;
                        }
                        encoder.writeBinary(bytes, 0, bytes.length);
                        break;
                    }
                    case ARRAY:
                        DataType elementType = ((ArrayDataType) dataField.getType()).getElementDataType();
                        writeArray((List<?>) fieldValue, field.schema().valueSchema(), elementType, encoder);
                        break;
                    case STRUCT:
                        RecordClassDescriptor fieldDescriptor = ((ClassDataType) dataField.getType()).getFixedDescriptor();
                        UnboundEncoder fieldEncoder = encoder.getFieldEncoder(fieldDescriptor);
                        // Nested structs carry no record metadata, hence the null record.
                        writeStruct((Struct) fieldValue, null, fieldDescriptor, fieldEncoder);
                        break;
                    case MAP:
                    default:
                        throw new IllegalArgumentException("Unsupported field type " + field.schema().type());
                }
            }
        }
    }
}