in timebase-kafka-connector/src/main/java/deltix/kafka/connect/RawMessageSerializer.java [301:351]
private void writeArray(List<?> arrayVal, Schema elementSchema, DataType elementType, WritableValue encoder) {
encoder.setArrayLength(arrayVal.size());
for (Object element : arrayVal) {
WritableValue v = encoder.nextWritableElement();
if (element == null) {
v.writeNull();
} else {
switch (elementSchema.type()) {
case INT8:
v.writeInt((Byte) element);
break;
case INT16:
v.writeInt((Short) element);
break;
case INT32:
v.writeInt(toInt32(element, elementSchema));
break;
case INT64:
v.writeLong(toInt64(element, elementSchema));
break;
case FLOAT32:
v.writeFloat((Float) element);
break;
case FLOAT64:
v.writeDouble((Double) element);
break;
case BOOLEAN:
v.writeBoolean((Boolean) element);
break;
case STRING:
v.writeString((String) element);
break;
case BYTES:
byte[] bytes = (element instanceof ByteBuffer) ? ((ByteBuffer) element).array() : (byte[]) element;
v.writeBinary(bytes, 0, bytes.length);
break;
case ARRAY:
DataType nestedElementType = ((ArrayDataType) elementType).getElementDataType();
writeArray((List<?>) element, elementSchema.valueSchema(), nestedElementType, v);
break;
case STRUCT:
RecordClassDescriptor nestedDescriptor = ((ClassDataType) elementType).getFixedDescriptor();
writeStruct((Struct) element, null, nestedDescriptor, v.getFieldEncoder(nestedDescriptor));
break;
case MAP:
default:
throw new IllegalArgumentException("Unsupported array element type: " + elementType);
}
}
}
}