long insertAll()

in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java [817:1007]


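    /**
     * Inserts {@code rowList} into {@code ref} in size-limited batches, submitting batches in
     * parallel on a bounded executor. Rows that fail with an error the {@code retryPolicy}
     * considers retriable are resent on the next pass; permanently failed rows are routed to
     * {@code failedInserts} via {@code errorContainer}. Returns the total encoded byte size of
     * the rows sent (rows resent on retry are counted each time).
     */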
    <T> long insertAll(
        TableReference ref,
        List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowList,
        @Nullable List<String> insertIdList,
        BackOff backoff,
        FluentBackoff rateLimitBackoffFactory,
        final Sleeper sleeper,
        InsertRetryPolicy retryPolicy,
        List<ValueInSingleWindow<T>> failedInserts,
        ErrorContainer<T> errorContainer,
        boolean skipInvalidRows,
        boolean ignoreUnknownValues,
        boolean ignoreInsertIds)
        throws IOException, InterruptedException {
      checkNotNull(ref, "ref");
      if (executor == null) {
        this.executor =
            new BoundedExecutorService(
                options.as(GcsOptions.class).getExecutorService(),
                options.as(BigQueryOptions.class).getInsertBundleParallelism());
      }
      if (insertIdList != null && rowList.size() != insertIdList.size()) {
        throw new AssertionError(
            "If insertIdList is not null it needs to have the same number of "
                + "elements as rowList");
      }

      long retTotalDataSize = 0;
      List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
      // These lists contain the rows to publish. Initially they contain the entire list.
      // If there are failures, they will contain only the failed rows to be retried.
      List<FailsafeValueInSingleWindow<TableRow, TableRow>> rowsToPublish = rowList;
      List<String> idsToPublish = null;
      if (!ignoreInsertIds) {
        idsToPublish = insertIdList;
      }
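      // Publish-and-retry loop: each pass sends all pending rows, collects the rows whose
      // failures the retry policy considers retriable, and backs off before the next pass.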
      while (true) {
        List<FailsafeValueInSingleWindow<TableRow, TableRow>> retryRows = new ArrayList<>();
        List<String> retryIds = (idsToPublish != null) ? new ArrayList<>() : null;

        int strideIndex = 0;
        // Upload in batches.
        List<TableDataInsertAllRequest.Rows> rows = new ArrayList<>();
        long dataSize = 0L;

        List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures = new ArrayList<>();
        List<Integer> strideIndices = new ArrayList<>();
        // Store the longest throttled time across all parallel threads
        final AtomicLong maxThrottlingMsec = new AtomicLong();

        for (int i = 0; i < rowsToPublish.size(); ++i) {
          TableRow row = rowsToPublish.get(i).getValue();
          TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
          if (idsToPublish != null) {
            out.setInsertId(idsToPublish.get(i));
          }
          out.setJson(row.getUnknownKeys());
          rows.add(out);

          try {
            dataSize += TableRowJsonCoder.of().getEncodedElementByteSize(row);
          } catch (Exception ex) {
            throw new RuntimeException("Failed to convert the row to JSON", ex);
          }

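          // Flush the current batch once either size threshold is reached, or after the last row.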
          if (dataSize >= maxRowBatchSize
              || rows.size() >= maxRowsPerBatch
              || i == rowsToPublish.size() - 1) {
            TableDataInsertAllRequest content = new TableDataInsertAllRequest();
            content.setRows(rows);
            content.setSkipInvalidRows(skipInvalidRows);
            content.setIgnoreUnknownValues(ignoreUnknownValues);

            final Bigquery.Tabledata.InsertAll insert =
                client
                    .tabledata()
                    .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
                    .setPrettyPrint(false);

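            // Submit the batch asynchronously. Each worker retries rate-limit and quota errors
            // with its own backoff; any other IOException propagates out of the worker.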
            futures.add(
                executor.submit(
                    () -> {
                      // A per-batch backoff for rate-limit and quota errors.
                      BackOff rateLimitBackoff =
                          BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
                      long totalBackoffMillis = 0L;
                      while (true) {
                        try {
                          return insert.execute().getInsertErrors();
                        } catch (IOException e) {
                          recordError(e);
                          GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
                          if (errorInfo == null) {
                            throw e;
                          }
                          /*
                           * TODO(BEAM-10584): The QUOTA_EXCEEDED check below will be replaced by
                           * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
                           * GoogleCloudDataproc/hadoop-connectors.
                           */
                          if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
                              && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
                            throw e;
                          }
                          LOG.info(
                              "BigQuery insertAll error, retrying: {}",
                              ApiErrorExtractor.INSTANCE.getErrorMessage(e));
                          try {
                            long nextBackOffMillis = rateLimitBackoff.nextBackOffMillis();
                            if (nextBackOffMillis == BackOff.STOP) {
                              throw e;
                            }
                            sleeper.sleep(nextBackOffMillis);
                            totalBackoffMillis += nextBackOffMillis;
                            final long totalBackoffMillisSoFar = totalBackoffMillis;
                            maxThrottlingMsec.getAndUpdate(
                                current -> Math.max(current, totalBackoffMillisSoFar));
                          } catch (InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                            throw new IOException(
                                "Interrupted while waiting before retrying insertAll");
                          }
                        }
                      }
                    }));
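            // Record the global index of this batch's first row so that per-batch error indices
            // can be mapped back to positions in rowsToPublish.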
            strideIndices.add(strideIndex);

            retTotalDataSize += dataSize;

            dataSize = 0L;
            strideIndex = i + 1;
            rows = new ArrayList<>();
          }
        }

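        // Wait for every batch, then translate each batch-local error index into a global row
        // index and split failures into retriable rows and permanent failures.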
        try {
          for (int i = 0; i < futures.size(); i++) {
            List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
            if (errors == null) {
              continue;
            }
            for (TableDataInsertAllResponse.InsertErrors error : errors) {
              if (error.getIndex() == null) {
                throw new IOException("Insert failed: " + error + ", other errors: " + allErrors);
              }

              int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
              if (retryPolicy.shouldRetry(new InsertRetryPolicy.Context(error))) {
                allErrors.add(error);
                retryRows.add(rowsToPublish.get(errorIndex));
                if (retryIds != null) {
                  retryIds.add(idsToPublish.get(errorIndex));
                }
              } else {
                errorContainer.add(failedInserts, error, ref, rowsToPublish.get(errorIndex));
              }
            }
          }
          // Accumulate the longest throttled time across all parallel threads
          throttlingMsecs.inc(maxThrottlingMsec.get());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while inserting " + rowsToPublish);
        } catch (ExecutionException e) {
          throw new RuntimeException(e.getCause());
        }

        if (allErrors.isEmpty()) {
          break;
        }
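        // Retriable errors remain: honor the caller-supplied backoff before republishing only
        // the failed rows.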
        long nextBackoffMillis = backoff.nextBackOffMillis();
        if (nextBackoffMillis == BackOff.STOP) {
          break;
        }
        try {
          sleeper.sleep(nextBackoffMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while waiting before retrying insert of " + retryRows);
        }
        rowsToPublish = retryRows;
        idsToPublish = retryIds;
        allErrors.clear();
        LOG.info("Retrying {} failed inserts to BigQuery", rowsToPublish.size());
      }
      if (!allErrors.isEmpty()) {
        throw new IOException("Insert failed: " + allErrors);
      } else {
        return retTotalDataSize;
      }
    }
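
The stride-index bookkeeping above can be exercised in isolation. The following standalone sketch (hypothetical class and data, not part of Beam) shows how a batch-local error index reported for one batch maps back to a position in the original row list:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class StrideIndexDemo {
      public static void main(String[] args) {
        List<String> rows = Arrays.asList("r0", "r1", "r2", "r3", "r4");
        int maxRowsPerBatch = 2; // hypothetical batch-size limit
        List<List<String>> batches = new ArrayList<>();
        List<Integer> strideIndices = new ArrayList<>(); // global index of each batch's first row
        List<String> batch = new ArrayList<>();
        int strideIndex = 0;
        for (int i = 0; i < rows.size(); i++) {
          batch.add(rows.get(i));
          // Flush on the size threshold or at the last row, as in insertAll above.
          if (batch.size() >= maxRowsPerBatch || i == rows.size() - 1) {
            batches.add(batch);
            strideIndices.add(strideIndex);
            strideIndex = i + 1;
            batch = new ArrayList<>();
          }
        }
        // Suppose the service reports an error at local index 1 of batch 1:
        int failedBatch = 1;
        int localErrorIndex = 1;
        int globalIndex = strideIndices.get(failedBatch) + localErrorIndex;
        System.out.println("Failed row: " + rows.get(globalIndex)); // prints "Failed row: r3"
      }
    }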