public PDone apply(PCollection<TableRow> input)

in sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java [805:864]


      public PDone apply(PCollection<TableRow> input) {
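        // Validates the sink configuration; in streaming mode (or when a tablespec function is
        // used) it expands into StreamWithDeDup, otherwise it returns PDone and leaves the
        // actual write to the runner.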
        BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);

        if (table == null && tableRefFunction == null) {
          throw new IllegalStateException(
              "must set the table reference of a BigQueryIO.Write transform");
        }
        if (table != null && tableRefFunction != null) {
          throw new IllegalStateException(
              "Cannot set both a table reference and a table function for a BigQueryIO.Write "
                + "transform");
        }

        if (createDisposition == CreateDisposition.CREATE_IF_NEEDED && schema == null) {
          throw new IllegalArgumentException("CreateDisposition is CREATE_IF_NEEDED, "
              + "however no schema was provided.");
        }

        if (table != null && table.getProjectId() == null) {
          // If the user does not specify a project, we assume the table is located in the
          // project that owns the Dataflow job.
          String projectIdFromOptions = options.getProject();
          LOG.warn(String.format(BigQueryIO.SET_PROJECT_FROM_OPTIONS_WARNING, table.getDatasetId(),
              table.getTableId(), projectIdFromOptions));
          table.setProjectId(projectIdFromOptions);
        }

        // Check for destination table presence and emptiness for early failure notification.
        // Note that a presence check can fail when the table or dataset is created by an earlier
        // stage of the pipeline. For these cases, the withoutValidation method can be used to
        // disable the check.
        // Unfortunately, we can't validate anything early when tableRefFunction is specified.
        if (table != null && validate) {
          verifyDatasetPresence(options, table);
          if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
            verifyTablePresence(options, table);
          }
          if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
            verifyTableEmpty(options, table);
          }
        }

        // In streaming mode, the BigQuery write is handled by the StreamWithDeDup transform.
        // We currently also take this path when a tablespec function is specified.
        if (options.isStreaming() || tableRefFunction != null) {
          if (createDisposition == CreateDisposition.CREATE_NEVER) {
            throw new IllegalArgumentException("CreateDispostion.CREATE_NEVER is not "
                + "supported for unbounded PCollections or when using tablespec functions.");
          }

          if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
            throw new IllegalArgumentException("WriteDisposition.WRITE_TRUNCATE is not "
                + "supported for unbounded PCollections or when using tablespec functions.");
          }

          return input.apply(new StreamWithDeDup(table, tableRefFunction, schema));
        }

        return PDone.in(input.getPipeline());
      }
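
For reference, a minimal sketch of a batch pipeline that exercises this apply() (assuming the 1.x Dataflow SDK API; the project, dataset, table, and field names below are placeholders, not part of the original source):

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;
import java.util.Arrays;

public class BigQueryWriteExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // Destination schema; CREATE_IF_NEEDED requires one, as enforced in apply() above.
    TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("word").setType("STRING"),
        new TableFieldSchema().setName("count").setType("INTEGER")));

    p.apply(Create.of(new TableRow().set("word", "hello").set("count", 1))
            .withCoder(TableRowJsonCoder.of()))
     .apply(BigQueryIO.Write
         .named("WriteExample")
         // "project:dataset.table" spec; omitting the project falls back to the
         // job's project, as handled in apply() above.
         .to("my-project:my_dataset.my_table")
         .withSchema(schema)
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_EMPTY));

    p.run();
  }
}

With these dispositions and validation enabled, apply() verifies that the dataset exists and that the destination table is empty before the job starts; .withoutValidation() can be added to skip those checks.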