long insertAll()

in ratatool-sampling/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PatchedBigQueryServicesImpl.java [661:802]


    long insertAll(TableReference ref, List<ValueInSingleWindow<TableRow>> rowList,
                   @Nullable List<String> insertIdList,
                   BackOff backoff, final Sleeper sleeper, InsertRetryPolicy retryPolicy,
                   List<ValueInSingleWindow<TableRow>> failedInserts)
        throws IOException, InterruptedException {
      checkNotNull(ref, "ref");
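      // Lazily obtain the shared executor used to upload batches in parallel.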
      if (executor == null) {
        this.executor = options.as(GcsOptions.class).getExecutorService();
      }
      if (insertIdList != null && rowList.size() != insertIdList.size()) {
        throw new AssertionError("If insertIdList is not null it needs to have exactly "
                                 + "as many elements as rowList");
      }

      long retTotalDataSize = 0;
      List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
      // These lists contain the rows to publish. Initially they contain the entire list.
      // If there are failures, they will contain only the failed rows to be retried.
      List<ValueInSingleWindow<TableRow>> rowsToPublish = rowList;
      List<String> idsToPublish = insertIdList;
      while (true) {
        List<ValueInSingleWindow<TableRow>> retryRows = new ArrayList<>();
        List<String> retryIds = (idsToPublish != null) ? new ArrayList<String>() : null;

        int strideIndex = 0;
        // Upload in batches.
        List<TableDataInsertAllRequest.Rows> rows = new LinkedList<>();
        int dataSize = 0;

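        // strideIndices records each batch's starting offset within rowsToPublish so
        // that per-batch error indices can be mapped back to the original rows.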
        List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures = new ArrayList<>();
        List<Integer> strideIndices = new ArrayList<>();

        for (int i = 0; i < rowsToPublish.size(); ++i) {
          TableRow row = rowsToPublish.get(i).getValue();
          TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
          if (idsToPublish != null) {
            out.setInsertId(idsToPublish.get(i));
          }
          out.setJson(row.getUnknownKeys());
          rows.add(out);

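          // Approximate the row's size by its JSON string length; flush the batch when
          // it reaches the byte or row-count limit, or when the last row is reached.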
          dataSize += row.toString().length();
          if (dataSize >= UPLOAD_BATCH_SIZE_BYTES || rows.size() >= maxRowsPerBatch
              || i == rowsToPublish.size() - 1) {
            TableDataInsertAllRequest content = new TableDataInsertAllRequest();
            content.setRows(rows);

            final Bigquery.Tabledata.InsertAll insert = client.tabledata()
                .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(),
                    content);

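            // Submit each batch to the executor so batches upload concurrently. Only
            // rate-limit errors are retried inside the task; any other IOException
            // fails the future and surfaces via ExecutionException below.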
            futures.add(
                executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() {
                  @Override
                  public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
                    // A backoff for rate limit exceeded errors. Retries forever.
                    BackOff backoff = BackOffAdapter.toGcpBackOff(
                        RATE_LIMIT_BACKOFF_FACTORY.backoff());
                    while (true) {
                      try {
                        return insert.execute().getInsertErrors();
                      } catch (IOException e) {
                        if (new ApiErrorExtractor().rateLimited(e)) {
                          LOG.info("BigQuery insertAll exceeded rate limit, retrying");
                          try {
                            sleeper.sleep(backoff.nextBackOffMillis());
                          } catch (InterruptedException interrupted) {
                            // Restore the thread's interrupt flag before failing the batch,
                            // matching the handling in the outer catch blocks.
                            Thread.currentThread().interrupt();
                            throw new IOException(
                                "Interrupted while waiting before retrying insertAll");
                          }
                        } else {
                          throw e;
                        }
                      }
                    }
                  }
                }));
            strideIndices.add(strideIndex);

            retTotalDataSize += dataSize;

            dataSize = 0;
            strideIndex = i + 1;
            rows = new LinkedList<>();
          }
        }

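        // Collect per-batch results, translating each error index back to the global
        // row position and splitting failures into retryable and permanent ones.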
        try {
          for (int i = 0; i < futures.size(); i++) {
            List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
            if (errors == null) {
              continue;
            }
            for (TableDataInsertAllResponse.InsertErrors error : errors) {
              if (error.getIndex() == null) {
                throw new IOException("Insert failed: " + error + ", other errors: " + allErrors);
              }

              int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
              if (retryPolicy.shouldRetry(new InsertRetryPolicy.Context(error))) {
                allErrors.add(error);
                retryRows.add(rowsToPublish.get(errorIndex));
                if (retryIds != null) {
                  retryIds.add(idsToPublish.get(errorIndex));
                }
              } else {
                failedInserts.add(rowsToPublish.get(errorIndex));
              }
            }
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while inserting " + rowsToPublish);
        } catch (ExecutionException e) {
          throw new RuntimeException(e.getCause());
        }

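        // Exit once a pass produces no errors; otherwise back off before retrying,
        // giving up (and rethrowing below) when the caller-supplied backoff is exhausted.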
        if (allErrors.isEmpty()) {
          break;
        }
        long nextBackoffMillis = backoff.nextBackOffMillis();
        if (nextBackoffMillis == BackOff.STOP) {
          break;
        }
        try {
          sleeper.sleep(nextBackoffMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException(
              "Interrupted while waiting before retrying insert of " + retryRows);
        }
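        // The next pass publishes only the rows whose errors the policy deemed retryable.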
        rowsToPublish = retryRows;
        idsToPublish = retryIds;
        allErrors.clear();
        LOG.info("Retrying {} failed inserts to BigQuery", rowsToPublish.size());
      }
      if (!allErrors.isEmpty()) {
        throw new IOException("Insert failed: " + allErrors);
      } else {
        return retTotalDataSize;
      }
    }
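
For reference, here is a minimal, hypothetical sketch of how this method might be
invoked from inside the enclosing service class (reusing its imports). The table
coordinates, row contents, insert ids, backoff, and retry policy below are
illustrative assumptions, not values taken from the actual call sites:

    // Hypothetical caller; assumes client, options, and executor are already
    // initialized by the enclosing PatchedBigQueryServicesImpl instance.
    TableReference ref = new TableReference()
        .setProjectId("my-project")   // placeholder ids
        .setDatasetId("my_dataset")
        .setTableId("my_table");

    TableRow row = new TableRow().set("user_id", 42).set("name", "example");
    List<ValueInSingleWindow<TableRow>> rows = Collections.singletonList(
        ValueInSingleWindow.of(
            row, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
    // One insert id per row lets BigQuery deduplicate retried inserts.
    List<String> insertIds = Collections.singletonList("row-0");

    // Rows rejected with non-retryable errors are appended here by insertAll.
    List<ValueInSingleWindow<TableRow>> failedInserts = new ArrayList<>();
    long bytesInserted = insertAll(
        ref, rows, insertIds,
        BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff()),  // assumed backoff
        Sleeper.DEFAULT,
        InsertRetryPolicy.retryTransientErrors(),
        failedInserts);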