static void run()

in sdks/java/testing/watermarks/src/main/java/org/apache/beam/sdk/testing/watermarks/WatermarkLatency.java [87:233]


  /**
   * Builds and runs a pipeline that measures the delay records experience between consecutive
   * {@code GroupByKey} stages.
   *
   * <p>A {@code GenerateSequence} source emits {@code options.getInputRatePerSec()} elements per
   * one-second period. Each element is stamped with the current wall-clock time, keyed by {@code
   * element % numKeys}, and placed into one-second fixed windows that fire once the watermark
   * passes the end of the window (no allowed lateness, fired panes discarded).
   *
   * <p>The stream then passes through {@code options.getNumShuffles()} stages. Each stage is a
   * {@code GroupByKey} followed by a {@code ParDo}. The ParDo re-emits every grouped record with
   * a fresh wall-clock stamp and with its key incremented by one, so the next stage has to shuffle
   * again. From the second stage on, each group also emits, to the {@code latencyResult} side
   * output, the delay between the previous stage's stamp and now. That value is labelled {@code
   * "GBK<i-1>-GBK<i>"}.
   *
   * <p>Every latency stream is re-windowed into one-minute fixed windows and grouped by label.
   * Each group is then logged as {@code "<configName>, <label>, <p50 ms>, <p75 ms>, <p95 ms>, <p99
   * ms>, <count>"}.
   *
   * <p>The method blocks in {@code waitUntilFinish()}. The generated sequence has no upper bound,
   * so this presumably runs until the job is cancelled; confirm for the runner in use.
   *
   * @param options pipeline options. They also supply the input rate, number of keys, number of
   *     shuffle stages and the config name printed in each log line.
   */
  static void run(WatermarkLatencyOptions options) {
    Pipeline p = Pipeline.create(options);

    Duration period = Duration.standardSeconds(1);
    final int numKeys = options.getNumKeys();
    final String configName = options.getConfigName();

    // Stamp every generated element with its creation time and spread it over numKeys keys.
    PCollection<KV<Long, Instant>> input =
        p.apply("Generate", GenerateSequence.from(0).withRate(options.getInputRatePerSec(), period))
            .apply(
                ParDo.of(
                    new DoFn<Long, KV<Long, Instant>>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        Instant now = Instant.now();
                        c.output(KV.of(c.element() % numKeys, now));
                      }
                    }))
            .apply(
                Window.<KV<Long, Instant>>into(FixedWindows.of(Duration.standardSeconds(1)))
                    .triggering(AfterWatermark.pastEndOfWindow())
                    .discardingFiredPanes()
                    .withAllowedLateness(Duration.ZERO));

    PCollectionList<KV<String, Duration>> latencyList =
        PCollectionList.<KV<String, Duration>>empty(p);
    for (int i = 0; i < options.getNumShuffles(); i++) {
      final int idx = i;
      PCollectionTuple tup =
          input
              .apply(GroupByKey.create())
              .apply(
                  ParDo.of(
                          new DoFn<KV<Long, Iterable<Instant>>, KV<Long, Instant>>() {
                            @ProcessElement
                            public void process(ProcessContext c) {
                              Instant now = Instant.now();
                              // Changing the key forces the next stage to re-shuffle.
                              long nextKey = c.element().getKey() + 1;
                              Instant lastGBKTs = Instant.ofEpochMilli(0L);
                              // Forward every record to the next shuffle stage, re-stamped with
                              // the time it was processed here. The records stay in their
                              // current window; only the stage changes.
                              for (Instant v : c.element().getValue()) {
                                lastGBKTs = v;
                                c.output(KV.of(nextKey, now));
                              }
                              if (idx > 0) {
                                // Records reaching this stage were stamped by the previous
                                // stage's ParDo. This is therefore the delay from that emit to
                                // processing here.
                                Duration sessionDelay = new Duration(lastGBKTs, now);
                                c.output(
                                    latencyResult,
                                    KV.of(
                                        String.format("GBK%d-GBK%d", idx - 1, idx), sessionDelay));
                              }
                            }
                          })
                      .withOutputTags(output, TupleTagList.of(latencyResult)));

      latencyList = latencyList.and(tup.get(latencyResult));
      input = tup.get(output);
    }

    // Summarise each latency stream per label over one-minute windows and log the percentiles.
    for (PCollection<KV<String, Duration>> latency : latencyList.getAll()) {
      latency
          .apply(
              Window.<KV<String, Duration>>into(FixedWindows.of(Duration.standardMinutes(1)))
                  .triggering(AfterWatermark.pastEndOfWindow())
                  .discardingFiredPanes()
                  .withAllowedLateness(Duration.ZERO))
          .apply(GroupByKey.create())
          .apply(
              ParDo.of(
                  new DoFn<KV<String, Iterable<Duration>>, String>() {
                    @ProcessElement
                    public void process(ProcessContext c) {
                      ArrayList<Duration> sorted = new ArrayList<>();
                      for (Duration v : c.element().getValue()) {
                        sorted.add(v);
                      }
                      if (sorted.isEmpty()) {
                        return;
                      }
                      Collections.sort(sorted);

                      String out =
                          String.format(
                              "%s, %s, %d, %d, %d, %d, %d",
                              configName,
                              c.element().getKey(),
                              percentile(sorted, 0.5).getMillis(),
                              percentile(sorted, 0.75).getMillis(),
                              percentile(sorted, 0.95).getMillis(),
                              percentile(sorted, 0.99).getMillis(),
                              sorted.size());
                      LOG.info(out);
                    }

                    /**
                     * Returns the element at index {@code floor(size * q)} of a sorted, non-empty
                     * list. For {@code 0 <= q < 1} that index is always within bounds. For an
                     * even-sized list, {@code q = 0.5} selects the upper of the two middle
                     * elements.
                     */
                    private Duration percentile(ArrayList<Duration> sorted, double q) {
                      return sorted.get((int) Math.floor(sorted.size() * q));
                    }
                  }));
    }

    // Run pipeline
    p.run().waitUntilFinish();
  }