public T fetchSideInput()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StateFetcher.java [95:183]


  public <T, SideWindowT extends BoundedWindow> T fetchSideInput(final PCollectionView<T> view,
      final SideWindowT sideWindow, final String stateFamily, SideInputState state,
      final Supplier<StateSampler.ScopedState> scopedReadStateSupplier) {
    final SideInputId id = new SideInputId(view.getTagInternal(), sideWindow);

    Callable<SideInputCacheEntry> fetchCallable = new Callable<SideInputCacheEntry>() {
      @Override
      public SideInputCacheEntry call() throws Exception {
        @SuppressWarnings("unchecked")
        WindowingStrategy<?, SideWindowT> sideWindowStrategy =
            (WindowingStrategy<?, SideWindowT>) view.getWindowingStrategyInternal();

        Coder<SideWindowT> windowCoder = sideWindowStrategy.getWindowFn().windowCoder();

        ByteString.Output windowStream = ByteString.newOutput();
        windowCoder.encode(sideWindow, windowStream, Coder.Context.OUTER);

        @SuppressWarnings("unchecked")
        Windmill.GlobalDataRequest request =
            Windmill.GlobalDataRequest.newBuilder()
                .setDataId(Windmill.GlobalDataId.newBuilder()
                    .setTag(view.getTagInternal().getId())
                    .setVersion(windowStream.toByteString())
                    .build())
                .setStateFamily(stateFamily)
                .setExistenceWatermarkDeadline(
                     TimeUnit.MILLISECONDS.toMicros(sideWindowStrategy
                         .getTrigger().getSpec()
                         .getWatermarkThatGuaranteesFiring(sideWindow)
                         .getMillis()))
                .build();

        Windmill.GetDataResponse response;
        try (StateSampler.ScopedState scope = scopedReadStateSupplier.get()) {
            response = server.getSideInputData(
                Windmill.GetDataRequest.newBuilder()
                .addGlobalDataFetchRequests(request)
                .addGlobalDataToFetch(request.getDataId())
                .build());
        }

        Windmill.GlobalData data = response.getGlobalData(0);
        bytesRead += data.getSerializedSize();

        Iterable<WindowedValue<?>> rawData;
        if (data.getIsReady()) {
          if (data.getData().size() > 0) {
            rawData = view.getCoderInternal().decode(
                data.getData().newInput(), Coder.Context.OUTER);
          } else {
            rawData = Collections.emptyList();
          }

          return new SideInputCacheEntry(
              view.fromIterableInternal(rawData), data.getData().size());
        } else {
          return SideInputCacheEntry.notReady();
        }
      }
    };

    try {
      if (state == SideInputState.KNOWN_READY) {
        SideInputCacheEntry entry = sideInputCache.getIfPresent(id);
        if (entry == null) {
          return (T) sideInputCache.get(id, fetchCallable).value;
        } else if (!entry.isReady()) {
          // Invalidate the existing not-ready entry.  This must be done atomically
          // so that another thread doesn't replace the entry with a ready entry, which
          // would then be deleted here.
          synchronized (entry) {
            SideInputCacheEntry newEntry = sideInputCache.getIfPresent(id);
            if (newEntry != null && !newEntry.isReady()) {
              sideInputCache.invalidate(id);
            }
          }

          return (T) sideInputCache.get(id, fetchCallable).value;
        } else {
          return (T) entry.value;
        }
      } else {
        return (T) sideInputCache.get(id, fetchCallable).value;
      }
    } catch (Exception e) {
      LOG.error("Fetch failed: ", e);
      throw new RuntimeException("Exception while fetching side input: ", e);
    }
  }