in core/src/main/java/com/epam/eco/kafkamanager/core/consumer/repo/kafka/KafkaConsumerGroupCache.java [241:314]
/**
 * Applies group metadata updates and offset metadata updates to the cache while holding
 * the write lock, then hands the set of affected groups to {@code fireCacheListener}
 * once the lock has been released.
 *
 * <p>Group updates are processed first, offset updates second. Each entry is handled
 * independently: a failure on one entry is logged and does not stop the remaining
 * entries from being processed.
 *
 * <p>The map passed to the listener is keyed by group name. A {@code null} value marks a
 * group for which {@code removeGroupMetadataIfInvalid} reported a removal; otherwise the
 * value is the group's cached metadata object.
 *
 * @param groupUpdates group metadata updates keyed by group name; may be {@code null} or
 *        empty, and individual values may be {@code null}
 * @param offsetUpdates offset updates keyed by group name; may be {@code null} or empty
 * @param mode selects between replacing ({@code SET}) and merging ({@code UPDATE}) the
 *        offsets of a group; any other value makes every offset entry fail with a
 *        logged {@link IllegalArgumentException}
 */
private void applyUpdates(
        Map<String, GroupMetadataAdapter> groupUpdates,
        Map<String, Map<TopicPartition, OffsetAndMetadataAdapter>> offsetUpdates,
        UpdateMode mode) {
    // Nothing to apply: skip locking and listener notification altogether.
    if (MapUtils.isEmpty(groupUpdates) && MapUtils.isEmpty(offsetUpdates)) {
        return;
    }

    Map<String, KafkaGroupMetadata> effectiveUpdate = new HashMap<>();

    lock.writeLock().lock();
    try {
        if (!MapUtils.isEmpty(groupUpdates)) {
            for (Map.Entry<String, GroupMetadataAdapter> entry : groupUpdates.entrySet()) {
                String groupName = entry.getKey();
                GroupMetadataAdapter update = entry.getValue();
                try {
                    // NOTE(review): the boolean flag presumably asks for the entry to be
                    // created when missing (only for non-null updates) - confirm against
                    // getGroupMetadata.
                    KafkaGroupMetadata cached = getGroupMetadata(groupName, update != null);
                    if (cached == null) {
                        continue;
                    }
                    cached.setGroupMetadata(update);
                    boolean removed = removeGroupMetadataIfInvalid(cached);
                    // A removed group is reported to the listener as a null value.
                    effectiveUpdate.put(cached.getName(), removed ? null : cached);
                } catch (Exception ex) {
                    String message = String.format(
                            "Failed to apply group metadata update: group=%s, update=%s",
                            groupName, update);
                    LOGGER.error(message, ex);
                }
            }
        }

        if (!MapUtils.isEmpty(offsetUpdates)) {
            for (Map.Entry<String, Map<TopicPartition, OffsetAndMetadataAdapter>> entry
                    : offsetUpdates.entrySet()) {
                String groupName = entry.getKey();
                Map<TopicPartition, OffsetAndMetadataAdapter> update = entry.getValue();
                try {
                    KafkaGroupMetadata cached = getGroupMetadata(groupName, false);
                    if (cached == null) {
                        // A clean-up update for a group that is not cached is silently
                        // ignored; any other update for an unknown group is an error.
                        if (isCleanUpUpdate(update)) {
                            continue;
                        }
                        throw new RuntimeException("Group not found");
                    }

                    if (mode == UpdateMode.UPDATE) {
                        cached.updateOffsetsMetadata(update);
                        updateTopicGroups(groupName, update);
                        updateOffsetTimeSeries(groupName, update);
                    } else if (mode == UpdateMode.SET) {
                        cached.setOffsetsMetadata(update);
                        setTopicGroups(groupName, update);
                        setOffsetTimeSeries(groupName, update);
                    } else {
                        throw new IllegalArgumentException("Unknown update mode " + mode);
                    }

                    effectiveUpdate.put(cached.getName(), cached);
                } catch (Exception ex) {
                    String message = String.format(
                            "Failed to apply offset metadata update: group=%s, update=%s",
                            groupName, update);
                    LOGGER.error(message, ex);
                }
            }
        }
    } finally {
        lock.writeLock().unlock();
    }

    // Listener is invoked only after the write lock has been released.
    fireCacheListener(effectiveUpdate);
}