in scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncBatchLookupDoFn.java [279:343]
/**
 * Attaches success and failure callbacks to the future of one batch request so that, once the
 * future completes, every pending input of the batch is paired with a result.
 *
 * <p>On success the response is converted with {@code batchResponseFn} into an id-to-output map.
 * Each requested id whose output is present yields a {@code success} wrapper; an id with no
 * output in the response yields a {@code failure} wrapping an {@link UnmatchedRequestException}.
 * On failure every pending input of the batch is paired with a {@code failure} wrapping the
 * throwable. In both cases the pending entries are removed from {@code inputs} and the produced
 * list is appended to {@code results} under {@code key}.
 *
 * @param future the future of the batch request
 * @param batchInput the inputs that were sent in this batch; ids extracted via {@code
 *     idExtractorFn} must be unique within the batch, otherwise {@code Collectors.toMap} throws
 * @param key identifier stored alongside each produced result list
 * @return whatever {@code addCallback} returns for the given future and callbacks
 */
private FutureType handleOutput(FutureType future, List<Input> batchInput, UUID key) {
  final Map<String, Input> keyedInputs =
      batchInput.stream().collect(Collectors.toMap(idExtractorFn::apply, identity()));
  return addCallback(
      future,
      response -> {
        final Map<String, Output> keyedOutput =
            batchResponseFn.apply(response).stream()
                .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
        keyedInputs.forEach(
            (id, input) -> {
              final List<ValueInSingleWindow<Input>> processInputs = inputs.remove(id);
              if (processInputs == null) {
                // no need to fail future here as we're only interested in its completion
                // finishBundle will fail the checkState as we do not produce any result
                // NOTE(review): the id here comes from the requested batch, not from the
                // response, so the wording below may deserve a follow-up.
                LOG.error(
                    "The ID '{}' received in the gRPC batch response does not "
                        + "match any IDs extracted via the idExtractorFn for the requested "
                        + "batch sent to the gRPC endpoint. Please ensure that the IDs returned "
                        + "from the gRPC endpoints match the IDs extracted using the provided "
                        + "idExtractorFn for the same input.",
                    id);
                return;
              }
              final Output output = keyedOutput.get(id);
              final List<ValueInSingleWindow<KV<Input, TryWrapper>>> batchResult =
                  processInputs.stream()
                      .map(
                          processInput -> {
                            // a fresh wrapper per element, as each element is emitted separately
                            final TryWrapper o =
                                output == null
                                    ? failure(new UnmatchedRequestException(id))
                                    : success(output);
                            return toWindowedResult(processInput, o);
                          })
                      .collect(Collectors.toList());
              results.add(Pair.of(key, batchResult));
            });
        return null;
      },
      throwable -> {
        keyedInputs.forEach(
            (id, element) -> {
              final List<ValueInSingleWindow<Input>> processInputs = inputs.remove(id);
              if (processInputs == null) {
                // Mirror the success path: log and skip instead of throwing a
                // NullPointerException from inside the failure callback.
                LOG.error(
                    "No pending inputs were found for ID '{}' while propagating a failed "
                        + "batch request; no result is produced for this ID.",
                    id,
                    throwable);
                return;
              }
              final List<ValueInSingleWindow<KV<Input, TryWrapper>>> batchResult =
                  processInputs.stream()
                      .map(
                          processInput -> {
                            final TryWrapper o = failure(throwable);
                            return toWindowedResult(processInput, o);
                          })
                      .collect(Collectors.toList());
              results.add(Pair.of(key, batchResult));
            });
        return null;
      });
}

/**
 * Re-wraps a windowed input as a windowed (input, result) pair, carrying over the timestamp,
 * window and pane of the original element unchanged.
 */
private ValueInSingleWindow<KV<Input, TryWrapper>> toWindowedResult(
    ValueInSingleWindow<Input> processInput, TryWrapper result) {
  return ValueInSingleWindow.of(
      KV.of(processInput.getValue(), result),
      processInput.getTimestamp(),
      processInput.getWindow(),
      processInput.getPane());
}