in src/main/java/com/epam/eco/commons/kafka/consumer/advanced/AdvancedConsumer.java [297:318]
private void commitOffsetsFromHandlerTask(boolean waitForComplete) {
Map<TopicPartition, Long> offsets = getOffsetsFromHandlerTask(waitForComplete);
if (offsets.isEmpty()) {
return;
}
Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = offsets.entrySet().stream().
collect(
Collectors.toMap(
entry -> entry.getKey(),
entry -> new OffsetAndMetadata(entry.getValue())));
LOGGER.debug("Group [{}]: commiting offsets = {}", groupId, offsetsAndMetadata);
try {
consumer.commitSync(offsetsAndMetadata);
LOGGER.debug("Group [{}]: offsets committed successfully", groupId);
} catch (Exception ex) {
LOGGER.warn("Group [{}]: failed to commit offsets. Error = {}", groupId, ex.getMessage());
}
}