private void applyUpdates()

in core/src/main/java/com/epam/eco/kafkamanager/core/consumer/repo/kafka/KafkaConsumerGroupCache.java [241:314]


    /**
     * Applies a batch of group-metadata and offset-metadata updates to the cache and
     * then notifies the cache listener about every group that was touched.
     *
     * <p>Nothing happens (no locking, no listener call) when both maps are reported
     * empty by {@code MapUtils.isEmpty}. Otherwise the write lock is held while group
     * updates are applied first and offset updates second. Each entry is handled
     * independently: if one fails, the failure is logged and the remaining entries are
     * still processed. The listener is invoked only after the write lock is released.
     *
     * @param groupUpdates group name to new group metadata; the update value may be
     *        {@code null} (NOTE(review): the {@code update != null} flag passed to
     *        {@code getGroupMetadata} presumably means "create if missing" - confirm)
     * @param offsetUpdates group name to per-partition offset metadata
     * @param mode {@code SET} routes through the {@code set*} calls, {@code UPDATE}
     *        through the {@code update*} calls; any other value (including
     *        {@code null}) makes each offset entry fail with a logged
     *        {@link IllegalArgumentException}
     */
    private void applyUpdates(
            Map<String, GroupMetadataAdapter> groupUpdates,
            Map<String, Map<TopicPartition, OffsetAndMetadataAdapter>> offsetUpdates,
            UpdateMode mode) {
        if (MapUtils.isEmpty(groupUpdates) && MapUtils.isEmpty(offsetUpdates)) {
            return;
        }

        // Group name -> resulting metadata; a null value marks a group that
        // removeGroupMetadataIfInvalid reported as removed.
        Map<String, KafkaGroupMetadata> effectiveUpdate = new HashMap<>();

        lock.writeLock().lock();
        try {
            // Phase 1: group metadata.
            if (!MapUtils.isEmpty(groupUpdates)) {
                for (Map.Entry<String, GroupMetadataAdapter> entry : groupUpdates.entrySet()) {
                    String groupName = entry.getKey();
                    GroupMetadataAdapter update = entry.getValue();
                    try {
                        KafkaGroupMetadata cached = getGroupMetadata(groupName, update != null);
                        if (cached == null) {
                            continue;
                        }

                        cached.setGroupMetadata(update);

                        boolean dropped = removeGroupMetadataIfInvalid(cached);
                        effectiveUpdate.put(cached.getName(), dropped ? null : cached);
                    } catch (Exception ex) {
                        // One bad entry must not abort the rest of the batch.
                        LOGGER.error(
                                String.format(
                                        "Failed to apply group metadata update: group=%s, update=%s",
                                        groupName, update),
                                ex);
                    }
                }
            }

            // Phase 2: offset metadata, applied on top of the group state from phase 1.
            if (!MapUtils.isEmpty(offsetUpdates)) {
                for (Map.Entry<String, Map<TopicPartition, OffsetAndMetadataAdapter>> entry
                        : offsetUpdates.entrySet()) {
                    String groupName = entry.getKey();
                    Map<TopicPartition, OffsetAndMetadataAdapter> update = entry.getValue();
                    try {
                        KafkaGroupMetadata cached = getGroupMetadata(groupName, false);
                        if (cached == null) {
                            // A clean-up update for a group that is not cached is skipped
                            // silently; any other update for it is reported as a failure.
                            if (isCleanUpUpdate(update)) {
                                continue;
                            }
                            throw new RuntimeException("Group not found");
                        }

                        boolean replace = mode == UpdateMode.SET;
                        if (!replace && mode != UpdateMode.UPDATE) {
                            throw new IllegalArgumentException("Unknown update mode " + mode);
                        }

                        if (replace) {
                            cached.setOffsetsMetadata(update);
                            setTopicGroups(groupName, update);
                            setOffsetTimeSeries(groupName, update);
                        } else {
                            cached.updateOffsetsMetadata(update);
                            updateTopicGroups(groupName, update);
                            updateOffsetTimeSeries(groupName, update);
                        }

                        effectiveUpdate.put(cached.getName(), cached);
                    } catch (Exception ex) {
                        // One bad entry must not abort the rest of the batch.
                        LOGGER.error(
                                String.format(
                                        "Failed to apply offset metadata update: group=%s, update=%s",
                                        groupName, update),
                                ex);
                    }
                }
            }
        } finally {
            lock.writeLock().unlock();
        }

        // Notify outside the lock so listener code never runs while the write lock is held.
        fireCacheListener(effectiveUpdate);
    }