in src/main/java/com/epam/eco/commons/kafka/serde/jackson/ConsumerRecordJsonDeserializer.java [102:220]
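// Reconstructs a ConsumerRecord from its JSON object form: scalar metadata fields,
// optional keyClass/valueClass type hints, the key/value payloads, and headers.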
public ConsumerRecord<Object, Object> deserialize(JsonParser jsonParser, DeserializationContext ctxt) throws IOException {
    if (jsonParser.getCurrentToken() == JsonToken.START_OBJECT) {
        jsonParser.nextToken();
    }
    String fieldName = jsonParser.getCurrentName();
    String topic = null;
    Integer partition = null;
    Long offset = null;
    Long timestamp = ConsumerRecord.NO_TIMESTAMP;
    TimestampType timestampType = TimestampType.NO_TIMESTAMP_TYPE;
    Integer serializedKeySize = ConsumerRecord.NULL_SIZE;
    Integer serializedValueSize = ConsumerRecord.NULL_SIZE;
    JavaType keyClass = JAVA_OBJECT_TYPE;
    TreeNode keyNode = null;
    Object key = null;
    JavaType valueClass = JAVA_OBJECT_TYPE;
    TreeNode valueNode = null;
    Object value = null;
    Headers headers = new RecordHeaders();
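    // Single order-independent pass over the object's fields. Key and value are
    // buffered as trees because the keyClass/valueClass hints may appear later.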
    while (fieldName != null) {
        switch (fieldName) {
            case ConsumerRecordFields.TOPIC:
                jsonParser.nextToken();
                topic = _parseString(jsonParser, ctxt);
                break;
            case ConsumerRecordFields.PARTITION:
                jsonParser.nextToken();
                partition = _parseIntPrimitive(jsonParser, ctxt);
                break;
            case ConsumerRecordFields.OFFSET:
                jsonParser.nextToken();
                offset = _parseLongPrimitive(jsonParser, ctxt);
                break;
            case ConsumerRecordFields.TIMESTAMP:
                jsonParser.nextToken();
                timestamp = _parseLongPrimitive(jsonParser, ctxt);
                break;
            case ConsumerRecordFields.TIMESTAMP_TYPE:
                jsonParser.nextToken();
                timestampType = jsonParser.readValueAs(TimestampType.class);
                break;
            case ConsumerRecordFields.SERIALIZED_KEY_SIZE:
                jsonParser.nextToken();
                serializedKeySize = _parseIntPrimitive(jsonParser, ctxt);
                break;
            case ConsumerRecordFields.SERIALIZED_VALUE_SIZE:
                jsonParser.nextToken();
                serializedValueSize = _parseIntPrimitive(jsonParser, ctxt);
                break;
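            // keyClass/valueClass are optional type hints stored next to the
            // payloads; a JSON null leaves the Object default in place.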
            case ConsumerRecordFields.KEY_CLASS:
                jsonParser.nextToken();
                if (jsonParser.getCurrentToken() != JsonToken.VALUE_NULL) {
                    keyClass = jsonParser.readValueAs(JavaType.class);
                }
                break;
            case ConsumerRecordFields.KEY:
                jsonParser.nextToken();
                keyNode = jsonParser.getCodec().readTree(jsonParser);
                break;
            case ConsumerRecordFields.VALUE_CLASS:
                jsonParser.nextToken();
                if (jsonParser.getCurrentToken() != JsonToken.VALUE_NULL) {
                    valueClass = jsonParser.readValueAs(JavaType.class);
                }
                break;
            case ConsumerRecordFields.VALUE:
                jsonParser.nextToken();
                valueNode = jsonParser.getCodec().readTree(jsonParser);
                break;
            case ConsumerRecordFields.HEADERS:
                jsonParser.nextToken();
                headers = jsonParser.readValueAs(Headers.class);
                break;
            default:
                handleUnknownProperty(jsonParser, ctxt, _valueClass, fieldName);
                break;
        }
        fieldName = jsonParser.nextFieldName();
    }
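    // topic, partition and offset are mandatory; every other field falls back to a default.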
    com.epam.eco.commons.json.JsonDeserializerUtils.assertNotNullValue(
            topic, ConsumerRecordFields.TOPIC, _valueClass, ctxt);
    com.epam.eco.commons.json.JsonDeserializerUtils.assertRequiredField(
            partition, ConsumerRecordFields.PARTITION, _valueClass, ctxt);
    com.epam.eco.commons.json.JsonDeserializerUtils.assertRequiredField(
            offset, ConsumerRecordFields.OFFSET, _valueClass, ctxt);
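    // Materialize the buffered key/value trees: the contextual keyType/valueType
    // take precedence unless they are plain Object, in which case the embedded
    // keyClass/valueClass hints decide the target type.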
    ObjectCodec codec = jsonParser.getCodec();
    if (keyNode != null) {
        JavaType targetType = keyType;
        if (keyType.isJavaLangObject()) {
            targetType = keyClass;
        }
        key = codec.readValue(keyNode.traverse(codec), targetType);
    }
    if (valueNode != null) {
        JavaType targetType = valueType;
        if (valueType.isJavaLangObject()) {
            targetType = valueClass;
        }
        value = codec.readValue(valueNode.traverse(codec), targetType);
    }
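    // The JSON form carries no leader epoch, so it is always absent on the rebuilt record.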
    return new ConsumerRecord<>(
            topic,
            partition,
            offset,
            timestamp,
            timestampType,
            serializedKeySize,
            serializedValueSize,
            key,
            value,
            headers,
            Optional.empty());
}
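
For context (not part of the file above), a minimal wiring sketch: registering the deserializer on a plain ObjectMapper through Jackson's SimpleModule. The no-arg constructor is an assumption, as is the absence of a ready-made module in the library; the keyType/valueType fields used above suggest the class can also be contextualized per target property.

import java.io.IOException;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

public final class ConsumerRecordJsonSupport {

    private ConsumerRecordJsonSupport() {
    }

    // Assumption: ConsumerRecordJsonDeserializer has a no-arg constructor that
    // defaults keyType/valueType to Object, so the embedded keyClass/valueClass
    // hints decide the payload types at read time.
    public static ObjectMapper newMapper() {
        SimpleModule module = new SimpleModule()
                .addDeserializer(ConsumerRecord.class, new ConsumerRecordJsonDeserializer());
        return new ObjectMapper().registerModule(module);
    }

    public static ConsumerRecord<Object, Object> read(ObjectMapper mapper, String json) throws IOException {
        return mapper.readValue(json, new TypeReference<ConsumerRecord<Object, Object>>() { });
    }
}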