in ratatool-sampling/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PatchedBigQueryTableRowIterator.java [386:454]
/**
 * Runs the configured query as a BigQuery job and blocks until the job reaches the
 * {@code DONE} state.
 *
 * <p>A dry run of {@code queryConfig} is issued first to discover the tables the query reads.
 * The temporary dataset is then created in the location of the first referenced table. If the
 * query references no tables, the location is left {@code null} so BigQuery picks its default.
 *
 * <p>Side effects:
 * <ul>
 *   <li>assigns {@code temporaryDatasetId} and {@code temporaryTableId};</li>
 *   <li>creates the temporary dataset;</li>
 *   <li>mutates {@code queryConfig} by setting its destination table and enabling large
 *       results;</li>
 *   <li>resets {@code temporaryTableId} to {@code null} if the job finishes with an error.</li>
 * </ul>
 *
 * @return the destination table reported by the completed query job's configuration
 * @throws IOException if the dry run, job insertion or status polling fails, or if the query
 *     job finishes with an error result
 * @throws InterruptedException if the thread is interrupted while waiting between status polls
 */
private TableReference executeQueryAndWaitForCompletion()
    throws IOException, InterruptedException {
  checkState(projectId != null, "Unable to execute a query without a configured project id");
  checkState(queryConfig != null, "Unable to execute a query without a configured query");

  // Dry run query to get source table location
  Job dryRunJob = new Job()
      .setConfiguration(new JobConfiguration()
          .setQuery(queryConfig)
          .setDryRun(true));
  JobStatistics jobStats = executeWithBackOff(
      client.jobs().insert(projectId, dryRunJob),
      String.format("Error when trying to dry run query %s.",
          queryConfig.toPrettyString())).getStatistics();

  // Let BigQuery pick the default location if the query does not read any tables.
  String location = null;
  @Nullable List<TableReference> tables = jobStats.getQuery().getReferencedTables();
  if (tables != null && !tables.isEmpty()) {
    Table table = getTable(tables.get(0));
    location = table.getLocation();
  }

  // Create a temporary dataset to store results.
  // Starting dataset name with an "_" so that it is hidden.
  // Use the default Random seed: seeding with System.currentTimeMillis() would make two
  // iterators created in the same millisecond generate identical dataset/table names.
  Random rnd = new Random();
  temporaryDatasetId = "_beam_temporary_dataset_" + rnd.nextInt(1000000);
  temporaryTableId = "beam_temporary_table_" + rnd.nextInt(1000000);
  createDataset(temporaryDatasetId, location);

  Job job = new Job();
  JobConfiguration config = new JobConfiguration();
  config.setQuery(queryConfig);
  job.setConfiguration(config);

  TableReference destinationTable = new TableReference();
  destinationTable.setProjectId(projectId);
  destinationTable.setDatasetId(temporaryDatasetId);
  destinationTable.setTableId(temporaryTableId);
  // config holds the same queryConfig instance, so these settings apply to the submitted job.
  queryConfig.setDestinationTable(destinationTable);
  queryConfig.setAllowLargeResults(true);

  // queryConfig is not modified after this point, so render it once for all messages below.
  String prettyQuery = queryConfig.toPrettyString();
  Job queryJob = executeWithBackOff(
      client.jobs().insert(projectId, job),
      String.format("Error when trying to execute the job for query %s.", prettyQuery));

  JobReference jobId = queryJob.getJobReference();
  // Loop-invariant: build the polling error message once instead of on every iteration.
  String pollErrorMessage = String.format(
      "Error when trying to get status of the job for query %s.", prettyQuery);
  while (true) {
    Job pollJob = executeWithBackOff(
        client.jobs().get(projectId, jobId.getJobId()), pollErrorMessage);
    JobStatus status = pollJob.getStatus();
    // Null-safe comparison: a missing state is treated as "not done yet".
    if ("DONE".equals(status.getState())) {
      // Job is DONE, but did not necessarily succeed.
      ErrorProto error = status.getErrorResult();
      if (error == null) {
        return pollJob.getConfiguration().getQuery().getDestinationTable();
      } else {
        // There will be no temporary table to delete, so null out the reference.
        temporaryTableId = null;
        throw new IOException(String.format(
            "Executing query %s failed: %s", prettyQuery, error.getMessage()));
      }
    }
    // Interruptible wait: honors the InterruptedException this method declares, so callers
    // can cancel a long-running query instead of polling without end.
    TimeUnit.MILLISECONDS.sleep(QUERY_COMPLETION_POLL_TIME.getMillis());
  }
}