public Map call()

in src/main/java/com/epam/eco/commons/kafka/consumer/advanced/AdvancedConsumer.java [403:437]


        public Map<TopicPartition, Long> call() throws Exception {
            int numberOfRecordsInBatch = 0;
            int numberOfRecordsToCommit = 0;
            Map<TopicPartition, Long> offsetsToCommit = null;
            try {
                numberOfRecordsInBatch = iterator.getRecords().count();

                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(
                            "Group [{}]: record batch started. " +
                            "Number of records = {}, smallest offsets = {}, largest offsest = {}.",
                            groupId,
                            numberOfRecordsInBatch,
                            KafkaUtils.extractSmallestOffsets(iterator.getRecords()),
                            KafkaUtils.extractLargestOffsets(iterator.getRecords()));
                }

                handler.accept(iterator);

                numberOfRecordsToCommit = iterator.countRecordsToCommit();
                offsetsToCommit = iterator.buildOffsetsToCommit();

                return offsetsToCommit;
            } finally {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(
                            "Group [{}]: record batch completed. " +
                            "Number of records to commit = {} (of {}), offsets to commit = {}.",
                            groupId,
                            numberOfRecordsToCommit,
                            numberOfRecordsInBatch,
                            offsetsToCommit);
                }
            }
        }