in java/ws-server/src/main/java/com/epam/deltix/tbwg/webapp/services/grafana/GrafanaServiceImpl.java [147:305]
/**
 * Runs the select query and folds the returned raw messages into a data frame.
 *
 * <p>Flow, as implemented below:
 * <ol>
 *   <li>The query is built and the step is calculated first, so any validation they perform still
 *       happens even when the query has no functions.</li>
 *   <li>If the query has no functions, or the message source yields no messages, a new
 *       {@code MutableDataFrameImpl} carrying only the query's refId is returned.</li>
 *   <li>The aggregation is created lazily from the timestamp of the first message and the upper
 *       bound of {@code range}.</li>
 *   <li>Every message is decoded into one reusable record; when a group-by is configured the record
 *       is tagged with a key of the form {@code [v1*v2*...]}.</li>
 *   <li>Whenever {@code aggregation.add(record)} returns {@code true}, {@code aggregation.record()}
 *       is read and stored under its timestamp.</li>
 *   <li>After the source is exhausted, the remaining state is flushed through
 *       {@code calculateLastRecords()} (for a {@code GroupByAggregation}) or {@code calculateLast()}.</li>
 * </ol>
 *
 * <p>Group-by results are laid out according to {@code query.getGroupByView()} (defaults to
 * {@code COLUMN} when empty): {@code COLUMN} merges all groups into one row per timestamp using
 * columns named {@code fieldName + recordKey}; {@code ROW} emits one row per group and adds the
 * group-by values as extra columns.
 *
 * @param query         query to execute
 * @param range         time range; its upper bound is handed to the aggregation
 * @param maxDataPoints forwarded to {@code calculateStep}
 * @param intervalMs    forwarded to {@code calculateStep}
 * @return the resulting data frame
 * @throws ValidationException       propagated from the calls made by this method
 * @throws RecordValidationException propagated from the calls made by this method
 */
public DataFrame dataFrame(SelectQuery query, TimeRange range, int maxDataPoints, Long intervalMs) throws ValidationException,
        RecordValidationException {
    SelectBuilder2 selectBuilder = constructQuery(query, range);
    long step = calculateStep(query, range, maxDataPoints, intervalMs);
    if (query.getFunctions() == null || query.getFunctions().isEmpty()) {
        return new MutableDataFrameImpl(query.getRefId());
    }
    boolean groupBy = query.getGroupBy() != null && !query.getGroupBy().isEmpty();
    Aggregation aggregation = null;
    TreeMap<Long, List<Map<String, Object>>> resultMap = new TreeMap<>();
    RawMessageDecoder decoder = new RawMessageDecoder();
    StringBuilder keyBuilder = new StringBuilder();
    Map<String, Field> fields = new HashMap<>();
    GroupByViewOption groupByViewOption = StringUtils.isNotEmpty(query.getGroupByView()) ?
            GroupByViewOption.valueOf(query.getGroupByView()) : GroupByViewOption.COLUMN;
    try (MessageSource<InstrumentMessage> messageSource = selectBuilder.executeRaw()) {
        MutableGenericRecord record = new MutableGenericRecordImpl();
        boolean first = true;
        while (messageSource.next()) {
            if (first) {
                first = false;
                // The aggregation needs the timestamp of the first message, so it is created lazily.
                aggregation = functionsService.aggregation(query, messageSource.getMessage().getTimeStampMs(),
                        range.getTo().toEpochMilli(), step, query.getGroupBy(), query.getSymbols());
            }
            decoder.decode((RawMessage) messageSource.getMessage(), record);
            if (groupBy) {
                // Build the record key "[v1*v2*...]"; a missing value is rendered as "null".
                keyBuilder.setLength(0);
                keyBuilder.append("[");
                for (SelectQuery.TimebaseField timebaseField : query.getGroupBy()) {
                    GenericValueInfo value = record.getValue(timebaseField.getName());
                    keyBuilder.append(value == null ? null : value.charSequenceValue()).append("*");
                    if (groupByViewOption == GroupByViewOption.ROW) {
                        // Register the group-by column once instead of allocating a field per message.
                        fields.computeIfAbsent(timebaseField.toString(),
                                key -> new MutableField(key, FieldType.STRING));
                    }
                }
                // Drop the trailing '*' (groupBy is non-empty here, so at least one was appended).
                keyBuilder.setLength(keyBuilder.length() - 1);
                keyBuilder.append("]");
                record.setRecordKey(keyBuilder.toString());
            }
            if (aggregation.add(record)) {
                GenericRecord out = aggregation.record();
                Map<String, Object> map = new HashMap<>(aggregation.fields().size());
                if (!groupBy) {
                    collectFieldValues(aggregation, out, fields, map);
                    mergeIntoSingleRow(resultMap, out.timestamp(), map);
                } else {
                    switch (groupByViewOption) {
                        case COLUMN:
                            collectColumnValues(aggregation, out, record.recordKey(), fields, map);
                            mergeIntoSingleRow(resultMap, out.timestamp(), map);
                            break;
                        case ROW:
                            collectColumnValues(aggregation, out, "", fields, map);
                            for (SelectQuery.TimebaseField timebaseField : query.getGroupBy()) {
                                // The value may be absent (the key building above allows for it);
                                // leave the column unset instead of failing with an NPE.
                                GenericValueInfo value = record.getValue(timebaseField.getName());
                                if (value != null) {
                                    map.put(timebaseField.toString(), value.value());
                                }
                            }
                            appendGroupRow(resultMap, out.timestamp(), map);
                            break;
                        default:
                            throw new UnsupportedOperationException();
                    }
                }
            }
        }
        if (first) {
            // No messages at all: the aggregation was never created.
            return new MutableDataFrameImpl(query.getRefId());
        }
        // Flush whatever the aggregation still holds after the last message.
        if (aggregation instanceof GroupByAggregation) {
            GroupByAggregation groupByAggregation = (GroupByAggregation) aggregation;
            for (Map.Entry<String, GenericRecord> entry : groupByAggregation.calculateLastRecords().entrySet()) {
                GenericRecord out = entry.getValue();
                if (out == null) {
                    continue;
                }
                Map<String, Object> map = new HashMap<>(aggregation.fields().size());
                switch (groupByViewOption) {
                    case COLUMN:
                        collectColumnValues(aggregation, out, entry.getKey(), fields, map);
                        mergeIntoSingleRow(resultMap, out.timestamp(), map);
                        break;
                    case ROW: {
                        // NOTE(review): assumes the entry key is the "[v1*v2*...]" record key built above.
                        // Limit -1 keeps trailing empty values, which a plain split() would drop and
                        // thereby leave fewer parts than group-by fields.
                        // A value that itself contains '*' still shifts the columns; fixing that would
                        // require changing the key format.
                        String groupKey = entry.getKey();
                        String[] values = groupKey.substring(1, groupKey.length() - 1).split("\\*", -1);
                        collectColumnValues(aggregation, out, "", fields, map);
                        int j = 0;
                        for (SelectQuery.TimebaseField timebaseField : query.getGroupBy()) {
                            if (j >= values.length) {
                                break;
                            }
                            map.put(timebaseField.toString(), values[j++]);
                        }
                        appendGroupRow(resultMap, out.timestamp(), map);
                        break;
                    }
                    default:
                        throw new UnsupportedOperationException();
                }
            }
        } else {
            GenericRecord out = aggregation.calculateLast();
            if (out != null) {
                Map<String, Object> map = new HashMap<>(aggregation.fields().size());
                collectFieldValues(aggregation, out, fields, map);
                mergeIntoSingleRow(resultMap, out.timestamp(), map);
            }
        }
    }
    MapBasedDataFrame dataFrame = new MapBasedDataFrame(query.getRefId(),
            fields.values().stream().sorted(Comparator.comparing(Field::name)).collect(Collectors.toList()));
    // A result that consists solely of the TIMESTAMP_NULL bucket is discarded.
    if (resultMap.size() == 1 && resultMap.containsKey(GenericValueInfo.TIMESTAMP_NULL)) {
        resultMap.clear();
    }
    dataFrame.append(Collections.singletonList(resultMap));
    return dataFrame;
}

/**
 * Registers each aggregation field under its own name (keeping the aggregation's field object) and
 * copies the non-null values of {@code out} into {@code target}.
 */
private static void collectFieldValues(Aggregation aggregation, GenericRecord out,
                                       Map<String, Field> fields, Map<String, Object> target) {
    for (Field field : aggregation.fields()) {
        fields.putIfAbsent(field.name(), field);
        if (out.getValue(field.name()) != null) {
            target.put(field.name(), out.getValue(field.name()).value());
        }
    }
}

/**
 * Registers a {@code ColumnImpl} named {@code fieldName + keySuffix} for each aggregation field and
 * copies the non-null values of {@code out} into {@code target} under that column name.
 */
private static void collectColumnValues(Aggregation aggregation, GenericRecord out, CharSequence keySuffix,
                                        Map<String, Field> fields, Map<String, Object> target) {
    for (Field field : aggregation.fields()) {
        String column = field.name() + keySuffix;
        fields.computeIfAbsent(column, key -> new ColumnImpl(key, field.type()));
        if (out.getValue(field.name()) != null) {
            target.put(column, out.getValue(field.name()).value());
        }
    }
}

/** Merges {@code values} into the single row kept for {@code timestamp}, creating that row on first use. */
private static void mergeIntoSingleRow(Map<Long, List<Map<String, Object>>> resultMap, Long timestamp,
                                       Map<String, Object> values) {
    resultMap.computeIfAbsent(timestamp, key -> {
        List<Map<String, Object>> rows = new ObjectArrayList<>();
        rows.add(new HashMap<>());
        return rows;
    }).get(0).putAll(values);
}

/** Appends {@code row} as an additional row for {@code timestamp}. */
private static void appendGroupRow(Map<Long, List<Map<String, Object>>> resultMap, Long timestamp,
                                   Map<String, Object> row) {
    resultMap.computeIfAbsent(timestamp, key -> new ObjectArrayList<>()).add(row);
}