in java/ws-server/src/main/java/com/epam/deltix/tbwg/webapp/services/grafana/GrafanaServiceImpl.java [153:209]
/**
 * Runs every select query contained in the request and turns each result into one data frame.
 *
 * <p>For each target the raw message source is fed to a {@code MultiAggregator} spanning the
 * request range with the step returned by {@code calculateStep}. Every interval reported by the
 * aggregator becomes one row: its timestamp goes into the {@code "timestamp"} column and every key
 * present in the interval values goes into a {@code NUMBER} column of the same name. Value columns
 * are created on first sight and back-filled with {@code null} for the rows that preceded them;
 * columns that receive no value in an interval get {@code null} for that row.
 *
 * <p>The collected columns are attached to the frame unless exactly one interval was produced and
 * the timestamp column is the only column; in that case the frame is returned without columns.
 * Columns are held in a {@code HashMap}, so they are attached in that map's iteration order.
 *
 * @param request query request providing the targets and the time range
 * @return one frame per target, in target order
 * @throws NoTargetsException  if the request has no targets
 * @throws ValidationException declared for the query-building steps invoked here
 */
public List<DataFrame> selectDataFrames(DataQueryRequest<SelectQuery> request) throws ValidationException {
    if (request.getTargets().isEmpty()) {
        throw new NoTargetsException();
    }

    List<DataFrame> frames = new ObjectArrayList<>();
    for (SelectQuery target : request.getTargets()) {
        MutableDataFrame frame = new MutableDataFrameImpl("select query");
        long intervalStep = calculateStep(target, request);
        SelectBuilder2 builder = constructQuery(target.getStream(), request.getRange(), target);

        try (MessageSource<InstrumentMessage> source = builder.executeRaw()) {
            // Keep a direct handle on the timestamp column; it is also registered by name below.
            Column timeColumn = new ColumnImpl("timestamp", FieldType.TIME);
            Map<String, Column> columnsByName = new HashMap<>();
            columnsByName.put("timestamp", timeColumn);

            Map<String, List<Aggregation>> fieldAggregations = collectFieldAggregations(target);
            Aggregator aggregator = new MultiAggregator(
                    request.getRange().getFrom().toEpochMilli(),
                    request.getRange().getTo().toEpochMilli(),
                    intervalStep,
                    fieldAggregations);

            IntervalEntry interval = new IntervalEntry();
            HashSet<String> touched = new HashSet<>();
            MutableInt rowCount = new MutableInt(0);

            while (aggregator.nextInterval(source, interval)) {
                touched.clear();

                timeColumn.values().add(interval.getTimestamp());
                touched.add("timestamp");

                interval.getValues().keyIterator().forEachRemaining(name -> {
                    Column valueColumn = columnsByName.computeIfAbsent(name, columnName -> {
                        // A column first seen now needs a null for every row already emitted.
                        Column created = new ColumnImpl(columnName, FieldType.NUMBER);
                        for (int row = 0; row < rowCount.get(); row++) {
                            created.values().add(null);
                        }
                        return created;
                    });
                    valueColumn.values().add(interval.getValues().get(name, Double.NaN));
                    touched.add(name);
                });

                // Columns that got nothing in this interval still need a cell for the row.
                for (Map.Entry<String, Column> named : columnsByName.entrySet()) {
                    if (!touched.contains(named.getKey())) {
                        named.getValue().values().add(null);
                    }
                }

                rowCount.increment();
            }

            // Skip attaching columns when the only content is a single timestamp-only row.
            if (rowCount.get() != 1 || columnsByName.size() != 1) {
                for (Column column : columnsByName.values()) {
                    frame.addColumn(column);
                }
            }
        }

        frames.add(frame);
    }
    return frames;
}

/**
 * Maps each field name of the query to the aggregations requested for it. Fields are taken from
 * all value lists of {@code query.getFields()}; a later field with an already seen name replaces
 * the earlier mapping.
 */
private static Map<String, List<Aggregation>> collectFieldAggregations(SelectQuery query) {
    Map<String, List<Aggregation>> byFieldName = new HashMap<>();
    query.getFields().values().stream()
            .flatMap(Collection::stream)
            .forEach(field -> byFieldName.put(
                    field.getName(),
                    field.getAggregations().stream()
                            .map(AggregationInfo::getName)
                            .map(Aggregations::fromString)
                            .collect(Collectors.toList())));
    return byFieldName;
}