private Set<W> getReadyWindows()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.java [128:176]


  /**
   * Consumes the side input notifications from the execution context and works out which
   * blocked windows can now proceed.
   *
   * <p>For each notification whose tag is present in {@code sideInputViews}, the matching
   * requests are removed from every window's blocked set in {@code blockedMap}. Whenever a
   * window's blocked set is empty afterwards, a fetch is issued for every view in
   * {@code sideInputViews}; each view whose fetch reports {@code false} is registered as
   * blocking again, both on the step context and in that window's blocked set.
   *
   * @return the windows for which every issued side input fetch reported success
   * @throws RuntimeException wrapping any {@link IOException} raised while processing an
   *     unblocked window (rethrown through {@code Throwables.propagate})
   */
  private Set<W> getReadyWindows() {
    Set<W> unblockedWindows = new HashSet<>();

    for (Windmill.GlobalDataId notifiedId : execContext.getSideInputNotifications()) {
      boolean consumedByThisDoFn = sideInputViews.get(notifiedId.getTag()) != null;
      if (!consumedByThisDoFn) {
        // The notified side input belongs to some other DoFn.
        continue;
      }

      for (Map.Entry<W, Set<Windmill.GlobalDataRequest>> blocked : blockedMap.entrySet()) {
        Set<Windmill.GlobalDataRequest> pending = blocked.getValue();

        // Gather the requests satisfied by this notification first, then drop them in one
        // call, so the set is never modified while it is being iterated.
        Set<Windmill.GlobalDataRequest> satisfied = new HashSet<>();
        for (Windmill.GlobalDataRequest candidate : pending) {
          if (notifiedId.equals(candidate.getDataId())) {
            satisfied.add(candidate);
          }
        }
        pending.removeAll(satisfied);

        if (!pending.isEmpty()) {
          // Still waiting on at least one notification for this window.
          continue;
        }

        // Every notification for this window has arrived. Issue a fetch for each side input
        // so that they all end up in the local cache; any fetch that reports false puts its
        // side input back on the blocked list.
        W window = blocked.getKey();
        boolean fetchMissed = false;
        try {
          for (PCollectionView<?> view : sideInputViews.values()) {
            boolean cached =
                stepContext.issueSideInputFetch(view, window, SideInputState.KNOWN_READY);
            if (cached) {
              continue;
            }
            Windmill.GlobalDataRequest refetch = buildGlobalDataRequest(view, window);
            stepContext.addBlockingSideInput(refetch);
            pending.add(refetch);
            fetchMissed = true;
          }
        } catch (IOException e) {
          throw Throwables.propagate(e);
        }

        if (!fetchMissed) {
          unblockedWindows.add(window);
        }
      }
    }

    return unblockedWindows;
  }