in src/main/java/com/epam/eco/commons/kafka/consumer/advanced/AdvancedConsumer.java [403:437]
/**
 * Processes the current record batch: hands the record iterator to the handler and
 * then asks the iterator for the offsets that should be committed.
 *
 * <p>Debug logging brackets the work. The "completed" message is emitted from a
 * {@code finally} block, so it is also logged when the handler or the iterator throws;
 * in that case it reports {@code 0} records to commit and {@code null} offsets.
 *
 * @return the offsets to commit, as built by {@code iterator.buildOffsetsToCommit()}
 * @throws Exception any exception raised while handling the batch or building the
 *         offsets; it is propagated to the caller unchanged
 */
public Map<TopicPartition, Long> call() throws Exception {
    // Initialised up front so the finally block can always log them, even on failure.
    int numberOfRecordsInBatch = 0;
    int numberOfRecordsToCommit = 0;
    Map<TopicPartition, Long> offsetsToCommit = null;
    try {
        numberOfRecordsInBatch = iterator.getRecords().count();
        // Guarded explicitly: the offset extraction arguments are computed eagerly,
        // so skip that work entirely unless debug logging is on.
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(
                    "Group [{}]: record batch started. " +
                    "Number of records = {}, smallest offsets = {}, largest offsets = {}.",
                    groupId,
                    numberOfRecordsInBatch,
                    KafkaUtils.extractSmallestOffsets(iterator.getRecords()),
                    KafkaUtils.extractLargestOffsets(iterator.getRecords()));
        }

        handler.accept(iterator);

        // Queried only after the handler has returned normally.
        numberOfRecordsToCommit = iterator.countRecordsToCommit();
        offsetsToCommit = iterator.buildOffsetsToCommit();
        return offsetsToCommit;
    } finally {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(
                    "Group [{}]: record batch completed. " +
                    "Number of records to commit = {} (of {}), offsets to commit = {}.",
                    groupId,
                    numberOfRecordsToCommit,
                    numberOfRecordsInBatch,
                    offsetsToCommit);
        }
    }
}