in sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java [267:367]
private NexmarkPerf currentPerf(
long startMsSinceEpoch,
long now,
PipelineResult result,
List<NexmarkPerf.ProgressSnapshot> snapshots,
Monitor<?> eventMonitor,
Monitor<?> resultMonitor) {
NexmarkPerf perf = new NexmarkPerf();
MetricsReader eventMetrics = new MetricsReader(result, eventMonitor.name);
long numEvents = eventMetrics.getCounterMetric(eventMonitor.prefix + ".elements");
long numEventBytes = eventMetrics.getCounterMetric(eventMonitor.prefix + ".bytes");
long eventStart = eventMetrics.getStartTimeMetric(eventMonitor.prefix + ".processingTime");
long eventEnd = eventMetrics.getEndTimeMetric(eventMonitor.prefix + ".processingTime");
MetricsReader resultMetrics = new MetricsReader(result, resultMonitor.name);
long numResults = resultMetrics.getCounterMetric(resultMonitor.prefix + ".elements");
long numResultBytes = resultMetrics.getCounterMetric(resultMonitor.prefix + ".bytes");
long resultStart = resultMetrics.getStartTimeMetric(resultMonitor.prefix + ".processingTime");
long resultEnd = resultMetrics.getEndTimeMetric(resultMonitor.prefix + ".processingTime");
long timestampStart =
resultMetrics.getStartTimeMetric(resultMonitor.prefix + ".eventTimestamp");
long timestampEnd = resultMetrics.getEndTimeMetric(resultMonitor.prefix + ".eventTimestamp");
long effectiveEnd = -1;
if (eventEnd >= 0 && resultEnd >= 0) {
// It is possible for events to be generated after the last result was emitted.
// (Eg Query 2, which only yields results for a small prefix of the event stream.)
// So use the max of last event and last result times.
effectiveEnd = Math.max(eventEnd, resultEnd);
} else if (resultEnd >= 0) {
effectiveEnd = resultEnd;
} else if (eventEnd >= 0) {
// During startup we may have no result yet, but we would still like to track how
// long the pipeline has been running.
effectiveEnd = eventEnd;
}
if (effectiveEnd >= 0 && eventStart >= 0 && effectiveEnd >= eventStart) {
perf.runtimeSec = (effectiveEnd - eventStart) / 1000.0;
}
if (numEvents >= 0) {
perf.numEvents = numEvents;
}
if (numEvents >= 0 && perf.runtimeSec > 0.0) {
// For streaming we may later replace this with a 'steady-state' value calculated
// from the progress snapshots.
perf.eventsPerSec = numEvents / perf.runtimeSec;
}
if (numEventBytes >= 0 && perf.runtimeSec > 0.0) {
perf.eventBytesPerSec = numEventBytes / perf.runtimeSec;
}
if (numResults >= 0) {
perf.numResults = numResults;
}
if (numResults >= 0 && perf.runtimeSec > 0.0) {
perf.resultsPerSec = numResults / perf.runtimeSec;
}
if (numResultBytes >= 0 && perf.runtimeSec > 0.0) {
perf.resultBytesPerSec = numResultBytes / perf.runtimeSec;
}
if (eventStart >= 0) {
perf.startupDelaySec = (eventStart - startMsSinceEpoch) / 1000.0;
}
if (resultStart >= 0 && eventStart >= 0 && resultStart >= eventStart) {
perf.processingDelaySec = (resultStart - eventStart) / 1000.0;
}
if (timestampStart >= 0 && timestampEnd >= 0 && perf.runtimeSec > 0.0) {
double eventRuntimeSec = (timestampEnd - timestampStart) / 1000.0;
perf.timeDilation = eventRuntimeSec / perf.runtimeSec;
}
if (resultEnd >= 0) {
// Fill in the shutdown delay assuming the job has now finished.
perf.shutdownDelaySec = (now - resultEnd) / 1000.0;
}
// As soon as available, try to capture cumulative cost at this point too.
NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot();
snapshot.secSinceStart = (now - startMsSinceEpoch) / 1000.0;
snapshot.runtimeSec = perf.runtimeSec;
snapshot.numEvents = numEvents;
snapshot.numResults = numResults;
snapshots.add(snapshot);
captureSteadyState(perf, snapshots);
return perf;
}