public void insertAll()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java [157:269]


  /**
   * Inserts the given rows into the BigQuery table identified by {@code ref}, re-sending rows
   * that BigQuery reports as failed.
   *
   * <p>Rows are grouped into batches. A batch is dispatched as soon as the accumulated length of
   * the rows' string form reaches {@code UPLOAD_BATCH_SIZE_BYTES}, the batch holds
   * {@code maxRowsPerBatch} rows, or the last row has been added. Every batch is submitted to
   * {@code executor}, and this method waits for all batches of a round to finish. Rows listed in
   * a response's insert errors are collected and re-sent in the next round after a backoff
   * sleep, until a round produces no errors or the backoff reports that the maximum number of
   * attempts has been reached.
   *
   * @param ref the destination table; must not be null
   * @param rowList the rows to insert; must not be null
   * @param insertIdList optional insert ids, matched to {@code rowList} by position; when not
   *     null it must have exactly as many elements as {@code rowList}
   * @throws IOException if rows still fail after the final attempt, or if the calling thread is
   *     interrupted while waiting for batches or for the retry backoff
   * @throws AssertionError if {@code insertIdList} is not null and its size differs from that of
   *     {@code rowList}
   * @throws RuntimeException if a batch request fails with an {@link IOException}, or if an
   *     insert error carries no row index
   */
  public void insertAll(TableReference ref, List<TableRow> rowList,
      @Nullable List<String> insertIdList) throws IOException {
    Preconditions.checkNotNull(ref, "ref");
    Preconditions.checkNotNull(rowList, "rowList");
    // NOTE(review): the check below requires equal sizes although the message says "at least";
    // the message text is left untouched in case callers or tests match on it.
    if (insertIdList != null && rowList.size() != insertIdList.size()) {
      throw new AssertionError("If insertIdList is not null it needs to have at least "
          + "as many elements as rowList");
    }

    AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
        MAX_INSERT_ATTEMPTS,
        INITIAL_INSERT_BACKOFF_INTERVAL_MS);

    final List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
    // Single monitor shared by every batch task. Locking on "this" inside the anonymous Runnable
    // would lock a different object per task and leave the shared lists below unprotected.
    final Object resultLock = new Object();
    // These lists contain the rows to publish. Initially they contain the entire list. If there
    // are failures, they will contain only the failed rows to be retried.
    List<TableRow> rowsToPublish = rowList;
    List<String> idsToPublish = insertIdList;
    while (true) {
      final List<TableRow> retryRows = new ArrayList<>();
      final List<String> retryIds = (idsToPublish != null) ? new ArrayList<String>() : null;

      // Index in rowsToPublish of the first row of the batch currently being assembled.
      int strideIndex = 0;
      // Upload in batches.
      List<TableDataInsertAllRequest.Rows> rows = new LinkedList<>();
      int dataSize = 0;

      List<Future<?>> futures = new ArrayList<>();

      for (int i = 0; i < rowsToPublish.size(); ++i) {
        TableRow row = rowsToPublish.get(i);
        TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
        if (idsToPublish != null) {
          out.setInsertId(idsToPublish.get(i));
        }
        out.setJson(row.getUnknownKeys());
        rows.add(out);

        dataSize += row.toString().length();
        if (dataSize >= UPLOAD_BATCH_SIZE_BYTES || rows.size() >= maxRowsPerBatch ||
            i == rowsToPublish.size() - 1) {
          TableDataInsertAllRequest content = new TableDataInsertAllRequest();
          content.setRows(rows);

          final Bigquery.Tabledata.InsertAll insert = client.tabledata()
              .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(),
                  content);

          final int finalStrideIndex = strideIndex;
          final List<TableRow> finalRowsToPublish = rowsToPublish;
          final List<String> finalIdsToPublish = idsToPublish;

          futures.add(executor.submit(new Runnable() {
              @Override
              public void run() {
                try {
                  TableDataInsertAllResponse response = insert.execute();

                  List<TableDataInsertAllResponse.InsertErrors> errors = response.getInsertErrors();
                  if (errors != null) {
                    synchronized (resultLock) {
                      allErrors.addAll(errors);
                      for (TableDataInsertAllResponse.InsertErrors error : errors) {
                        if (error.getIndex() == null) {
                          throw new IOException("Insert failed: " + allErrors);
                        }

                        // Error indices are relative to the batch; shift by the batch's offset
                        // to find the row in the list that was published this round.
                        int errorIndex = error.getIndex().intValue() + finalStrideIndex;
                        retryRows.add(finalRowsToPublish.get(errorIndex));
                        if (retryIds != null) {
                          retryIds.add(finalIdsToPublish.get(errorIndex));
                        }
                      }
                    }
                  }
                } catch (IOException e) {
                  throw new RuntimeException(e);
                }
              }
            }));

          dataSize = 0;
          strideIndex = i + 1;
          rows = new LinkedList<>();
        }
      }

      try {
        for (Future<?> future : futures) {
          future.get();
        }
      } catch (InterruptedException e) {
        // Batches may still be in flight, so the results gathered so far cannot be trusted.
        // Restore the interrupt status and fail instead of continuing with partial state.
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while inserting rows into BigQuery", e);
      } catch (ExecutionException e) {
        throw Throwables.propagate(e.getCause());
      }

      if (!allErrors.isEmpty() && !backoff.atMaxAttempts()) {
        try {
          Thread.sleep(backoff.nextBackOffMillis());
        } catch (InterruptedException e) {
          // Do not retry after an interrupt: restore the status and report the failure.
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while waiting before retrying insert of "
              + retryRows.size() + " rows into BigQuery", e);
        }
        LOG.info("Retrying failed inserts to BigQuery");
        rowsToPublish = retryRows;
        idsToPublish = retryIds;
        allErrors.clear();
      } else {
        break;
      }
    }
    if (!allErrors.isEmpty()) {
      throw new IOException("Insert failed: " + allErrors);
    }
  }