private NexmarkPerf currentPerf()

in sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java [267:367]


  private NexmarkPerf currentPerf(
      long startMsSinceEpoch,
      long now,
      PipelineResult result,
      List<NexmarkPerf.ProgressSnapshot> snapshots,
      Monitor<?> eventMonitor,
      Monitor<?> resultMonitor) {
    NexmarkPerf perf = new NexmarkPerf();

    MetricsReader eventMetrics = new MetricsReader(result, eventMonitor.name);

    long numEvents = eventMetrics.getCounterMetric(eventMonitor.prefix + ".elements");
    long numEventBytes = eventMetrics.getCounterMetric(eventMonitor.prefix + ".bytes");
    long eventStart = eventMetrics.getStartTimeMetric(eventMonitor.prefix + ".processingTime");
    long eventEnd = eventMetrics.getEndTimeMetric(eventMonitor.prefix + ".processingTime");

    MetricsReader resultMetrics = new MetricsReader(result, resultMonitor.name);

    long numResults = resultMetrics.getCounterMetric(resultMonitor.prefix + ".elements");
    long numResultBytes = resultMetrics.getCounterMetric(resultMonitor.prefix + ".bytes");
    long resultStart = resultMetrics.getStartTimeMetric(resultMonitor.prefix + ".processingTime");
    long resultEnd = resultMetrics.getEndTimeMetric(resultMonitor.prefix + ".processingTime");
    long timestampStart =
        resultMetrics.getStartTimeMetric(resultMonitor.prefix + ".eventTimestamp");
    long timestampEnd = resultMetrics.getEndTimeMetric(resultMonitor.prefix + ".eventTimestamp");

    long effectiveEnd = -1;
    if (eventEnd >= 0 && resultEnd >= 0) {
      // It is possible for events to be generated after the last result was emitted.
      // (Eg Query 2, which only yields results for a small prefix of the event stream.)
      // So use the max of last event and last result times.
      effectiveEnd = Math.max(eventEnd, resultEnd);
    } else if (resultEnd >= 0) {
      effectiveEnd = resultEnd;
    } else if (eventEnd >= 0) {
      // During startup we may have no result yet, but we would still like to track how
      // long the pipeline has been running.
      effectiveEnd = eventEnd;
    }

    if (effectiveEnd >= 0 && eventStart >= 0 && effectiveEnd >= eventStart) {
      perf.runtimeSec = (effectiveEnd - eventStart) / 1000.0;
    }

    if (numEvents >= 0) {
      perf.numEvents = numEvents;
    }

    if (numEvents >= 0 && perf.runtimeSec > 0.0) {
      // For streaming we may later replace this with a 'steady-state' value calculated
      // from the progress snapshots.
      perf.eventsPerSec = numEvents / perf.runtimeSec;
    }

    if (numEventBytes >= 0 && perf.runtimeSec > 0.0) {
      perf.eventBytesPerSec = numEventBytes / perf.runtimeSec;
    }

    if (numResults >= 0) {
      perf.numResults = numResults;
    }

    if (numResults >= 0 && perf.runtimeSec > 0.0) {
      perf.resultsPerSec = numResults / perf.runtimeSec;
    }

    if (numResultBytes >= 0 && perf.runtimeSec > 0.0) {
      perf.resultBytesPerSec = numResultBytes / perf.runtimeSec;
    }

    if (eventStart >= 0) {
      perf.startupDelaySec = (eventStart - startMsSinceEpoch) / 1000.0;
    }

    if (resultStart >= 0 && eventStart >= 0 && resultStart >= eventStart) {
      perf.processingDelaySec = (resultStart - eventStart) / 1000.0;
    }

    if (timestampStart >= 0 && timestampEnd >= 0 && perf.runtimeSec > 0.0) {
      double eventRuntimeSec = (timestampEnd - timestampStart) / 1000.0;
      perf.timeDilation = eventRuntimeSec / perf.runtimeSec;
    }

    if (resultEnd >= 0) {
      // Fill in the shutdown delay assuming the job has now finished.
      perf.shutdownDelaySec = (now - resultEnd) / 1000.0;
    }

    // As soon as available, try to capture cumulative cost at this point too.

    NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot();
    snapshot.secSinceStart = (now - startMsSinceEpoch) / 1000.0;
    snapshot.runtimeSec = perf.runtimeSec;
    snapshot.numEvents = numEvents;
    snapshot.numResults = numResults;
    snapshots.add(snapshot);

    captureSteadyState(perf, snapshots);

    return perf;
  }