public void process()

in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java [301:506]


    /**
     * Appends all elements grouped under one sharded destination key to a Storage API write
     * stream.
     *
     * <p>The elements are converted to proto messages and split into {@code ProtoRows} batches of
     * at most 1MB. Each batch is registered as one operation on a {@code RetryManager}; every
     * operation carries an {@code AppendRowsContext} recording the stream name, the offset at
     * which the batch is appended, and its row count. Offsets are assigned from the {@code
     * streamOffset} state, which is advanced by each batch's row count.
     *
     * <p>On a successful append, a {@code KV} of the stream name and an {@code Operation} carrying
     * the last offset written by that batch is emitted to {@code o}. On any append failure the
     * cached append client is invalidated and all operations are retried; if the failure code is
     * {@code OUT_OF_RANGE} or {@code ALREADY_EXISTS}, an {@code Operation} for the old stream is
     * emitted first and the failed contexts are re-initialized against a new stream.
     *
     * @param c the process context; used for side-input access and passed to table creation
     * @param pipelineOptions options used to obtain the {@code DatasetService}
     * @param element the sharded destination key and the elements to append for it
     * @param streamName state holding the name of the current write stream; an empty string is
     *     written to force a new stream to be created
     * @param streamOffset state holding the offset at which the next batch will be appended
     * @param o receives one {@code KV<String, Operation>} per successful append, plus one per
     *     offset failure
     * @throws Exception if table resolution, stream initialization or the append retries fail
     */
    public void process(
        ProcessContext c,
        final PipelineOptions pipelineOptions,
        @Element KV<ShardedKey<DestinationT>, Iterable<ElementT>> element,
        final @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
        final @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
        final OutputReceiver<KV<String, Operation>> o)
        throws Exception {
      dynamicDestinations.setSideInputAccessorFromProcessContext(c);
      // Resolve the table for this destination once and remember it in the destinations map; the
      // mapping function only runs the first time a destination is seen.
      TableDestination tableDestination =
          destinations.computeIfAbsent(
              element.getKey().getKey(),
              dest -> {
                TableDestination tableDestination1 = dynamicDestinations.getTable(dest);
                checkArgument(
                    tableDestination1 != null,
                    "DynamicDestinations.getTable() may not return null, "
                        + "but %s returned null for destination %s",
                    dynamicDestinations,
                    dest);
                Supplier<TableSchema> schemaSupplier = () -> dynamicDestinations.getSchema(dest);
                return CreateTableHelpers.possiblyCreateTable(
                    c,
                    tableDestination1,
                    schemaSupplier,
                    createDisposition,
                    destinationCoder,
                    kmsKey,
                    bqServices);
              });
      final String tableId = tableDestination.getTableUrn();
      final DatasetService datasetService = getDatasetService(pipelineOptions);
      MessageConverter<ElementT> messageConverter =
          messageConverters.get(element.getKey().getKey(), dynamicDestinations);
      Descriptor descriptor = messageConverter.getSchemaDescriptor();

      // Each ProtoRows object contains at most 1MB of rows.
      // TODO: Push messageFromTableRow up to top level. That way we can skip TableRow entirely if
      // already proto or already schema.
      final long oneMb = 1024 * 1024;
      Iterable<ProtoRows> messages =
          new SplittingIterable<>(
              Iterables.transform(element.getValue(), e -> messageConverter.toMessage(e)), oneMb);

      // Per-batch bookkeeping handed to the RetryManager: which stream and offset the batch is
      // appended at, how many rows it holds, and which (pinned) client it is associated with.
      class AppendRowsContext extends RetryManager.Operation.Context<AppendRowsResponse> {
        final ShardedKey<DestinationT> key;
        String streamName = "";
        StreamAppendClient client = null;
        long offset = -1;
        long numRows = 0;
        long tryIteration = 0;

        AppendRowsContext(ShardedKey<DestinationT> key) {
          this.key = key;
        }

        @Override
        public String toString() {
          return "Context: key="
              + key
              + " streamName="
              + streamName
              + " offset="
              + offset
              + " numRows="
              + numRows
              + " tryIteration: "
              + tryIteration;
        }
      }

      // Initialize stream names and offsets for all contexts. This will be called initially, but
      // will also be called if we roll over to a new stream on a retry.
      BiConsumer<Iterable<AppendRowsContext>, Boolean> initializeContexts =
          (contexts, isFailure) -> {
            try {
              if (isFailure) {
                // Clear the stream name, forcing a new one to be created.
                streamName.write("");
              }
              String stream = getOrCreateStream(tableId, streamName, streamOffset, datasetService);
              StreamAppendClient appendClient =
                  APPEND_CLIENTS.get(
                      stream, () -> datasetService.getStreamAppendClient(stream, descriptor));
              for (AppendRowsContext context : contexts) {
                context.streamName = stream;
                // One pin per context; each is released again via unpin (in clearClients or in
                // the finally block at the end of this method).
                appendClient.pin();
                context.client = appendClient;
                // Reserve [offset, offset + numRows) for this batch and advance the stored offset.
                context.offset = streamOffset.read();
                ++context.tryIteration;
                streamOffset.write(context.offset + context.numRows);
              }
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          };

      // Drops the cached client for the current stream and releases the pin held by every context.
      Consumer<Iterable<AppendRowsContext>> clearClients =
          contexts -> {
            APPEND_CLIENTS.invalidate(streamName.read());
            for (AppendRowsContext context : contexts) {
              if (context.client != null) {
                // Unpin in a different thread, as it may execute a blocking close.
                runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin);
                // Null out the client so that the same pin is not released a second time.
                context.client = null;
              }
            }
          };

      // Start of the whole append, used for the latency metric at the end.
      Instant startTime = Instant.now();
      List<AppendRowsContext> contexts = Lists.newArrayList();
      RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
          new RetryManager<>(Duration.standardSeconds(1), Duration.standardMinutes(1), 1000);
      int numSplits = 0;
      for (ProtoRows protoRows : messages) {
        ++numSplits;
        // The append itself. The client is looked up by the context's current stream name, so a
        // retry after a stream rollover goes to the new stream.
        Function<AppendRowsContext, ApiFuture<AppendRowsResponse>> run =
            context -> {
              try {
                StreamAppendClient appendClient =
                    APPEND_CLIENTS.get(
                        context.streamName,
                        () -> datasetService.getStreamAppendClient(context.streamName, descriptor));
                return appendClient.appendRows(context.offset, protoRows);
              } catch (Exception e) {
                throw new RuntimeException(e);
              }
            };

        // Error callback invoked by the RetryManager.
        Function<Iterable<AppendRowsContext>, RetryType> onError =
            failedContexts -> {
              // The first context is always the one that fails.
              AppendRowsContext failedContext =
                  Preconditions.checkNotNull(Iterables.getFirst(failedContexts, null));
              Status.Code statusCode = Status.fromThrowable(failedContext.getError()).getCode();
              // Invalidate the StreamWriter and force a new one to be created.
              LOG.error(
                  "Got error " + failedContext.getError() + " closing " + failedContext.streamName);
              clearClients.accept(contexts);
              appendFailures.inc();
              if (statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS)) {
                appendOffsetFailures.inc();
                LOG.warn(
                    "Append to "
                        + failedContext
                        + " failed with "
                        + failedContext.getError()
                        + " Will retry with a new stream");
                // This means that the offset we have stored does not match the current end of
                // the stream in the Storage API. Usually this happens because a crash or a bundle
                // failure happened after an append but before the worker could checkpoint its
                // state. The records that were appended in a failed bundle will be retried,
                // meaning that the unflushed tail of the stream must be discarded to prevent
                // duplicates.

                // Finalize the stream and clear streamName so a new stream will be created.
                o.output(
                    KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true)));
                // Reinitialize all contexts with the new stream and new offsets.
                initializeContexts.accept(failedContexts, true);

                // Offset failures imply that all subsequent parallel appends will also fail.
                // Retry them all.
                return RetryType.RETRY_ALL_OPERATIONS;
              }

              // Any other failure: the cached client was invalidated above, and the contexts keep
              // their stream name and offsets for the retry.
              return RetryType.RETRY_ALL_OPERATIONS;
            };

        // Success callback: report the last offset written by this batch.
        Consumer<AppendRowsContext> onSuccess =
            context -> {
              o.output(
                  KV.of(
                      context.streamName,
                      new Operation(context.offset + context.numRows - 1, false)));
              flushesScheduled.inc(protoRows.getSerializedRowsCount());
            };

        AppendRowsContext context = new AppendRowsContext(element.getKey());
        context.numRows = protoRows.getSerializedRowsCount();
        contexts.add(context);
        retryManager.addOperation(run, onError, onSuccess, context);
        recordsAppended.inc(protoRows.getSerializedRowsCount());
        appendSizeDistribution.update(context.numRows);
      }

      try {
        // Initialization pins a client once per context and may fail partway through the loop, so
        // it runs inside the try block: the finally clause then releases any pins already taken.
        initializeContexts.accept(contexts, false);
        retryManager.run(true);
      } finally {
        // Make sure that all pins are removed.
        for (AppendRowsContext context : contexts) {
          if (context.client != null) {
            runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin);
          }
        }
      }
      appendSplitDistribution.update(numSplits);

      java.time.Duration timeElapsed = java.time.Duration.between(startTime, Instant.now());
      appendLatencyDistribution.update(timeElapsed.toMillis());
    }