in sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java [116:211]
private <WriteT> PDone createWrite(
    PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
  Pipeline p = input.getPipeline();

  // A coder to use for the WriteOperation.
  @SuppressWarnings("unchecked")
  Coder<WriteOperation<T, WriteT>> operationCoder =
      (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
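  // The cast above is unchecked because generics are erased at runtime; it is safe here
  // because the coder only encodes this writeOperation instance and the initialized copies
  // of it flowing through the pipeline, all of which share the same runtime class.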

  // A singleton collection of the WriteOperation, to be used as input to a ParDo to
  // initialize the sink.
  PCollection<WriteOperation<T, WriteT>> operationCollection =
      p.apply(Create.<WriteOperation<T, WriteT>>of(writeOperation).withCoder(operationCoder));

  // Initialize the resource in a do-once ParDo on the WriteOperation.
  operationCollection = operationCollection
      .apply("Initialize", ParDo.of(
          new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
              WriteOperation<T, WriteT> writeOperation = c.element();
              writeOperation.initialize(c.getPipelineOptions());
              // The WriteOperation is also the output of this ParDo, so the mutable
              // state it acquires during initialization is carried to the later phases.
              c.output(writeOperation);
            }
          }))
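      // An explicit coder is set because coder inference cannot determine a coder for the
      // WriteOperation output of the anonymous DoFn above.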
      .setCoder(operationCoder);

  // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
  final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
      operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
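  // Every write bundle reads this singleton view, so all bundles see the same initialized
  // WriteOperation rather than re-running initialization themselves.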

  // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
  // as a side input) and collect the results of the writes in a PCollection.
  // There is a dependency between this ParDo and the first (the WriteOperation PCollection
  // as a side input), so this will happen after the initial ParDo.
  PCollection<WriteT> results = input
      .apply("WriteBundles", ParDo.of(new DoFn<T, WriteT>() {
        // Writer that will write the records in this bundle. Lazily
        // initialized in processElement.
        private Writer<T, WriteT> writer = null;

        @Override
        public void processElement(ProcessContext c) throws Exception {
          // Lazily initialize the Writer.
          if (writer == null) {
            WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
            writer = writeOperation.createWriter(c.getPipelineOptions());
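            // Open the writer with a random UUID as the bundle id; a sink can use this id
            // to keep the bundle's output (e.g. a temporary file name) from colliding with
            // output from other bundles or from retries of this one.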
            writer.open(UUID.randomUUID().toString());
          }
          try {
            writer.write(c.element());
          } catch (Exception e) {
            // Discard the write result and close the writer.
            try {
              writer.close();
            } catch (Exception closeException) {
              // Do not mask the exception that caused the write to fail.
            }
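            // Rethrow the original failure so the runner can retry the bundle.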
            throw e;
          }
        }

        @Override
        public void finishBundle(Context c) throws Exception {
          if (writer != null) {
            WriteT result = writer.close();
            // Output the result of the write.
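            // There is no input element to inherit a timestamp from in finishBundle, so
            // the result is stamped with the current time.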
            c.outputWithTimestamp(result, Instant.now());
          }
        }
      }).withSideInputs(writeOperationView))
      .setCoder(writeOperation.getWriterResultCoder())
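      // Re-window into the global window so that, whatever the windowing of the input,
      // all writer results land in a single window and can be gathered into one Iterable
      // side input for finalization.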
      .apply(Window.<WriteT>into(new GlobalWindows()));
  final PCollectionView<Iterable<WriteT>> resultsView =
      results.apply(View.<WriteT>asIterable());

  // Finalize the write in another do-once ParDo on the singleton collection containing the
  // WriteOperation. The results from the per-bundle writes are given as an Iterable side
  // input. The WriteOperation's state is the same as after its initialization in the first
  // do-once ParDo. There is a dependency between this ParDo and the parallel write (the
  // writer results collection as a side input), so it will happen after the parallel write.
  @SuppressWarnings("unused")
  final PCollection<Integer> done = operationCollection
      .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
          Iterable<WriteT> results = c.sideInput(resultsView);
          WriteOperation<T, WriteT> writeOperation = c.element();
          writeOperation.finalize(results, c.getPipelineOptions());
        }
      }).withSideInputs(resultsView));
  return PDone.in(input.getPipeline());
}
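
// Illustrative usage, as a sketch (not part of this file): createWrite is reached by
// applying the Write transform to a PCollection, where 'sink' stands in for some concrete
// Sink<String> implementation:
//
//   PCollection<String> lines = ...;
//   lines.apply(Write.to(sink));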