in java/ws-server/src/main/java/com/epam/deltix/tbwg/webapp/services/grafana/GrafanaServiceImpl.java [394:429]
/**
 * Executes the select query for the given stream and folds the resulting messages
 * into one {@link TimeSeriesEntry} per key reported by the aggregator.
 *
 * <p>The query is built by {@code constructQuery} and executed in raw mode; the message
 * source is closed when the method exits. A {@code MultiAggregator} configured with the
 * range bounds (as epoch milliseconds), the step and the per-field aggregation lists is
 * then advanced interval by interval. For every interval:
 * <ul>
 *   <li>each key present in the interval's values gets a {@code [value, timestamp]} datapoint
 *       appended to its series (the series is created on first sight);</li>
 *   <li>each series that already exists but has no value in this interval gets a
 *       {@code [null, timestamp]} datapoint.</li>
 * </ul>
 * A series that first appears in a later interval is not back-filled for earlier intervals.
 *
 * @param streamKey key of the stream to query
 * @param range     time range; its from/to instants bound the aggregation
 * @param step      interval step handed to the aggregator
 *                  (NOTE(review): presumably milliseconds, like the range bounds - confirm)
 * @param query     select query describing the fields and their aggregations
 * @return a new list holding the series collected for every key seen
 */
private List<TimeSeriesEntry> select(String streamKey, TimeRange range, long step, SelectQuery query)
        throws NoSuchStreamException, NoSuchSymbolsException, SelectBuilder2.NoSuchFieldException, WrongTypeException, NoSuchTypeException {
    SelectBuilder2 builder = constructQuery(streamKey, range, query);
    try (MessageSource<InstrumentMessage> source = builder.executeRaw()) {
        Map<String, TimeSeriesEntry> seriesByKey = new HashMap<>();

        // Field name -> aggregations requested for it. A field name occurring more than once
        // keeps the aggregation list of its last occurrence.
        Map<String, List<Aggregation>> aggregationsByField = new HashMap<>();
        query.getFields().values().forEach(fields -> fields.forEach(field -> {
            String fieldName = field.getName();
            List<Aggregation> requested = field.getAggregations().stream()
                    .map(AggregationInfo::getName)
                    .map(Aggregations::fromString)
                    .collect(Collectors.toList());
            aggregationsByField.put(fieldName, requested);
        }));

        Aggregator aggregator = new MultiAggregator(
                range.getFrom().toEpochMilli(),
                range.getTo().toEpochMilli(),
                step,
                aggregationsByField
        );

        // Both objects are reused across iterations: the aggregator refills the interval,
        // and the key set is cleared at the start of every interval.
        IntervalEntry interval = new IntervalEntry();
        HashSet<String> seenKeys = new HashSet<>();
        while (aggregator.nextInterval(source, interval)) {
            seenKeys.clear();

            // Append a datapoint for every key that has a value in this interval.
            interval.getValues().keyIterator().forEachRemaining(key -> {
                TimeSeriesEntry target = seriesByKey.computeIfAbsent(key, k -> new TimeSeriesEntry(k));
                target.datapoints.add(new Number[]{interval.getValues().get(key, Double.NaN), interval.getTimestamp()});
                seenKeys.add(key);
            });

            // Known series without a value in this interval get an explicit null datapoint.
            for (Map.Entry<String, TimeSeriesEntry> known : seriesByKey.entrySet()) {
                if (!seenKeys.contains(known.getKey())) {
                    known.getValue().datapoints.add(new Number[]{null, interval.getTimestamp()});
                }
            }
        }

        // Report how many datapoints each series ended up with.
        for (Map.Entry<String, TimeSeriesEntry> collected : seriesByKey.entrySet()) {
            LOG.info().append(collected.getKey()).append(": ").append(collected.getValue().datapoints.size()).commit();
        }
        return new ArrayList<>(seriesByKey.values());
    }
}