in sdks/java/testing/watermarks/src/main/java/org/apache/beam/sdk/testing/watermarks/WatermarkLatency.java [87:233]
/**
 * Builds and runs the watermark-latency pipeline, blocking until it finishes.
 *
 * <p>The pipeline generates a keyed stream of wall-clock timestamps in one-second fixed windows and
 * passes it through {@code options.getNumShuffles()} consecutive {@link GroupByKey} stages. For
 * every pair of consecutive shuffles it logs, per one-minute window, one line with the median,
 * p75, p95 and p99 delay (in milliseconds) between the two shuffles plus the sample count.
 *
 * @param options pipeline options supplying the input rate, number of keys, number of shuffles and
 *     the configuration name that prefixes every logged result line
 */
static void run(WatermarkLatencyOptions options) {
  Pipeline p = Pipeline.create(options);
  PCollection<KV<Long, Instant>> input = generateKeyedTimestamps(p, options);
  PCollectionList<KV<String, Duration>> latencies = chainShuffles(p, input, options);
  final String configName = options.getConfigName();
  for (PCollection<KV<String, Duration>> latency : latencies.getAll()) {
    logLatencyPercentiles(latency, configName);
  }
  // Run pipeline
  p.run().waitUntilFinish();
}

/**
 * Emits {@code (sequence % numKeys, wall-clock time at emission)} pairs at the configured rate,
 * assigned to one-second fixed windows that fire once when the watermark passes the window end.
 */
private static PCollection<KV<Long, Instant>> generateKeyedTimestamps(
    Pipeline p, WatermarkLatencyOptions options) {
  final int numKeys = options.getNumKeys();
  return p.apply(
          "Generate",
          GenerateSequence.from(0)
              .withRate(options.getInputRatePerSec(), Duration.standardSeconds(1)))
      .apply(
          ParDo.of(
              new DoFn<Long, KV<Long, Instant>>() {
                @ProcessElement
                public void process(ProcessContext c) {
                  Instant now = Instant.now();
                  c.output(KV.of(c.element() % numKeys, now));
                }
              }))
      .apply(
          Window.<KV<Long, Instant>>into(FixedWindows.of(Duration.standardSeconds(1)))
              .triggering(AfterWatermark.pastEndOfWindow())
              .discardingFiredPanes()
              .withAllowedLateness(Duration.ZERO));
}

/**
 * Applies {@code options.getNumShuffles()} consecutive GroupByKey + {@link #shuffleStage} steps to
 * {@code input}.
 *
 * @return the {@code latencyResult} side outputs of stages 1..n-1. Stage 0 never emits on that
 *     tag, so its always-empty side output is not returned (and thus not summarized).
 */
private static PCollectionList<KV<String, Duration>> chainShuffles(
    Pipeline p, PCollection<KV<Long, Instant>> input, WatermarkLatencyOptions options) {
  PCollectionList<KV<String, Duration>> latencies =
      PCollectionList.<KV<String, Duration>>empty(p);
  PCollection<KV<Long, Instant>> current = input;
  for (int i = 0; i < options.getNumShuffles(); i++) {
    PCollectionTuple stage =
        current
            .apply(GroupByKey.create())
            .apply(
                ParDo.of(shuffleStage(i))
                    .withOutputTags(output, TupleTagList.of(latencyResult)));
    if (i > 0) {
      latencies = latencies.and(stage.get(latencyResult));
    }
    current = stage.get(output);
  }
  return latencies;
}

/**
 * Returns the DoFn run after the {@code idx}-th GroupByKey.
 *
 * <p>Every grouped value is re-emitted under key {@code key + 1}, which forces the next stage to
 * shuffle again, and is stamped with this stage's processing time. For {@code idx > 0} it also emits
 * {@code ("GBK<idx-1>-GBK<idx>", now - stamp)} on the {@code latencyResult} side output. Here
 * {@code stamp} is the last value seen while iterating the group, i.e. a time written by the
 * previous stage.
 */
private static DoFn<KV<Long, Iterable<Instant>>, KV<Long, Instant>> shuffleStage(final int idx) {
  return new DoFn<KV<Long, Iterable<Instant>>, KV<Long, Instant>>() {
    @ProcessElement
    public void process(ProcessContext c) {
      Instant now = Instant.now();
      long nextKey = c.element().getKey() + 1;
      Instant lastGBKTs = Instant.ofEpochMilli(0L);
      // Forward records to the next window under a new key to enforce re-shuffling.
      for (Instant v : c.element().getValue()) {
        lastGBKTs = v;
        c.output(KV.of(nextKey, now));
      }
      if (idx > 0) {
        // Delay since the previous shuffle stage stamped these records.
        Duration sessionDelay = new Duration(lastGBKTs, now);
        c.output(latencyResult, KV.of(String.format("GBK%d-GBK%d", idx - 1, idx), sessionDelay));
      }
    }
  };
}

/**
 * Re-windows {@code latencies} into one-minute fixed windows, groups by label and logs one
 * percentile line per window and label via {@link #logLatencyLine}.
 */
private static void logLatencyPercentiles(
    PCollection<KV<String, Duration>> latencies, final String configName) {
  latencies
      .apply(
          Window.<KV<String, Duration>>into(FixedWindows.of(Duration.standardMinutes(1)))
              .triggering(AfterWatermark.pastEndOfWindow())
              .discardingFiredPanes()
              .withAllowedLateness(Duration.ZERO))
      .apply(GroupByKey.create())
      .apply(
          ParDo.of(
              new DoFn<KV<String, Iterable<Duration>>, String>() {
                @ProcessElement
                public void process(ProcessContext c) {
                  logLatencyLine(configName, c.element().getKey(), c.element().getValue());
                }
              }));
}

/**
 * Logs {@code "<configName>, <label>, <median>, <p75>, <p95>, <p99>, <count>"}, with the
 * percentiles in milliseconds. Logs nothing when {@code values} is empty.
 */
private static void logLatencyLine(String configName, String label, Iterable<Duration> values) {
  ArrayList<Duration> sorted = new ArrayList<>();
  for (Duration v : values) {
    sorted.add(v);
  }
  if (sorted.isEmpty()) {
    return;
  }
  Collections.sort(sorted);
  LOG.info(
      String.format(
          "%s, %s, %d, %d, %d, %d, %d",
          configName,
          label,
          percentileOf(sorted, 0.5).getMillis(),
          percentileOf(sorted, 0.75).getMillis(),
          percentileOf(sorted, 0.95).getMillis(),
          percentileOf(sorted, 0.99).getMillis(),
          sorted.size()));
}

/**
 * Returns the element at index {@code floor(size * q)} of a non-empty, ascending-sorted list.
 * The index is clamped to the last element so it can never fall out of range.
 */
private static Duration percentileOf(ArrayList<Duration> sorted, double q) {
  int index = (int) Math.floor(sorted.size() * q);
  return sorted.get(Math.min(index, sorted.size() - 1));
}