in src/main/java/com/epam/eco/commons/kafka/serde/jackson/ConsumerRecordJsonSerializer.java [91:130]
public void serialize(
ConsumerRecord value,
JsonGenerator gen,
SerializerProvider serializers) throws IOException {
gen.writeStartObject();
gen.writeStringField(ConsumerRecordFields.TOPIC, value.topic());
gen.writeNumberField(ConsumerRecordFields.PARTITION, value.partition());
gen.writeNumberField(ConsumerRecordFields.OFFSET, value.offset());
gen.writeNumberField(ConsumerRecordFields.TIMESTAMP, value.timestamp());
gen.writeObjectField(ConsumerRecordFields.TIMESTAMP_TYPE, value.timestampType());
gen.writeNumberField(ConsumerRecordFields.SERIALIZED_KEY_SIZE, value.serializedKeySize());
gen.writeNumberField(ConsumerRecordFields.SERIALIZED_VALUE_SIZE, value.serializedValueSize());
JavaType effectiveKeyType = keyType;
if (keyType.isJavaLangObject()) {
if (value.key() != null) {
effectiveKeyType = SimpleType.constructUnsafe(value.key().getClass());
}
}
gen.writeObjectField(ConsumerRecordFields.KEY_CLASS, effectiveKeyType);
BeanProperty keyProp = typeHolderBeanProperty(effectiveKeyType);
JsonSerializer<Object> keySerializer = serializers.findValueSerializer(effectiveKeyType, keyProp);
gen.writeFieldName(ConsumerRecordFields.KEY);
keySerializer.serialize(value.key(), gen, serializers);
JavaType effectiveValueType = valueType;
if (valueType.isJavaLangObject()) {
if (value.value() != null) {
effectiveValueType = SimpleType.constructUnsafe(value.value().getClass());
}
}
gen.writeObjectField(ConsumerRecordFields.VALUE_CLASS, effectiveValueType);
BeanProperty valueProp = typeHolderBeanProperty(effectiveValueType);
JsonSerializer<Object> valueSerializer = serializers.findValueSerializer(effectiveValueType, valueProp);
gen.writeFieldName(ConsumerRecordFields.VALUE);
valueSerializer.serialize(value.value(), gen, serializers);
gen.writeObjectField(ConsumerRecordFields.HEADERS, value.headers());
gen.writeEndObject();
}