in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java [301:506]
public void process(
ProcessContext c,
final PipelineOptions pipelineOptions,
@Element KV<ShardedKey<DestinationT>, Iterable<ElementT>> element,
final @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
final @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
final OutputReceiver<KV<String, Operation>> o)
throws Exception {
dynamicDestinations.setSideInputAccessorFromProcessContext(c);
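// Resolve the destination table for this key, caching the result and creating the table if
// the create disposition requires it.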
TableDestination tableDestination =
destinations.computeIfAbsent(
element.getKey().getKey(),
dest -> {
TableDestination tableDestination1 = dynamicDestinations.getTable(dest);
checkArgument(
tableDestination1 != null,
"DynamicDestinations.getTable() may not return null, "
+ "but %s returned null for destination %s",
dynamicDestinations,
dest);
Supplier<TableSchema> schemaSupplier = () -> dynamicDestinations.getSchema(dest);
return CreateTableHelpers.possiblyCreateTable(
c,
tableDestination1,
schemaSupplier,
createDisposition,
destinationCoder,
kmsKey,
bqServices);
});
final String tableId = tableDestination.getTableUrn();
final DatasetService datasetService = getDatasetService(pipelineOptions);
MessageConverter<ElementT> messageConverter =
messageConverters.get(element.getKey().getKey(), dynamicDestinations);
Descriptor descriptor = messageConverter.getSchemaDescriptor();
// Each ProtoRows object contains at most 1MB of rows.
// TODO: Push messageFromTableRow up to the top level. That way we can skip TableRow entirely
// if the element is already a proto or already has a schema.
final long oneMb = 1024 * 1024;
Iterable<ProtoRows> messages =
new SplittingIterable<>(
Iterables.transform(element.getValue(), e -> messageConverter.toMessage(e)), oneMb);
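// Per-append bookkeeping passed to the RetryManager: the stream and offset the batch was
// appended to, the pinned append client, and the number of rows in the batch.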
class AppendRowsContext extends RetryManager.Operation.Context<AppendRowsResponse> {
final ShardedKey<DestinationT> key;
String streamName = "";
StreamAppendClient client = null;
long offset = -1;
long numRows = 0;
long tryIteration = 0;
AppendRowsContext(ShardedKey<DestinationT> key) {
this.key = key;
}
@Override
public String toString() {
return "Context: key="
+ key
+ " streamName="
+ streamName
+ " offset="
+ offset
+ " numRows="
+ numRows
+ " tryIteration: "
+ tryIteration;
}
}
// Initialize stream names and offsets for all contexts. This will be called initially, but
// will also be called if we roll over to a new stream on a retry.
BiConsumer<Iterable<AppendRowsContext>, Boolean> initializeContexts =
(contexts, isFailure) -> {
try {
if (isFailure) {
// Clear the stream name, forcing a new one to be created.
streamName.write("");
}
String stream = getOrCreateStream(tableId, streamName, streamOffset, datasetService);
StreamAppendClient appendClient =
APPEND_CLIENTS.get(
stream, () -> datasetService.getStreamAppendClient(stream, descriptor));
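// Pin the shared append client once per context; each pin is released either when the
// clients are cleared after a failure or when the bundle finishes.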
for (AppendRowsContext context : contexts) {
context.streamName = stream;
appendClient.pin();
context.client = appendClient;
context.offset = streamOffset.read();
++context.tryIteration;
streamOffset.write(context.offset + context.numRows);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
};
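// Invalidate the cached append client for the current stream and unpin the clients held by
// these contexts so they can eventually be closed.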
Consumer<Iterable<AppendRowsContext>> clearClients =
contexts -> {
APPEND_CLIENTS.invalidate(streamName.read());
for (AppendRowsContext context : contexts) {
if (context.client != null) {
// Unpin in a different thread, as it may execute a blocking close.
runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin);
context.client = null;
}
}
};
Instant now = Instant.now();
List<AppendRowsContext> contexts = Lists.newArrayList();
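// All appends for this bundle share a single RetryManager, so related failures can be
// retried together.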
RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
new RetryManager<>(Duration.standardSeconds(1), Duration.standardMinutes(1), 1000);
int numSplits = 0;
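// Register one append operation per ProtoRows batch produced by the splitting iterable.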
for (ProtoRows protoRows : messages) {
++numSplits;
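// The operation body: look up the cached append client for the context's stream and append
// this batch at the context's reserved offset.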
Function<AppendRowsContext, ApiFuture<AppendRowsResponse>> run =
context -> {
try {
StreamAppendClient appendClient =
APPEND_CLIENTS.get(
context.streamName,
() -> datasetService.getStreamAppendClient(context.streamName, descriptor));
return appendClient.appendRows(context.offset, protoRows);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
// Error callback invoked by the RetryManager with all contexts whose appends failed.
Function<Iterable<AppendRowsContext>, RetryType> onError =
failedContexts -> {
// The first context is always the one that fails.
AppendRowsContext failedContext =
Preconditions.checkNotNull(Iterables.getFirst(failedContexts, null));
Status.Code statusCode = Status.fromThrowable(failedContext.getError()).getCode();
// Invalidate the StreamWriter and force a new one to be created.
LOG.error(
"Got error " + failedContext.getError() + " closing " + failedContext.streamName);
clearClients.accept(contexts);
appendFailures.inc();
if (statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS)) {
appendOffsetFailures.inc();
LOG.warn(
"Append to "
+ failedContext
+ " failed with "
+ failedContext.getError()
+ " Will retry with a new stream");
// This means that the offset we have stored does not match the current end of the stream
// in the Storage API. Usually this happens because a crash or a bundle failure happened
// after an append but before the worker could checkpoint its state. The records that were
// appended in a failed bundle will be retried, meaning that the unflushed tail of the
// stream must be discarded to prevent duplicates.
// Finalize the stream and clear streamName so a new stream will be created.
o.output(
KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true)));
// Reinitialize all contexts with the new stream and new offsets.
initializeContexts.accept(failedContexts, true);
// Offset failures imply that all subsequent parallel appends will also fail.
// Retry them all.
return RetryType.RETRY_ALL_OPERATIONS;
}
return RetryType.RETRY_ALL_OPERATIONS;
};
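// Success callback: output the last offset written by this append; unlike the error path
// above, this does not request stream finalization.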
Consumer<AppendRowsContext> onSuccess =
context -> {
o.output(
KV.of(
context.streamName,
new Operation(context.offset + context.numRows - 1, false)));
flushesScheduled.inc(protoRows.getSerializedRowsCount());
};
AppendRowsContext context = new AppendRowsContext(element.getKey());
context.numRows = protoRows.getSerializedRowsCount();
contexts.add(context);
retryManager.addOperation(run, onError, onSuccess, context);
recordsAppended.inc(protoRows.getSerializedRowsCount());
appendSizeDistribution.update(context.numRows);
}
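// Assign the stream, append client, and starting offsets to every context now that all
// batches have been registered, then run the appends, retrying as determined by onError.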
initializeContexts.accept(contexts, false);
try {
retryManager.run(true);
} finally {
// Make sure that all pins are removed.
for (AppendRowsContext context : contexts) {
if (context.client != null) {
runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin);
}
}
}
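// Record the number of splits for this element and the total time spent appending it.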
appendSplitDistribution.update(numSplits);
java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now());
appendLatencyDistribution.update(timeElapsed.toMillis());
}