private void doReset()

in src/main/java/com/epam/eco/commons/kafka/helpers/GroupOffsetResetter.java [146:177]


    /**
     * Commits the given offsets through {@code consumer}, restricted to the partitions
     * for which the consumer reports an existing committed offset.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Offset ranges are fetched for the requested partitions and the requested
     *       offsets are passed through {@code filterOutUselessAndAdjustOutOfRangeOffsets};
     *       if nothing is left, the method returns without touching the consumer.</li>
     *   <li>The remaining partitions are assigned to the consumer.</li>
     *   <li>The currently committed offsets are looked up. Any exception raised by that
     *       lookup is swallowed and treated as "nothing committed".</li>
     *   <li>Only partitions with a non-null committed entry are collected, each with the
     *       new offset (no metadata is carried over), and committed synchronously.
     *       If no partition qualifies, no commit is issued.</li>
     * </ol>
     *
     * @param consumer consumer used to assign partitions, read committed offsets and commit
     * @param offsets  requested offset per partition
     */
    private void doReset(Consumer<?, ?> consumer, Map<TopicPartition, Long> offsets) {
        Map<TopicPartition, Long> targetOffsets = filterOutUselessAndAdjustOutOfRangeOffsets(
                offsets,
                fetchOffsetRanges(offsets.keySet()));
        if (targetOffsets.isEmpty()) {
            return;
        }

        consumer.assign(new ArrayList<>(targetOffsets.keySet()));

        Map<TopicPartition, OffsetAndMetadata> currentlyCommitted;
        try {
            currentlyCommitted = consumer.committed(targetOffsets.keySet());
        } catch (Exception ignored) {
            // Deliberately ignored: a failed lookup is handled as if nothing was committed,
            // which means the loop below selects no partitions and the reset is a no-op.
            // NOTE(review): the caller gets no signal in that case - confirm this is intended.
            currentlyCommitted = Collections.emptyMap();
        }

        Map<TopicPartition, OffsetAndMetadata> pendingCommits = new HashMap<>();
        for (Entry<TopicPartition, Long> target : targetOffsets.entrySet()) {
            // Partitions without a committed entry are left untouched.
            if (currentlyCommitted.get(target.getKey()) == null) {
                continue;
            }
            pendingCommits.put(target.getKey(), new OffsetAndMetadata(target.getValue()));
        }

        if (pendingCommits.isEmpty()) {
            return;
        }
        consumer.commitSync(pendingCommits);
    }