in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java [817:1007]
<T> long insertAll(
TableReference ref,
List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowList,
@Nullable List<String> insertIdList,
BackOff backoff,
FluentBackoff rateLimitBackoffFactory,
final Sleeper sleeper,
InsertRetryPolicy retryPolicy,
List<ValueInSingleWindow<T>> failedInserts,
ErrorContainer<T> errorContainer,
boolean skipInvalidRows,
boolean ignoreUnknownValues,
boolean ignoreInsertIds)
throws IOException, InterruptedException {
checkNotNull(ref, "ref");
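// Lazily create the executor used to run insert batches in parallel, bounded by insertBundleParallelism.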
if (executor == null) {
this.executor =
new BoundedExecutorService(
options.as(GcsOptions.class).getExecutorService(),
options.as(BigQueryOptions.class).getInsertBundleParallelism());
}
if (insertIdList != null && rowList.size() != insertIdList.size()) {
throw new AssertionError(
"If insertIdList is not null it needs to have the same number of "
+ "elements as rowList");
}
long retTotalDataSize = 0;
List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
// These lists contain the rows to publish. Initially they contain the entire list.
// If there are failures, they will contain only the failed rows to be retried.
List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowsToPublish = rowList;
List<String> idsToPublish = null;
if (!ignoreInsertIds) {
idsToPublish = insertIdList;
}
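// Outer retry loop: publish the pending rows in parallel batches, collect rows whose errors are
// retryable, back off, and retry them until they succeed, the retry policy gives up, or the
// backoff is exhausted.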
while (true) {
List<FailsafeValueInSingleWindow<TableRow, TableRow>> retryRows = new ArrayList<>();
List<String> retryIds = (idsToPublish != null) ? new ArrayList<>() : null;
int strideIndex = 0;
// Upload in batches.
List<TableDataInsertAllRequest.Rows> rows = new ArrayList<>();
long dataSize = 0L;
List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures = new ArrayList<>();
List<Integer> strideIndices = new ArrayList<>();
// Store the longest throttled time across all parallel threads
final AtomicLong maxThrottlingMsec = new AtomicLong();
for (int i = 0; i < rowsToPublish.size(); ++i) {
TableRow row = rowsToPublish.get(i).getValue();
TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
if (idsToPublish != null) {
out.setInsertId(idsToPublish.get(i));
}
out.setJson(row.getUnknownKeys());
rows.add(out);
try {
dataSize += TableRowJsonCoder.of().getEncodedElementByteSize(row);
} catch (Exception ex) {
throw new RuntimeException("Failed to convert the row to JSON", ex);
}
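// Flush the current batch once it reaches the byte or row-count limit, or when the last row has been added.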
if (dataSize >= maxRowBatchSize
|| rows.size() >= maxRowsPerBatch
|| i == rowsToPublish.size() - 1) {
TableDataInsertAllRequest content = new TableDataInsertAllRequest();
content.setRows(rows);
content.setSkipInvalidRows(skipInvalidRows);
content.setIgnoreUnknownValues(ignoreUnknownValues);
final Bigquery.Tabledata.InsertAll insert =
client
.tabledata()
.insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
.setPrettyPrint(false);
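// Run the insert on the shared executor; rate-limit and quota-exceeded errors are retried
// inside the task with a separate backoff.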
futures.add(
executor.submit(
() -> {
// A backoff for rate limit exceeded errors.
BackOff rateLimitBackoff =
BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
long totalBackoffMillis = 0L;
while (true) {
try {
return insert.execute().getInsertErrors();
} catch (IOException e) {
recordError(e);
GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
if (errorInfo == null) {
throw e;
}
// TODO(BEAM-10584): The QUOTA_EXCEEDED check will be replaced by
// ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
// GoogleCloudDataproc/hadoop-connectors.
if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
&& !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
throw e;
}
LOG.info(
"BigQuery insertAll error, retrying: {}",
ApiErrorExtractor.INSTANCE.getErrorMessage(e));
try {
long nextBackOffMillis = rateLimitBackoff.nextBackOffMillis();
if (nextBackOffMillis == BackOff.STOP) {
throw e;
}
sleeper.sleep(nextBackOffMillis);
totalBackoffMillis += nextBackOffMillis;
final long totalBackoffMillisSoFar = totalBackoffMillis;
maxThrottlingMsec.getAndUpdate(
current -> Math.max(current, totalBackoffMillisSoFar));
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
throw new IOException(
"Interrupted while waiting before retrying insertAll");
}
}
}
}));
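// Record where this batch starts within rowsToPublish so batch-relative error indices can be
// mapped back to the original rows.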
strideIndices.add(strideIndex);
retTotalDataSize += dataSize;
dataSize = 0L;
strideIndex = i + 1;
rows = new ArrayList<>();
}
}
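// Wait for all batch inserts and split their errors into rows to retry and permanent failures
// routed to failedInserts.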
try {
for (int i = 0; i < futures.size(); i++) {
List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
if (errors == null) {
continue;
}
for (TableDataInsertAllResponse.InsertErrors error : errors) {
if (error.getIndex() == null) {
throw new IOException("Insert failed: " + error + ", other errors: " + allErrors);
}
int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
if (retryPolicy.shouldRetry(new InsertRetryPolicy.Context(error))) {
allErrors.add(error);
retryRows.add(rowsToPublish.get(errorIndex));
if (retryIds != null) {
retryIds.add(idsToPublish.get(errorIndex));
}
} else {
errorContainer.add(failedInserts, error, ref, rowsToPublish.get(errorIndex));
}
}
}
// Accumulate the longest throttled time across all parallel threads
throttlingMsecs.inc(maxThrottlingMsec.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while inserting " + rowsToPublish);
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
if (allErrors.isEmpty()) {
break;
}
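// Some rows failed with retryable errors; back off before re-publishing them, or give up when
// the overall backoff is exhausted.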
long nextBackoffMillis = backoff.nextBackOffMillis();
if (nextBackoffMillis == BackOff.STOP) {
break;
}
try {
sleeper.sleep(nextBackoffMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting before retrying insert of " + retryRows);
}
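// Retry only the failed rows, keeping their original insert IDs.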
rowsToPublish = retryRows;
idsToPublish = retryIds;
allErrors.clear();
LOG.info("Retrying {} failed inserts to BigQuery", rowsToPublish.size());
}
if (!allErrors.isEmpty()) {
throw new IOException("Insert failed: " + allErrors);
} else {
return retTotalDataSize;
}
}