private FutureType handleOutput()

in scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncBatchLookupDoFn.java [279:343]


  private FutureType handleOutput(FutureType future, List<Input> batchInput, UUID key) {
    final Map<String, Input> keyedInputs =
        batchInput.stream().collect(Collectors.toMap(idExtractorFn::apply, identity()));
    return addCallback(
        future,
        response -> {
          final Map<String, Output> keyedOutput =
              batchResponseFn.apply(response).stream()
                  .collect(Collectors.toMap(Pair::getKey, Pair::getValue));

          keyedInputs.forEach(
              (id, input) -> {
                final List<ValueInSingleWindow<Input>> processInputs = inputs.remove(id);
                if (processInputs == null) {
                  // no need to fail future here as we're only interested in its completion
                  // finishBundle will fail the checkState as we do not produce any result
                  LOG.error(
                      "The ID '{}' received in the gRPC batch response does not "
                          + "match any IDs extracted via the idExtractorFn for the requested  "
                          + "batch sent to the gRPC endpoint. Please ensure that the IDs returned "
                          + "from the gRPC endpoints match the IDs extracted using the provided"
                          + "idExtractorFn for the same input.",
                      id);
                } else {
                  List<ValueInSingleWindow<KV<Input, TryWrapper>>> batchResult =
                      processInputs.stream()
                          .map(
                              processInput -> {
                                final Input i = processInput.getValue();
                                final Output output = keyedOutput.get(id);
                                final TryWrapper o =
                                    output == null
                                        ? failure(new UnmatchedRequestException(id))
                                        : success(output);
                                final Instant ts = processInput.getTimestamp();
                                final BoundedWindow w = processInput.getWindow();
                                final PaneInfo p = processInput.getPane();
                                return ValueInSingleWindow.of(KV.of(i, o), ts, w, p);
                              })
                          .collect(Collectors.toList());
                  results.add(Pair.of(key, batchResult));
                }
              });
          return null;
        },
        throwable -> {
          keyedInputs.forEach(
              (id, element) -> {
                final List<ValueInSingleWindow<KV<Input, TryWrapper>>> batchResult =
                    inputs.remove(id).stream()
                        .map(
                            processInput -> {
                              final Input i = processInput.getValue();
                              final TryWrapper o = failure(throwable);
                              final Instant ts = processInput.getTimestamp();
                              final BoundedWindow w = processInput.getWindow();
                              final PaneInfo p = processInput.getPane();
                              return ValueInSingleWindow.of(KV.of(i, o), ts, w, p);
                            })
                        .collect(Collectors.toList());
                results.add(Pair.of(key, batchResult));
              });
          return null;
        });
  }