in ratatool-sampling/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PatchedBigQueryServicesImpl.java [222:254]
static void startJob(
Job job,
ApiErrorExtractor errorExtractor,
Bigquery client,
Sleeper sleeper,
BackOff backoff) throws IOException, InterruptedException {
JobReference jobRef = job.getJobReference();
Exception lastException = null;
do {
try {
client.jobs().insert(jobRef.getProjectId(), job).execute();
LOG.info("Started BigQuery job: {}.\n{}", jobRef,
formatBqStatusCommand(jobRef.getProjectId(), jobRef.getJobId()));
return; // SUCCEEDED
} catch (GoogleJsonResponseException e) {
if (errorExtractor.itemAlreadyExists(e)) {
return; // SUCCEEDED
}
// ignore and retry
LOG.info("Ignore the error and retry inserting the job.", e);
lastException = e;
} catch (IOException e) {
// ignore and retry
LOG.info("Ignore the error and retry inserting the job.", e);
lastException = e;
}
} while (nextBackOff(sleeper, backoff));
throw new IOException(
String.format(
"Unable to insert job: %s, aborting after %d .",
jobRef.getJobId(), MAX_RPC_RETRIES),
lastException);
}