private PDone createWrite()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java [116:211]


    private <WriteT> PDone createWrite(
        PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
      Pipeline p = input.getPipeline();

      // A coder to use for the WriteOperation.
      @SuppressWarnings("unchecked")
      Coder<WriteOperation<T, WriteT>> operationCoder =
          (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());

      // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
      // the sink.
      PCollection<WriteOperation<T, WriteT>> operationCollection =
          p.apply(Create.<WriteOperation<T, WriteT>>of(writeOperation).withCoder(operationCoder));

      // Initialize the sink in a do-once ParDo on the WriteOperation.
      operationCollection = operationCollection
          .apply("Initialize", ParDo.of(
              new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
              WriteOperation<T, WriteT> writeOperation = c.element();
              writeOperation.initialize(c.getPipelineOptions());
              // The WriteOperation is also the output of this ParDo, so it can have mutable
              // state.
              c.output(writeOperation);
            }
          }))
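          // Set the coder explicitly: the WriteOperation's concrete class cannot be
          // inferred from the DoFn's generic output type.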
          .setCoder(operationCoder);

      // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
      final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
          operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());

      // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
      // as a side input) and collect the results of the writes in a PCollection.
      // There is a dependency between this ParDo and the first (the WriteOperation PCollection
      // as a side input), so this will happen after the initial ParDo.
      PCollection<WriteT> results = input
          .apply("WriteBundles", ParDo.of(new DoFn<T, WriteT>() {
            // Writer that will write the records in this bundle. Lazily
            // initialized in processElement.
            private Writer<T, WriteT> writer = null;

            @Override
            public void processElement(ProcessContext c) throws Exception {
              // Lazily initialize the Writer
              if (writer == null) {
                WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
                writer = writeOperation.createWriter(c.getPipelineOptions());
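                // The random UUID is the unique id for this bundle's writer, so its
                // output does not collide with the output of other writers.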
                writer.open(UUID.randomUUID().toString());
              }
              try {
                writer.write(c.element());
              } catch (Exception e) {
                // Discard the write result and close the writer.
                try {
                  writer.close();
                } catch (Exception closeException) {
                  // Do not mask the exception that caused the write to fail.
                }
                throw e;
              }
            }

            @Override
            public void finishBundle(Context c) throws Exception {
              if (writer != null) {
                WriteT result = writer.close();
                // Output the result of the write.
                c.outputWithTimestamp(result, Instant.now());
              }
            }
          }).withSideInputs(writeOperationView))
          .setCoder(writeOperation.getWriterResultCoder())
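          // Re-window the results into the global window so they can be gathered into a
          // single Iterable side input for the finalize step.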
          .apply(Window.<WriteT>into(new GlobalWindows()));

      final PCollectionView<Iterable<WriteT>> resultsView =
          results.apply(View.<WriteT>asIterable());

      // Finalize the write in another do-once ParDo on the singleton collection containing the
      // WriteOperation. The results from the per-bundle writes are given as an Iterable side input.
      // The WriteOperation's state is the same as after its initialization in the first do-once
      // ParDo. There is a dependency between this ParDo and the parallel write (the writer results
      // collection as a side input), so it will happen after the parallel write.
      @SuppressWarnings("unused")
      final PCollection<Integer> done = operationCollection
          .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
              Iterable<WriteT> results = c.sideInput(resultsView);
              WriteOperation<T, WriteT> writeOperation = c.element();
              writeOperation.finalize(results, c.getPipelineOptions());
            }
          }).withSideInputs(resultsView));
      return PDone.in(input.getPipeline());
    }
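
For orientation, a minimal usage sketch follows. createWrite is what a Write.Bound transform (built with Write.to) expands into when applied to a PCollection; MySink is a hypothetical Sink<String> subclass and the bucket path is a placeholder, neither is part of the SDK.

    // Minimal usage sketch (MySink is a hypothetical Sink<String> implementation).
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    p.apply(TextIO.Read.from("gs://my-bucket/input-*"))   // read lines as a PCollection<String>
     .apply(Write.to(new MySink()));                       // expands into the Initialize/WriteBundles/Finalize phases above
    p.run();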