in sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java [805:864]
/**
 * Validates the configuration of this write and expands it into the pipeline.
 *
 * <p>When the pipeline is streaming, or a table function is configured, the rows are handed to
 * a {@code StreamWithDeDup} transform. Otherwise no transform is applied here and a
 * {@link PDone} is returned directly. NOTE(review): the batch write is presumably carried out
 * by runner-specific handling of this transform — confirm.
 *
 * <p>If a static table reference without a project id is configured, its project id is set
 * in place from {@link BigQueryOptions#getProject()} and a warning is logged.
 *
 * @param input the rows to write
 * @return a {@link PDone} for the pipeline that {@code input} belongs to
 * @throws IllegalStateException if neither, or both, of a table reference and a table
 *     function are set
 * @throws IllegalArgumentException if {@code CREATE_IF_NEEDED} is used without a schema, or if
 *     {@code CREATE_NEVER} or {@code WRITE_TRUNCATE} is used while streaming or with a table
 *     function
 */
public PDone apply(PCollection<TableRow> input) {
  BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);

  // Exactly one of a static table reference or a table function must be configured.
  if (table == null && tableRefFunction == null) {
    throw new IllegalStateException(
        "must set the table reference of a BigQueryIO.Write transform");
  }
  if (table != null && tableRefFunction != null) {
    throw new IllegalStateException(
        "Cannot set both a table reference and a table function for a BigQueryIO.Write "
        + "transform");
  }
  if (createDisposition == CreateDisposition.CREATE_IF_NEEDED && schema == null) {
    throw new IllegalArgumentException("CreateDisposition is CREATE_IF_NEEDED, "
        + "however no schema was provided.");
  }

  if (table != null && table.getProjectId() == null) {
    // If user does not specify a project we assume the table to be located in the project
    // that owns the Dataflow job. The field is updated in place (presumably so that later
    // readers of this transform's table see the resolved project — confirm before changing).
    String projectIdFromOptions = options.getProject();
    LOG.warn(String.format(BigQueryIO.SET_PROJECT_FROM_OPTIONS_WARNING, table.getDatasetId(),
        table.getTableId(), projectIdFromOptions));
    table.setProjectId(projectIdFromOptions);
  }

  // In streaming, BigQuery write is taken care of by StreamWithDeDup transform.
  // We also currently do this if a tablespec function is specified.
  boolean useStreamingWrite = options.isStreaming() || tableRefFunction != null;

  // These disposition checks are purely local, so run them before the remote BigQuery
  // validation below: a misconfigured pipeline fails immediately instead of first issuing
  // network calls whose own failures could mask the real configuration error.
  if (useStreamingWrite) {
    if (createDisposition == CreateDisposition.CREATE_NEVER) {
      throw new IllegalArgumentException("CreateDisposition.CREATE_NEVER is not "
          + "supported for unbounded PCollections or when using tablespec functions.");
    }
    if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
      throw new IllegalArgumentException("WriteDisposition.WRITE_TRUNCATE is not "
          + "supported for unbounded PCollections or when using tablespec functions.");
    }
  }

  // Check for destination table presence and emptiness for early failure notification.
  // Note that a presence check can fail if the table or dataset are created by earlier stages
  // of the pipeline. For these cases the withoutValidation method can be used to disable
  // the check.
  // Unfortunately we can't validate anything early in case tableRefFunction is specified.
  if (table != null && validate) {
    verifyDatasetPresence(options, table);
    if (getCreateDisposition() == CreateDisposition.CREATE_NEVER) {
      verifyTablePresence(options, table);
    }
    if (getWriteDisposition() == WriteDisposition.WRITE_EMPTY) {
      verifyTableEmpty(options, table);
    }
  }

  if (useStreamingWrite) {
    return input.apply(new StreamWithDeDup(table, tableRefFunction, schema));
  }
  // Batch path with a static table: no transform is applied here (see NOTE in the Javadoc).
  return PDone.in(input.getPipeline());
}