public List selectDataFrames()

in java/ws-server/src/main/java/com/epam/deltix/tbwg/webapp/services/grafana/GrafanaServiceImpl.java [153:209]


    /**
     * Executes every select target of the request and returns one data frame per target,
     * in the order the targets appear in the request.
     *
     * <p>For each target the raw message source is fed through a {@link MultiAggregator}
     * spanning the requested range with the step computed by {@code calculateStep}. Every
     * interval produced by the aggregator becomes one row: a {@code "timestamp"} column
     * (type {@code TIME}) plus one {@code NUMBER} column per key reported in the interval.
     * A column that first appears after earlier rows were emitted is back-filled with
     * {@code null} for those rows, and a known column that is absent from an interval
     * receives {@code null} for that row, so all columns end up with one value per row.
     *
     * <p>Columns are added to the frame in a stable order: {@code "timestamp"} first, then
     * value columns in the order they were first seen.
     *
     * <p>If exactly one interval was produced and it carried no value keys (only the
     * timestamp column exists), no columns are added and the frame for that target is
     * returned without columns.
     *
     * @param request query request carrying the targets and the time range
     * @return one data frame per target, each named {@code "select query"}
     * @throws ValidationException declared by this method; a {@code NoTargetsException}
     *                             is thrown when the request contains no targets
     */
    public List<DataFrame> selectDataFrames(DataQueryRequest<SelectQuery> request) throws ValidationException {
        if (request.getTargets().isEmpty()) {
            throw new NoTargetsException();
        }

        List<DataFrame> result = new ObjectArrayList<>();

        for (SelectQuery query : request.getTargets()) {
            MutableDataFrame mutableDataFrame = new MutableDataFrameImpl("select query");
            long step = calculateStep(query, request);
            SelectBuilder2 selectBuilder = constructQuery(query.getStream(), request.getRange(), query);
            try (MessageSource<InstrumentMessage> messageSource = selectBuilder.executeRaw()) {
                // Insertion-ordered map: a plain HashMap would hand the columns to the frame in
                // hash order, so "timestamp" could end up in an arbitrary position.
                Map<String, Column> columns = new java.util.LinkedHashMap<>();
                Column timestampColumn = new ColumnImpl("timestamp", FieldType.TIME);
                columns.put("timestamp", timestampColumn);

                // Field name -> aggregations requested for that field (a later field with the
                // same name replaces an earlier one).
                Map<String, List<Aggregation>> aggregations = new HashMap<>();
                query.getFields().values().stream()
                        .flatMap(Collection::stream)
                        .forEach(tbField -> aggregations.put(
                                tbField.getName(),
                                tbField.getAggregations().stream()
                                        .map(AggregationInfo::getName)
                                        .map(Aggregations::fromString)
                                        .collect(Collectors.toList())
                        ));

                Aggregator aggregator = new MultiAggregator(request.getRange().getFrom().toEpochMilli(),
                        request.getRange().getTo().toEpochMilli(), step, aggregations);
                IntervalEntry entry = new IntervalEntry();
                // Names of the columns that received a value for the current row.
                HashSet<String> visited = new HashSet<>();
                // Number of completed rows; a holder object because it is read inside lambdas.
                MutableInt records = new MutableInt(0);

                while (aggregator.nextInterval(messageSource, entry)) {
                    visited.clear();
                    timestampColumn.values().add(entry.getTimestamp());
                    visited.add("timestamp");

                    entry.getValues().keyIterator().forEachRemaining(key -> {
                        Column column = columns.computeIfAbsent(key, k -> {
                            // First sighting of this key: back-fill the rows emitted so far.
                            Column newColumn = new ColumnImpl(k, FieldType.NUMBER);
                            for (int i = 0; i < records.get(); i++) {
                                newColumn.values().add(null);
                            }
                            return newColumn;
                        });
                        column.values().add(entry.getValues().get(key, Double.NaN));
                        visited.add(key);
                    });

                    // Keep all columns the same length: pad those without a value in this row.
                    columns.forEach((name, column) -> {
                        if (!visited.contains(name)) {
                            column.values().add(null);
                        }
                    });
                    records.increment();
                }

                // A single row with nothing but the timestamp carries no data; leave the frame
                // without columns in that case.
                boolean singleTimestampOnlyRow = records.get() == 1 && columns.size() == 1;
                if (!singleTimestampOnlyRow) {
                    columns.values().forEach(mutableDataFrame::addColumn);
                }
            }
            result.add(mutableDataFrame);
        }
        return result;
    }