in commons/src/main/java/com/epam/eco/kafkamanager/TopicListSearchCriteria.java [59:108]
public static TopicListSearchCriteria parseTopicCriteria(Map<String, ?> map, KafkaManager kafkaManager) {
Set<SingleClause<String>> topicClauses = new HashSet<>();
Set<SingleClause<Integer>> partitionCountClauses = new HashSet<>();
Set<SingleClause<Integer>> replicationFactorClauses = new HashSet<>();
Set<SingleClause<Integer>> consumerCountClauses = new HashSet<>();
ReplicationState replicationStateClause = ReplicationState.ANY_REPLICATED;
Set<SingleClause<String>> configStringClauses = new HashSet<>();
Set<SingleClause<String>> descriptionClauses = new HashSet<>();
for(String key : map.keySet()) {
if(ifKeyExists(key) && key.indexOf(OPERATION_SEPARATOR) > 0) {
String filterColumn = key.substring(0, key.indexOf(OPERATION_SEPARATOR));
String rawOperation = key.substring(key.indexOf(OPERATION_SEPARATOR) + 1);
Operation filterOperation = Operation.valueOf(rawOperation);
switch (filterColumn) {
case TOPIC_NAME_ATTR -> topicClauses.add(new SingleClause<>((String) map.get(key), filterOperation));
case PARTITION_COUNT_ATTR -> partitionCountClauses.add(
new SingleClause<>(Integer.valueOf((String) map.get(key)), filterOperation));
case REPLICATION_COUNT_ATTR -> replicationFactorClauses.add(
new SingleClause<>(Integer.valueOf((String) map.get(key)), filterOperation));
case CONSUMER_COUNT_ATTR -> consumerCountClauses.add(
new SingleClause<>(Integer.valueOf((String) map.get(key)), filterOperation));
case REPLICATION_STATE_ATTR -> replicationStateClause = ReplicationState.valueOf((String) map.get(key));
case CONFIG_STRING_ATTR -> configStringClauses.add(new SingleClause<>((String) map.get(key), filterOperation));
case DESCRIPTION_ATTR -> descriptionClauses.add(new SingleClause<>((String) map.get(key), filterOperation));
default -> {
}
}
}
}
return new TopicListSearchCriteria(
Set.of(new ClausesWithHandler<>(topicClauses, stringClausesHandler, TopicInfo::getName),
new ClausesWithHandler<>(partitionCountClauses, numericClausesHandler, TopicInfo::getPartitionCount),
new ClausesWithHandler<>(replicationFactorClauses, numericClausesHandler, TopicInfo::getReplicationFactor),
new ClausesWithHandler<Integer,Integer,TopicInfo>(consumerCountClauses, numericClausesHandler,
topicInfo -> kafkaManager.getConsumerGroupsForTopic(
topicInfo.getName()).size()),
new ClausesWithHandler<ReplicationState,TopicInfo,TopicInfo>(Set.of(new SingleClause<>(replicationStateClause, Operation.EQUALS)),
replicationStateClausesHandler, topicInfo -> topicInfo),
new ClausesWithHandler<String,TopicInfo,TopicInfo>(configStringClauses, configMapClausesHandler, topicInfo -> topicInfo),
new ClausesWithHandler<String,String,TopicInfo>(descriptionClauses, stringClausesHandler,
topicInfo -> topicInfo.getMetadata().map(Metadata::getDescription).orElse(null))));
}