private void commitOffsetsFromHandlerTask()

in src/main/java/com/epam/eco/commons/kafka/consumer/advanced/AdvancedConsumer.java [297:318]


    private void commitOffsetsFromHandlerTask(boolean waitForComplete) {
        Map<TopicPartition, Long> offsets = getOffsetsFromHandlerTask(waitForComplete);
        if (offsets.isEmpty()) {
            return;
        }

        Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = offsets.entrySet().stream().
                collect(
                        Collectors.toMap(
                                entry -> entry.getKey(),
                                entry -> new OffsetAndMetadata(entry.getValue())));

        LOGGER.debug("Group [{}]: commiting offsets = {}", groupId, offsetsAndMetadata);

        try {
            consumer.commitSync(offsetsAndMetadata);

            LOGGER.debug("Group [{}]: offsets committed successfully", groupId);
        } catch (Exception ex) {
            LOGGER.warn("Group [{}]: failed to commit offsets. Error = {}", groupId, ex.getMessage());
        }
    }