in ratatool-sampling/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PatchedBigQueryServicesImpl.java [661:802]
/**
 * Inserts {@code rowList} into the BigQuery table {@code ref} in size-capped batches,
 * retrying rows with transient per-row errors according to {@code retryPolicy} and the
 * caller-supplied {@code backoff}/{@code sleeper}.
 *
 * <p>Batches are cut when either {@code UPLOAD_BATCH_SIZE_BYTES} (approximated via
 * {@code TableRow.toString().length()}) or {@code maxRowsPerBatch} is reached; each batch
 * is submitted concurrently to {@code executor}. Inside each batch task, rate-limit errors
 * (as classified by {@link ApiErrorExtractor#rateLimited}) are retried forever with an
 * independent backoff. Per-row insert errors returned by the service are either queued for
 * the next retry round (when {@code retryPolicy} says so) or appended to
 * {@code failedInserts}.
 *
 * @param ref           target table; must not be null
 * @param rowList       rows to insert
 * @param insertIdList  optional dedup insert IDs, parallel to {@code rowList}; may be null
 * @param backoff       governs sleeping between whole retry rounds; {@link BackOff#STOP}
 *                      ends retrying and surfaces the remaining errors
 * @param sleeper       used to sleep between retry rounds and on rate-limit backoff
 * @param retryPolicy   decides per-row whether an insert error is retriable
 * @param failedInserts output list collecting rows the policy declined to retry
 * @return approximate total data size (in characters of {@code TableRow.toString()}) of
 *     all rows submitted, counting retried rows each time they are re-sent
 * @throws IOException if a row error has no index, a non-rate-limit request error occurs,
 *     or retries are exhausted with errors outstanding
 * @throws InterruptedException declared for interface compatibility; interruption during
 *     sleeps is converted to {@link IOException} with the interrupt status restored
 */
long insertAll(TableReference ref, List<ValueInSingleWindow<TableRow>> rowList,
    @Nullable List<String> insertIdList,
    BackOff backoff, final Sleeper sleeper, InsertRetryPolicy retryPolicy,
    List<ValueInSingleWindow<TableRow>> failedInserts)
    throws IOException, InterruptedException {
  checkNotNull(ref, "ref");
  if (executor == null) {
    this.executor = options.as(GcsOptions.class).getExecutorService();
  }
  if (insertIdList != null && rowList.size() != insertIdList.size()) {
    // The condition requires the two lists to have exactly matching sizes; the previous
    // message incorrectly said "at least as many elements".
    throw new AssertionError("If insertIdList is not null it needs to have exactly "
        + "as many elements as rowList");
  }
  long retTotalDataSize = 0;
  List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
  // These lists contain the rows to publish. Initially they contain the entire list.
  // If there are failures, they will contain only the failed rows to be retried.
  List<ValueInSingleWindow<TableRow>> rowsToPublish = rowList;
  List<String> idsToPublish = insertIdList;
  while (true) {
    List<ValueInSingleWindow<TableRow>> retryRows = new ArrayList<>();
    List<String> retryIds = (idsToPublish != null) ? new ArrayList<String>() : null;
    // strideIndices maps each submitted batch to the offset of its first row within
    // rowsToPublish, so per-row error indices can be translated back to list positions.
    int strideIndex = 0;
    // Upload in batches.
    List<TableDataInsertAllRequest.Rows> rows = new LinkedList<>();
    int dataSize = 0;
    List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures = new ArrayList<>();
    List<Integer> strideIndices = new ArrayList<>();
    for (int i = 0; i < rowsToPublish.size(); ++i) {
      TableRow row = rowsToPublish.get(i).getValue();
      TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
      if (idsToPublish != null) {
        out.setInsertId(idsToPublish.get(i));
      }
      out.setJson(row.getUnknownKeys());
      rows.add(out);
      dataSize += row.toString().length();
      // Flush the current batch when it is big enough or this is the last row.
      if (dataSize >= UPLOAD_BATCH_SIZE_BYTES || rows.size() >= maxRowsPerBatch
          || i == rowsToPublish.size() - 1) {
        TableDataInsertAllRequest content = new TableDataInsertAllRequest();
        content.setRows(rows);
        final Bigquery.Tabledata.InsertAll insert = client.tabledata()
            .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(),
                content);
        futures.add(
            executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() {
              @Override
              public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
                // A backoff for rate limit exceeded errors. Retries forever.
                BackOff backoff = BackOffAdapter.toGcpBackOff(
                    RATE_LIMIT_BACKOFF_FACTORY.backoff());
                while (true) {
                  try {
                    return insert.execute().getInsertErrors();
                  } catch (IOException e) {
                    if (new ApiErrorExtractor().rateLimited(e)) {
                      LOG.info("BigQuery insertAll exceeded rate limit, retrying");
                      try {
                        sleeper.sleep(backoff.nextBackOffMillis());
                      } catch (InterruptedException interrupted) {
                        // Restore the interrupt status and preserve the cause; the old
                        // code silently swallowed both.
                        Thread.currentThread().interrupt();
                        throw new IOException(
                            "Interrupted while waiting before retrying insertAll",
                            interrupted);
                      }
                    } else {
                      throw e;
                    }
                  }
                }
              }
            }));
        strideIndices.add(strideIndex);
        retTotalDataSize += dataSize;
        dataSize = 0;
        strideIndex = i + 1;
        rows = new LinkedList<>();
      }
    }
    try {
      for (int i = 0; i < futures.size(); i++) {
        List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
        if (errors == null) {
          continue;
        }
        for (TableDataInsertAllResponse.InsertErrors error : errors) {
          if (error.getIndex() == null) {
            // An error without a row index is a request-level failure; fail fast.
            throw new IOException("Insert failed: " + error + ", other errors: " + allErrors);
          }
          // Translate the batch-relative index into a rowsToPublish index.
          int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
          if (retryPolicy.shouldRetry(new InsertRetryPolicy.Context(error))) {
            allErrors.add(error);
            retryRows.add(rowsToPublish.get(errorIndex));
            if (retryIds != null) {
              retryIds.add(idsToPublish.get(errorIndex));
            }
          } else {
            failedInserts.add(rowsToPublish.get(errorIndex));
          }
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      // Chain the cause so callers can see the interruption in stack traces.
      throw new IOException("Interrupted while inserting " + rowsToPublish, e);
    } catch (ExecutionException e) {
      throw new RuntimeException(e.getCause());
    }
    if (allErrors.isEmpty()) {
      break;
    }
    long nextBackoffMillis = backoff.nextBackOffMillis();
    if (nextBackoffMillis == BackOff.STOP) {
      // Retries exhausted: fall through and throw with the outstanding errors below.
      break;
    }
    try {
      sleeper.sleep(nextBackoffMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      // Chain the cause so callers can see the interruption in stack traces.
      throw new IOException(
          "Interrupted while waiting before retrying insert of " + retryRows, e);
    }
    rowsToPublish = retryRows;
    idsToPublish = retryIds;
    allErrors.clear();
    LOG.info("Retrying {} failed inserts to BigQuery", rowsToPublish.size());
  }
  if (!allErrors.isEmpty()) {
    throw new IOException("Insert failed: " + allErrors);
  } else {
    return retTotalDataSize;
  }
}