private TableReference executeQueryAndWaitForCompletion()

in ratatool-sampling/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PatchedBigQueryTableRowIterator.java [386:454]


  /**
   * Runs {@code queryConfig} as a BigQuery query job and blocks until the job reaches the
   * {@code DONE} state.
   *
   * <p>The query is first dry-run to obtain the tables it references. The location of the first
   * referenced table (if any) is passed to {@code createDataset} for a new temporary dataset. The
   * query is then executed with its destination set to a randomly named table in that dataset, and
   * the job status is polled every {@code QUERY_COMPLETION_POLL_TIME} until it is {@code DONE}.
   *
   * <p>Side effects: assigns {@code temporaryDatasetId} and {@code temporaryTableId}, creates the
   * temporary dataset, and mutates {@code queryConfig} (destination table and
   * {@code allowLargeResults}). If the job finishes with an error, {@code temporaryTableId} is reset
   * to {@code null} because there is no result table to delete.
   *
   * @return the destination table reported by the completed query job
   * @throws IOException if a BigQuery request fails or the query job finishes with an error
   * @throws InterruptedException if the calling thread is interrupted while waiting for the job
   * @throws IllegalStateException if no project id or no query is configured
   */
  private TableReference executeQueryAndWaitForCompletion()
      throws IOException, InterruptedException {
    checkState(projectId != null, "Unable to execute a query without a configured project id");
    checkState(queryConfig != null, "Unable to execute a query without a configured query");

    // Dry run the query to learn which tables it reads (used below to pick a dataset location).
    Job dryRunJob = new Job()
        .setConfiguration(new JobConfiguration()
            .setQuery(queryConfig)
            .setDryRun(true));
    JobStatistics jobStats = executeWithBackOff(
        client.jobs().insert(projectId, dryRunJob),
        String.format("Error when trying to dry run query %s.",
            queryConfig.toPrettyString())).getStatistics();

    // Let BigQuery pick the default location if the query does not read any tables.
    String location = null;
    @Nullable List<TableReference> tables = jobStats.getQuery().getReferencedTables();
    if (tables != null && !tables.isEmpty()) {
      Table table = getTable(tables.get(0));
      location = table.getLocation();
    }

    // Create a temporary dataset to store results. The dataset name starts with "_" so that it
    // is hidden. Use an unseeded Random: seeding with the current time would hand identical
    // names to iterators created within the same millisecond (e.g. on parallel workers).
    Random rnd = new Random();
    temporaryDatasetId = "_beam_temporary_dataset_" + rnd.nextInt(1000000);
    temporaryTableId = "beam_temporary_table_" + rnd.nextInt(1000000);
    createDataset(temporaryDatasetId, location);

    // Route the query output into the temporary table; large results require a destination.
    TableReference destinationTable = new TableReference();
    destinationTable.setProjectId(projectId);
    destinationTable.setDatasetId(temporaryDatasetId);
    destinationTable.setTableId(temporaryTableId);
    queryConfig.setDestinationTable(destinationTable);
    queryConfig.setAllowLargeResults(true);

    Job job = new Job().setConfiguration(new JobConfiguration().setQuery(queryConfig));

    // The configuration no longer changes from here on; render it once for the error messages
    // instead of re-serializing it on every poll iteration.
    String prettyQuery = queryConfig.toPrettyString();

    Job queryJob = executeWithBackOff(
        client.jobs().insert(projectId, job),
        String.format("Error when trying to execute the job for query %s.", prettyQuery));
    JobReference jobId = queryJob.getJobReference();

    while (true) {
      Job pollJob = executeWithBackOff(
          client.jobs().get(projectId, jobId.getJobId()),
          String.format("Error when trying to get status of the job for query %s.", prettyQuery));
      JobStatus status = pollJob.getStatus();
      // Constant-first comparison: a missing state is treated as "not done yet" instead of NPE.
      if ("DONE".equals(status.getState())) {
        // Job is DONE, but did not necessarily succeed.
        ErrorProto error = status.getErrorResult();
        if (error == null) {
          return pollJob.getConfiguration().getQuery().getDestinationTable();
        }
        // There will be no temporary table to delete, so null out the reference.
        temporaryTableId = null;
        throw new IOException(String.format(
            "Executing query %s failed: %s", prettyQuery, error.getMessage()));
      }
      // Sleep interruptibly: this loop has no upper bound, so an interrupt must be able to
      // cancel the wait (the method already declares InterruptedException).
      TimeUnit.MILLISECONDS.sleep(QUERY_COMPLETION_POLL_TIME.getMillis());
    }
  }