in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.java [128:176]
/**
 * Computes the windows that are no longer blocked on any side input.
 *
 * <p>Each side input notification reported by {@code execContext} whose tag is known to
 * {@code sideInputViews} is matched against the blocked requests recorded per window in
 * {@code blockedMap}; matching requests are removed from that window's blocked set. Whenever a
 * window's blocked set is empty after that removal, a fetch is issued for every view in
 * {@code sideInputViews}. Views whose fetch does not succeed are registered as blocking again
 * (both on {@code stepContext} and in the window's blocked set); the window is reported as ready
 * only when every fetch succeeded.
 *
 * <p>Note that this method mutates the sets stored in {@code blockedMap}.
 *
 * @return the windows for which all side input fetches succeeded; never {@code null}
 * @throws RuntimeException as produced by {@code Throwables.propagate} if an
 *     {@link IOException} is raised while fetching or re-registering side inputs
 */
private Set<W> getReadyWindows() {
  Set<W> unblockedWindows = new HashSet<>();

  for (Windmill.GlobalDataId notifiedId : execContext.getSideInputNotifications()) {
    // Only notifications whose tag maps to one of this runner's views are relevant;
    // anything else is for a different DoFn.
    if (sideInputViews.get(notifiedId.getTag()) != null) {
      for (Map.Entry<W, Set<Windmill.GlobalDataRequest>> blockedEntry : blockedMap.entrySet()) {
        Set<Windmill.GlobalDataRequest> pendingRequests = blockedEntry.getValue();

        // Gather the matches first and remove them afterwards so the set is never modified
        // while it is being iterated.
        Set<Windmill.GlobalDataRequest> satisfiedRequests = new HashSet<>();
        for (Windmill.GlobalDataRequest pending : pendingRequests) {
          if (notifiedId.equals(pending.getDataId())) {
            satisfiedRequests.add(pending);
          }
        }
        pendingRequests.removeAll(satisfiedRequests);

        if (!pendingRequests.isEmpty()) {
          // This window is still waiting on at least one other side input.
          continue;
        }

        // Every side input this window was waiting on has been notified. Fetch all of the
        // needed side inputs to confirm they are present in the local cache; any that are
        // not get recorded as blocking the window again.
        try {
          W window = blockedEntry.getKey();
          int stillMissing = 0;
          for (PCollectionView<?> view : sideInputViews.values()) {
            if (stepContext.issueSideInputFetch(view, window, SideInputState.KNOWN_READY)) {
              continue;
            }
            Windmill.GlobalDataRequest reblocked = buildGlobalDataRequest(view, window);
            stepContext.addBlockingSideInput(reblocked);
            pendingRequests.add(reblocked);
            stillMissing++;
          }
          if (stillMissing == 0) {
            unblockedWindows.add(window);
          }
        } catch (IOException e) {
          throw Throwables.propagate(e);
        }
      }
    }
  }

  return unblockedWindows;
}