public PCollection expand()

in sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java [176:377]


  /**
   * Builds the Query10 pipeline: logs every event to a sharded set of per-pane files, then writes
   * one index file per window listing the files produced for that window.
   *
   * <p>Stages, in order:
   *
   * <ol>
   *   <li>{@code ShardEvents}: keys each event by one of {@code maxNumWorkers *
   *       NUM_SHARDS_PER_WORKER} shard names derived from the event's hash code, counting events
   *       that do / do not carry the {@code "LATE"} annotation.
   *   <li>{@code WindowEvents}: fixed windows of {@code configuration.windowSizeSec} seconds. Until
   *       the watermark passes the end of the window a pane fires every {@code
   *       configuration.maxLogEvents} elements; after that a pane fires on whichever comes first of
   *       {@code maxLogEvents} elements or {@code LATE_BATCHING_PERIOD} of processing time since the
   *       pane's first element. Fired panes are discarded and up to one day of lateness is allowed.
   *   <li>{@code CheckForLateEvents}: cross-checks each pane's timing against the LATE annotations
   *       of its events and increments {@code ERROR_*} counters on mismatches.
   *   <li>{@code UploadEvents}: encodes each pane's events with {@code Event.CODER} into the file
   *       chosen by {@code outputFileFor} (skipped when that file has no name) and emits the
   *       resulting {@code OutputFile} under a {@code null} key.
   *   <li>{@code WindowLogFiles} / {@code GroupByKey2} / {@code Index}: re-windows the output-file
   *       records with the default trigger, gathers them into a single group per window and writes
   *       that window's index file, emitting one {@link Done} per window. EARLY, LATE, or re-fired
   *       ON_TIME panes are reported as errors instead of being indexed.
   * </ol>
   *
   * @param events the input event stream
   * @return one {@link Done} marker per window whose index was written
   */
  public PCollection<Done> expand(PCollection<Event> events) {
    final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER;

    return events
        .apply(
            name + ".ShardEvents",
            ParDo.of(
                new DoFn<Event, KV<String, Event>>() {
                  private final Counter lateCounter = Metrics.counter(name, "actuallyLateEvent");
                  private final Counter onTimeCounter = Metrics.counter(name, "onTimeCounter");

                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    if (c.element().hasAnnotation("LATE")) {
                      lateCounter.inc();
                      LOG.debug("Observed late: {}", c.element());
                    } else {
                      onTimeCounter.inc();
                    }
                    // The remainder's magnitude is below numLogShards, so abs() yields a shard
                    // index in [0, numLogShards).
                    int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards);
                    String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards);
                    c.output(KV.of(shard, c.element()));
                  }
                }))
        .apply(
            name + ".WindowEvents",
            Window.<KV<String, Event>>into(
                    FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
                // Phase 1 (until the watermark passes the window end): fire every maxLogEvents
                // elements. Phase 2 (afterwards): fire on maxLogEvents elements or after
                // LATE_BATCHING_PERIOD of processing time, whichever comes first.
                .triggering(
                    AfterEach.inOrder(
                        Repeatedly.forever(
                                AfterPane.elementCountAtLeast(configuration.maxLogEvents))
                            .orFinally(AfterWatermark.pastEndOfWindow()),
                        Repeatedly.forever(
                            AfterFirst.of(
                                AfterPane.elementCountAtLeast(configuration.maxLogEvents),
                                AfterProcessingTime.pastFirstElementInPane()
                                    .plusDelayOf(LATE_BATCHING_PERIOD)))))
                .discardingFiredPanes()
                // Use a 1 day allowed lateness so that any forgotten hold will stall the
                // pipeline for that period and be very noticeable.
                .withAllowedLateness(Duration.standardDays(1)))
        .apply(name + ".GroupByKey", GroupByKey.create())
        .apply(
            name + ".CheckForLateEvents",
            ParDo.of(
                new DoFn<KV<String, Iterable<Event>>, KV<String, Iterable<Event>>>() {
                  private final Counter earlyCounter = Metrics.counter(name, "earlyShard");
                  private final Counter onTimeCounter = Metrics.counter(name, "onTimeShard");
                  private final Counter lateCounter = Metrics.counter(name, "lateShard");
                  private final Counter unexpectedLatePaneCounter =
                      Metrics.counter(name, "ERROR_unexpectedLatePane");
                  private final Counter unexpectedOnTimeElementCounter =
                      Metrics.counter(name, "ERROR_unexpectedOnTimeElement");

                  @ProcessElement
                  public void processElement(ProcessContext c, BoundedWindow window) {
                    int numLate = 0;
                    int numOnTime = 0;
                    for (Event event : c.element().getValue()) {
                      if (event.hasAnnotation("LATE")) {
                        numLate++;
                      } else {
                        numOnTime++;
                      }
                    }
                    String shard = c.element().getKey();
                    // Parameterized logging: the message is only rendered when DEBUG is enabled.
                    LOG.debug(
                        "{} with timestamp {} has {} actually late and {} on-time "
                            + "elements in pane {} for window {}",
                        shard,
                        c.timestamp(),
                        numLate,
                        numOnTime,
                        c.pane(),
                        window.maxTimestamp());
                    PaneInfo.Timing timing = c.pane().getTiming();
                    if (timing == PaneInfo.Timing.LATE) {
                      // A late pane should contain only events annotated as LATE.
                      if (numLate == 0) {
                        LOG.error("ERROR! No late events in late pane for {}", shard);
                        unexpectedLatePaneCounter.inc();
                      }
                      if (numOnTime > 0) {
                        LOG.error(
                            "ERROR! Have {} on-time events in late pane for {}", numOnTime, shard);
                        unexpectedOnTimeElementCounter.inc();
                      }
                      lateCounter.inc();
                    } else if (timing == PaneInfo.Timing.EARLY) {
                      // Early panes are only fired by the element-count trigger.
                      if (numOnTime + numLate < configuration.maxLogEvents) {
                        LOG.error(
                            "ERROR! Only have {} events in early pane for {}",
                            numOnTime + numLate,
                            shard);
                      }
                      earlyCounter.inc();
                    } else {
                      onTimeCounter.inc();
                    }
                    c.output(c.element());
                  }
                }))
        .apply(
            name + ".UploadEvents",
            ParDo.of(
                new DoFn<KV<String, Iterable<Event>>, KV<Void, OutputFile>>() {
                  private final Counter savedFileCounter = Metrics.counter(name, "savedFile");
                  private final Counter writtenRecordsCounter =
                      Metrics.counter(name, "writtenRecords");

                  @ProcessElement
                  public void processElement(ProcessContext c, BoundedWindow window)
                      throws IOException {
                    String shard = c.element().getKey();
                    GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
                    OutputFile outputFile = outputFileFor(window, shard, c.pane());
                    // Parameterized logging: the message is only rendered when DEBUG is enabled.
                    LOG.debug(
                        "Writing {} with record timestamp {}, window timestamp {}, pane {}",
                        shard,
                        c.timestamp(),
                        window.maxTimestamp(),
                        c.pane());
                    if (outputFile.filename != null) {
                      LOG.info("Beginning write to '{}'", outputFile.filename);
                      int n = 0;
                      try (OutputStream output =
                          Channels.newOutputStream(
                              openWritableGcsFile(options, outputFile.filename))) {
                        for (Event event : c.element().getValue()) {
                          Event.CODER.encode(event, output, Coder.Context.OUTER);
                          writtenRecordsCounter.inc();
                          if (++n % 10000 == 0) {
                            LOG.info("So far written {} records to '{}'", n, outputFile.filename);
                          }
                        }
                      }
                      LOG.info("Written all {} records to '{}'", n, outputFile.filename);
                    }
                    savedFileCounter.inc();
                    // A single (null) key lets the next GroupByKey collect all files of a window.
                    c.output(KV.of(null, outputFile));
                  }
                }))
        // Clear fancy triggering from above.
        .apply(
            name + ".WindowLogFiles",
            Window.<KV<Void, OutputFile>>into(
                    FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
                .triggering(DefaultTrigger.of())
                // We expect no late data here, but we'll assume the worst so we can detect any.
                .withAllowedLateness(Duration.standardDays(1))
                .discardingFiredPanes())
        // this GroupByKey allows to have one file per window
        .apply(name + ".GroupByKey2", GroupByKey.create())
        .apply(
            name + ".Index",
            ParDo.of(
                new DoFn<KV<Void, Iterable<OutputFile>>, Done>() {
                  private final Counter unexpectedLateCounter =
                      Metrics.counter(name, "ERROR_unexpectedLate");
                  private final Counter unexpectedEarlyCounter =
                      Metrics.counter(name, "ERROR_unexpectedEarly");
                  private final Counter unexpectedIndexCounter =
                      Metrics.counter(name, "ERROR_unexpectedIndex");
                  private final Counter finalizedCounter = Metrics.counter(name, "indexed");

                  @ProcessElement
                  public void processElement(ProcessContext c, BoundedWindow window)
                      throws IOException {
                    Timing timing = c.pane().getTiming();
                    if (timing == Timing.LATE) {
                      unexpectedLateCounter.inc();
                      LOG.error("ERROR! Unexpected LATE pane: {}", c.pane());
                    } else if (timing == Timing.EARLY) {
                      unexpectedEarlyCounter.inc();
                      LOG.error("ERROR! Unexpected EARLY pane: {}", c.pane());
                    } else if (timing == Timing.ON_TIME && c.pane().getIndex() != 0) {
                      // With the default trigger each window should fire exactly once.
                      unexpectedIndexCounter.inc();
                      LOG.error("ERROR! Unexpected ON_TIME pane index: {}", c.pane());
                    } else {
                      GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
                      LOG.debug(
                          "Index with record timestamp {}, window timestamp {}, pane {}",
                          c.timestamp(),
                          window.maxTimestamp(),
                          c.pane());

                      @Nullable String filename = indexPathFor(window);
                      if (filename != null) {
                        LOG.info("Beginning write to '{}'", filename);
                        int n = 0;
                        try (OutputStream output =
                            Channels.newOutputStream(openWritableGcsFile(options, filename))) {
                          for (OutputFile outputFile : c.element().getValue()) {
                            output.write(outputFile.toString().getBytes(StandardCharsets.UTF_8));
                            n++;
                          }
                        }
                        LOG.info("Written all {} lines to '{}'", n, filename);
                      }
                      c.output(new Done("written for timestamp " + window.maxTimestamp()));
                      finalizedCounter.inc();
                    }
                  }
                }));
  }