in dbeam-core/src/main/java/com/spotify/dbeam/beam/MetricsHelper.java [58:85]
/**
 * Flattens the gauges and counters of a pipeline result into one map keyed by the metric's short
 * name (the namespace part of the metric name is not used in the key).
 *
 * <p>Gauge readings that share a short name collapse to the value of the reading with the latest
 * timestamp; counter readings that share a short name are summed. When a gauge and a counter
 * share a name, the counter value is the one kept. {@code addCalculatedMetrics} is then invoked
 * with the counters and the combined map before the result is wrapped.
 *
 * @param result pipeline result whose metrics are queried (with an unrestricted filter)
 * @return an unmodifiable map from metric short name to its value
 */
public static Map<String, Long> getMetrics(final PipelineResult result) {
  final MetricQueryResults metricQueryResults =
      result.metrics().queryMetrics(MetricsFilter.builder().build());

  // Group by short name in a single pass, exactly like the counters below. Grouping by the full
  // MetricName and re-keying with Collectors.toMap afterwards would throw IllegalStateException
  // as soon as two namespaces report a gauge with the same short name.
  final Map<String, Long> gauges =
      StreamSupport.stream(metricQueryResults.getGauges().spliterator(), false)
          .collect(
              Collectors.groupingBy(
                  m -> m.getName().getName(),
                  Collectors.collectingAndThen(
                      Collectors.reducing(
                          GaugeResult.empty(),
                          GET_COMMITTED_GAUGE,
                          // Keep the most recent reading among those sharing a name.
                          BinaryOperator.maxBy(Comparator.comparing(GaugeResult::getTimestamp))),
                      GaugeResult::getValue)));

  final Map<String, Long> counters =
      StreamSupport.stream(metricQueryResults.getCounters().spliterator(), false)
          .collect(
              Collectors.groupingBy(
                  m -> m.getName().getName(), Collectors.summingLong(GET_COMMITTED_COUNTER)));

  final Map<String, Long> ret = new HashMap<>();
  ret.putAll(gauges);
  // Counters are added last, so they win over a gauge with the same name.
  ret.putAll(counters);
  addCalculatedMetrics(counters, ret);
  return Collections.unmodifiableMap(ret);
}