in sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java [157:269]
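/**
 * Inserts {@code rowList} into the referenced table, uploading in bounded-size
 * batches on the executor. Rows that come back with per-row insert errors are
 * retried with exponential backoff; if errors remain after the final attempt,
 * an {@link IOException} listing them is thrown.
 *
 * @param insertIdList optional insert ids, parallel to {@code rowList}, used by
 *     BigQuery for best-effort de-duplication of retried rows
 */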
public void insertAll(TableReference ref, List<TableRow> rowList,
@Nullable List<String> insertIdList) throws IOException {
Preconditions.checkNotNull(ref, "ref");
Preconditions.checkArgument(
insertIdList == null || rowList.size() == insertIdList.size(),
"If insertIdList is not null it needs to have the same number of elements as rowList");
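// Bound retries: at most MAX_INSERT_ATTEMPTS passes, with exponential backoff between them.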
AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
MAX_INSERT_ATTEMPTS,
INITIAL_INSERT_BACKOFF_INTERVAL_MS);
final List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
// These lists contain the rows to publish. Initially they contain the entire list. If there
// are failures, they will contain only the failed rows to be retried.
List<TableRow> rowsToPublish = rowList;
List<String> idsToPublish = insertIdList;
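// Each pass uploads rowsToPublish; failed rows are collected into retryRows
// and become the input to the next pass until the backoff budget is spent.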
while (true) {
final List<TableRow> retryRows = new ArrayList<>();
final List<String> retryIds = (idsToPublish != null) ? new ArrayList<String>() : null;
int strideIndex = 0;
// Upload in batches.
List<TableDataInsertAllRequest.Rows> rows = new LinkedList<>();
int dataSize = 0;
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < rowsToPublish.size(); ++i) {
TableRow row = rowsToPublish.get(i);
TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
if (idsToPublish != null) {
out.setInsertId(idsToPublish.get(i));
}
out.setJson(row.getUnknownKeys());
rows.add(out);
dataSize += row.toString().length();
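// Flush once the batch reaches the byte budget (estimated via toString), the
// row cap, or the end of the input.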
if (dataSize >= UPLOAD_BATCH_SIZE_BYTES || rows.size() >= maxRowsPerBatch ||
i == rowsToPublish.size() - 1) {
TableDataInsertAllRequest content = new TableDataInsertAllRequest();
content.setRows(rows);
final Bigquery.Tabledata.InsertAll insert = client.tabledata()
.insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(),
content);
final int finalStrideIndex = strideIndex;
final List<TableRow> finalRowsToPublish = rowsToPublish;
final List<String> finalIdsToPublish = idsToPublish;
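// Submit the batch asynchronously; per-row failures reported by the service
// are recorded for the next retry pass.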
futures.add(executor.submit(new Runnable() {
@Override
public void run() {
try {
TableDataInsertAllResponse response = insert.execute();
List<TableDataInsertAllResponse.InsertErrors> errors = response.getInsertErrors();
if (errors != null) {
// Synchronize on a shared lock: each batch has its own Runnable, so locking
// on "this" would not serialize access to allErrors/retryRows across batches.
synchronized (allErrors) {
allErrors.addAll(errors);
for (TableDataInsertAllResponse.InsertErrors error : errors) {
if (error.getIndex() == null) {
throw new IOException("Insert failed: " + allErrors);
}
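// The reported index is relative to this batch; strideIndex maps it back
// to a position in the full rowsToPublish list.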
int errorIndex = error.getIndex().intValue() + finalStrideIndex;
retryRows.add(finalRowsToPublish.get(errorIndex));
if (retryIds != null) {
retryIds.add(finalIdsToPublish.get(errorIndex));
}
}
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}));
dataSize = 0;
strideIndex = i + 1;
rows = new LinkedList<>();
}
}
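// Wait for every in-flight batch before deciding whether to retry.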
try {
for (Future<?> future : futures) {
future.get();
}
} catch (InterruptedException e) {
// Restore the interrupt and fail rather than returning with batches still in flight.
Thread.currentThread().interrupt();
throw new IOException("Interrupted while inserting rows into BigQuery", e);
} catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());
}
if (!allErrors.isEmpty() && !backoff.atMaxAttempts()) {
try {
Thread.sleep(backoff.nextBackOffMillis());
} catch (InterruptedException e) {
// Restore the interrupt flag; the loop will still perform the retry pass.
Thread.currentThread().interrupt();
}
LOG.info("Retrying failed inserts to BigQuery");
rowsToPublish = retryRows;
idsToPublish = retryIds;
allErrors.clear();
} else {
break;
}
}
if (!allErrors.isEmpty()) {
throw new IOException("Insert failed: " + allErrors);
}
}
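For context, a minimal caller sketch (not part of the SDK source). It assumes the single-argument BigQueryTableInserter(Bigquery) constructor of this SDK generation (constructor shape varies across versions) and application-default credentials; the project, dataset, table, and field names are placeholders.

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;

public class InsertAllExample {
  public static void main(String[] args) throws Exception {
    // Application-default credentials scoped for BigQuery.
    GoogleCredential credential =
        GoogleCredential.getApplicationDefault().createScoped(BigqueryScopes.all());
    Bigquery client = new Bigquery.Builder(
            GoogleNetHttpTransport.newTrustedTransport(),
            JacksonFactory.getDefaultInstance(), credential)
        .setApplicationName("insert-all-example") // placeholder name
        .build();

    TableReference table = new TableReference()
        .setProjectId("my-project")   // placeholder
        .setDatasetId("my_dataset")   // placeholder
        .setTableId("my_table");      // placeholder

    List<TableRow> rows = Arrays.asList(
        new TableRow().set("name", "alice").set("score", 10),
        new TableRow().set("name", "bob").set("score", 7));
    // One insert id per row; sizes must match (see the precondition above).
    List<String> insertIds = Arrays.asList("row-0", "row-1");

    // Constructor shape is an assumption; check the class's actual constructors.
    new BigQueryTableInserter(client).insertAll(table, rows, insertIds);
  }
}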