private void writeStruct()

in timebase-kafka-connector/src/main/java/deltix/kafka/connect/RawMessageSerializer.java [170:253]


    /**
     * Encodes one Connect {@link Struct} into {@code encoder}, field by field, in the order
     * given by {@code descriptor.getFields()}.
     *
     * <p>For every descriptor field the encoder is advanced with {@code nextField()}. When
     * {@code record} is non-null, fields whose names equal the configured offset, key or
     * tombstone field names are filled from the record itself (Kafka offset, stringified key,
     * and whether {@code value} is null) instead of from {@code value}. All other fields are
     * resolved through {@code fieldMap} and written according to the Connect schema type of the
     * source field. Nested structs are written recursively with {@code record == null}, so the
     * record-derived fields are only applied at the level where a record is supplied.
     *
     * @param value      struct to encode; {@code null} means a tombstone, in which case every
     *                   field not derived from the record is written as null
     * @param record     sink record supplying offset/key metadata, or {@code null} for nested structs
     * @param descriptor descriptor whose fields drive the encoding order
     * @param encoder    encoder positioned before the first field of {@code descriptor}
     * @throws RuntimeException         if the encoder runs out of fields before the descriptor does
     * @throws IllegalArgumentException if a source field has a Connect type that is not handled
     *                                  here (including {@code MAP})
     */
    private void writeStruct(Struct value, SinkRecord record, RecordClassDescriptor descriptor, UnboundEncoder encoder) {

        for (DataField dataField : descriptor.getFields()) {
            if (! encoder.nextField()) {
                throw new RuntimeException("Encoder does not match descriptor at field " + dataField.getName());
            }

            String fieldName = dataField.getName();
            if (record != null) {
                if (tbOffsetField != null && fieldName.equals(tbOffsetField)) {
                    encoder.writeLong(record.kafkaOffset());
                    continue;
                }
                else if (tbKeyField != null && fieldName.equals(tbKeyField)) {
                    // expecting String keys
                    // NOTE(review): String.valueOf(Object) yields the literal "null" for a null key - confirm this is intended
                    encoder.writeString(String.valueOf(record.key()));
                    continue;
                }
                else if (tbTombstoneField != null && fieldName.equals(tbTombstoneField)) {
                    encoder.writeBoolean(value == null);
                    continue;
                }
            }

            if (value == null) {
                // this is tombstone record
                writeNull(encoder, dataField.getType());
            }
            else {
                String[] sourcePath = fieldMap.getSourcePath(fieldName);
                Field field = getField(value.schema(), sourcePath);
                Object fieldValue = getFieldValue(value, sourcePath);

                if (fieldValue == null) {
                    encoder.writeNull();
                } else {
                    switch (field.schema().type()) {
                        case INT8:
                            encoder.writeInt((Byte) fieldValue);
                            break;
                        case INT16:
                            encoder.writeInt((Short) fieldValue);
                            break;
                        case INT32:
                            encoder.writeInt(toInt32(fieldValue, field.schema()));
                            break;
                        case INT64:
                            encoder.writeLong(toInt64(fieldValue, field.schema()));
                            break;
                        case STRING:
                            encoder.writeString((String) fieldValue);
                            break;
                        case FLOAT32:
                            encoder.writeFloat((Float) fieldValue);
                            break;
                        case FLOAT64:
                            encoder.writeDouble((Double) fieldValue);
                            break;
                        case BOOLEAN:
                            encoder.writeBoolean((Boolean) fieldValue);
                            break;
                        case BYTES: {
                            byte[] bytes;
                            if (fieldValue instanceof ByteBuffer) {
                                // Copy only the readable region [position, limit). ByteBuffer.array() would
                                // ignore position/limit/arrayOffset and throws for read-only or direct buffers.
                                // duplicate() keeps the caller's buffer position untouched.
                                ByteBuffer readable = ((ByteBuffer) fieldValue).duplicate();
                                bytes = new byte[readable.remaining()];
                                readable.get(bytes);
                            } else {
                                bytes = (byte[]) fieldValue;
                            }
                            encoder.writeBinary(bytes, 0, bytes.length);
                            break;
                        }
                        case ARRAY:
                            DataType elementType = ((ArrayDataType) dataField.getType()).getElementDataType();
                            writeArray((List<?>) fieldValue, field.schema().valueSchema(), elementType, encoder);
                            break;
                        case STRUCT:
                            RecordClassDescriptor fieldDescriptor = ((ClassDataType) dataField.getType()).getFixedDescriptor();
                            UnboundEncoder fieldEncoder = encoder.getFieldEncoder(fieldDescriptor);
                            // nested structs carry no record metadata of their own
                            writeStruct((Struct) fieldValue, null, fieldDescriptor, fieldEncoder);
                            break;
                        case MAP:
                        default:
                            throw new IllegalArgumentException("Unsupported field type " + field.schema().type());
                    }
                }
            }
        }
    }