public DataFrame dataFrame()

in java/ws-server/src/main/java/com/epam/deltix/tbwg/webapp/services/grafana/GrafanaServiceNewImpl.java [129:284]


    /**
     * Runs the select query for the given time range and folds the returned messages through the
     * aggregation supplied by {@code functionsService} into a data frame whose rows are keyed by timestamp.
     * <p>
     * Without group-by, every timestamp holds a single row with one column per aggregation field.
     * With group-by, each message gets a record key of the form {@code [v1*v2*...]} built from its
     * group-by values, and the group-by view option selects the layout:
     * <ul>
     *   <li>{@code COLUMN} (also used when the query specifies no view): a single row per timestamp with one
     *       column per aggregation field and record key, named {@code fieldName + recordKey};</li>
     *   <li>{@code ROW}: one row per emitted record, with the group-by values added as extra columns that are
     *       declared as {@code STRING} fields.</li>
     * </ul>
     * Once the message source is exhausted, the remaining aggregation state is flushed through
     * {@code calculateLastRecords()} (group-by aggregations) or {@code calculateLast()} (all others).
     * <p>
     * NOTE(review): in the {@code ROW} flush the group-by values are recovered by splitting the record key on
     * {@code '*'}, so a group-by value that itself contains {@code '*'} ends up misaligned. Fixing that needs a
     * different key format, which is shared with the aggregation and therefore left untouched here.
     *
     * @param query         query description: functions, group-by fields, group-by view, symbols and ref id
     * @param range         requested time range; its upper bound is handed to the aggregation as the end time
     * @param maxDataPoints forwarded to {@code calculateStep}
     * @param intervalMs    forwarded to {@code calculateStep}
     * @return the populated data frame, or an empty frame carrying the query's ref id when the query has no
     *         functions or the source returns no messages
     * @throws ValidationException       declared by this method; presumably raised by query construction or
     *                                   aggregation setup - confirm against the callees
     * @throws RecordValidationException declared by this method; presumably raised while aggregating records -
     *                                   confirm against the callees
     * @throws IllegalArgumentException  if the query's group-by view is not the name of a {@code GroupByViewOption}
     */
    public DataFrame dataFrame(SelectQuery query, TimeRange range, int maxDataPoints, Long intervalMs) throws ValidationException,
            RecordValidationException {
        SelectBuilder2 selectBuilder = constructQuery(query, range);
        long step = calculateStep(query, range, maxDataPoints, intervalMs);
        if (query.getFunctions() == null || query.getFunctions().isEmpty()) {
            return new MutableDataFrameImpl(query.getRefId());
        }
        boolean groupBy = query.getGroupBy() != null && !query.getGroupBy().isEmpty();
        Aggregation aggregation = null;
        TreeMap<Long, List<Map<String, Object>>> resultMap = new TreeMap<>();
        RawMessageDecoder decoder = new RawMessageDecoder();
        StringBuilder keyBuilder = new StringBuilder();
        Map<String, Field> fields = new HashMap<>();
        GroupByViewOption groupByViewOption = StringUtil.isNotEmpty(query.getGroupByView()) ?
                GroupByViewOption.valueOf(query.getGroupByView()) : GroupByViewOption.COLUMN;
        try (MessageSource<InstrumentMessage> messageSource = selectBuilder.executeRaw()) {
            MutableGenericRecord record = new MutableGenericRecordImpl();
            boolean first = true;
            while (messageSource.next()) {
                if (first) {
                    // The aggregation starts at the timestamp of the first message actually returned.
                    first = false;
                    aggregation = functionsService.aggregation(query, messageSource.getMessage().getTimeStampMs(),
                            range.getTo().toEpochMilli(), step, query.getGroupBy(), query.getSymbols());
                }
                decoder.decode((RawMessage) messageSource.getMessage(), record);
                if (groupBy) {
                    // Build the record key "[v1*v2*...]" from the group-by values of this message.
                    keyBuilder.setLength(0);
                    keyBuilder.append("[");
                    for (SelectQuery.TimebaseField timebaseField : query.getGroupBy()) {
                        GenericValueInfo value = record.getValue(timebaseField.getName());
                        keyBuilder.append(value == null ? null : value.charSequenceValue()).append("*");
                        if (groupByViewOption == GroupByViewOption.ROW) {
                            // Register the group-by column once instead of re-creating it for every message.
                            fields.computeIfAbsent(timebaseField.toString(), key -> new MutableField(key, FieldType.STRING));
                        }
                    }
                    // Drop the trailing separator; groupBy guarantees at least one field was appended.
                    keyBuilder.setLength(keyBuilder.length() - 1);
                    keyBuilder.append("]");
                    record.setRecordKey(keyBuilder.toString());
                }
                if (aggregation.add(record)) {
                    GenericRecord out = aggregation.record();
                    Map<String, Object> map = new HashMap<>(aggregation.fields().size());
                    if (!groupBy) {
                        putAggregatedValues(aggregation, out, fields, map);
                        mergeIntoTimestampRow(resultMap, out.timestamp(), map);
                    } else {
                        switch (groupByViewOption) {
                            case COLUMN:
                                putGroupedAggregatedValues(aggregation, out, String.valueOf(record.recordKey()), fields, map);
                                mergeIntoTimestampRow(resultMap, out.timestamp(), map);
                                break;
                            case ROW: {
                                putGroupedAggregatedValues(aggregation, out, "", fields, map);
                                for (SelectQuery.TimebaseField timebaseField : query.getGroupBy()) {
                                    // A message may lack a group-by field (the key builder above allows it);
                                    // leave the column unset instead of failing with a NullPointerException.
                                    GenericValueInfo value = record.getValue(timebaseField.getName());
                                    if (value != null) {
                                        map.put(timebaseField.toString(), value.value());
                                    }
                                }
                                appendTimestampRow(resultMap, out.timestamp(), map);
                                break;
                            }
                            default:
                                throw new UnsupportedOperationException();
                        }
                    }
                }
            }
            if (first) {
                // No messages at all: the aggregation was never created.
                return new MutableDataFrameImpl(query.getRefId());
            }
            if (aggregation instanceof GroupByAggregation) {
                GroupByAggregation groupByAggregation = (GroupByAggregation) aggregation;
                for (Map.Entry<String, GenericRecord> entry : groupByAggregation.calculateLastRecords().entrySet()) {
                    GenericRecord out = entry.getValue();
                    if (out != null) {
                        Map<String, Object> map = new HashMap<>(aggregation.fields().size());
                        switch (groupByViewOption) {
                            case COLUMN:
                                putGroupedAggregatedValues(aggregation, out, entry.getKey(), fields, map);
                                mergeIntoTimestampRow(resultMap, out.timestamp(), map);
                                break;
                            case ROW: {
                                // Recover the group-by values from the "[v1*v2*...]" key. The -1 limit keeps
                                // trailing empty strings, so an empty last value cannot shorten the array.
                                String[] values = entry.getKey().substring(1, entry.getKey().length() - 1).split("\\*", -1);
                                putGroupedAggregatedValues(aggregation, out, "", fields, map);
                                int j = 0;
                                for (SelectQuery.TimebaseField timebaseField : query.getGroupBy()) {
                                    map.put(timebaseField.toString(), values[j++]);
                                }
                                appendTimestampRow(resultMap, out.timestamp(), map);
                                break;
                            }
                            default:
                                throw new UnsupportedOperationException();
                        }
                    }
                }
            } else {
                GenericRecord out = aggregation.calculateLast();
                if (out != null) {
                    Map<String, Object> map = new HashMap<>(aggregation.fields().size());
                    // Null-safe like the in-loop path: fields without a value in the last record are skipped.
                    putAggregatedValues(aggregation, out, fields, map);
                    mergeIntoTimestampRow(resultMap, out.timestamp(), map);
                }
            }
        }
        MapBasedDataFrame dataFrame = new MapBasedDataFrame(query.getRefId(),
                fields.values().stream().sorted(Comparator.comparing(Field::name)).collect(Collectors.toList()));
        dataFrame.append(Collections.singletonList(resultMap));
        return dataFrame;
    }

    /**
     * Registers every aggregation field as is and copies its non-null value from {@code out} into {@code target}.
     */
    private void putAggregatedValues(Aggregation aggregation, GenericRecord out, Map<String, Field> fields,
                                     Map<String, Object> target) {
        for (Field field : aggregation.fields()) {
            fields.putIfAbsent(field.name(), field);
            if (out.getValue(field.name()) != null) {
                target.put(field.name(), out.getValue(field.name()).value());
            }
        }
    }

    /**
     * Registers a {@code ColumnImpl} named {@code fieldName + keySuffix} for every aggregation field and copies the
     * field's non-null value from {@code out} into {@code target} under that column name.
     */
    private void putGroupedAggregatedValues(Aggregation aggregation, GenericRecord out, String keySuffix,
                                            Map<String, Field> fields, Map<String, Object> target) {
        for (Field field : aggregation.fields()) {
            String column = keySuffix.isEmpty() ? field.name() : field.name() + keySuffix;
            fields.computeIfAbsent(column, key -> new ColumnImpl(key, field.type()));
            if (out.getValue(field.name()) != null) {
                target.put(column, out.getValue(field.name()).value());
            }
        }
    }

    /**
     * Merges {@code values} into the single row kept for {@code timestamp}, creating that row when it is absent.
     */
    private void mergeIntoTimestampRow(Map<Long, List<Map<String, Object>>> resultMap, long timestamp,
                                       Map<String, Object> values) {
        resultMap.computeIfAbsent(timestamp, key -> {
            List<Map<String, Object>> list = new ObjectArrayList<>();
            list.add(new HashMap<>());
            return list;
        }).get(0).putAll(values);
    }

    /**
     * Appends {@code row} as an additional row for {@code timestamp}.
     */
    private void appendTimestampRow(Map<Long, List<Map<String, Object>>> resultMap, long timestamp,
                                    Map<String, Object> row) {
        List<Map<String, Object>> list = resultMap.computeIfAbsent(timestamp, key -> new ObjectArrayList<>());
        list.add(row);
    }