in sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java [289:346]
public Table getOrCreateTable(
TableReference ref,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
@Nullable TableSchema schema) throws IOException {
// Check if table already exists.
Bigquery.Tables.Get get = client.tables()
.get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
Table table = null;
try {
table = get.execute();
} catch (IOException e) {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
if (!errorExtractor.itemNotFound(e) ||
createDisposition != CreateDisposition.CREATE_IF_NEEDED) {
// Rethrow.
throw e;
}
}
// If we want an empty table, and it isn't, then delete it first.
if (table != null) {
if (writeDisposition == WriteDisposition.WRITE_APPEND) {
return table;
}
boolean empty = isEmpty(ref);
if (empty) {
if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
LOG.info("Empty table found, not removing {}", BigQueryIO.toTableSpec(ref));
}
return table;
} else if (writeDisposition == WriteDisposition.WRITE_EMPTY) {
throw new IOException("WriteDisposition is WRITE_EMPTY, "
+ "but table is not empty");
}
// Reuse the existing schema if none was provided.
if (schema == null) {
schema = table.getSchema();
}
// Delete table and fall through to re-creating it below.
LOG.info("Deleting table {}", BigQueryIO.toTableSpec(ref));
Bigquery.Tables.Delete delete = client.tables()
.delete(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
delete.execute();
}
if (schema == null) {
throw new IllegalArgumentException(
"Table schema required for new table.");
}
// Create the table.
return tryCreateTable(ref, schema);
}