in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StateFetcher.java [95:183]
/**
 * Returns the value of the side input identified by {@code view} and {@code sideWindow},
 * consulting {@code sideInputCache} first and loading from the Windmill server on a miss.
 *
 * <p>When {@code state} is {@code SideInputState.KNOWN_READY}, a cached entry that reports
 * not-ready is invalidated and fetched again; a cached entry that is ready is returned
 * without contacting the server. For any other {@code state} the cache is used as-is, so
 * the result may be the value held by a not-ready entry.
 *
 * @param view the side input view; supplies the tag, windowing strategy and coder
 * @param sideWindow the side input window whose contents are requested
 * @param stateFamily the state family name placed on the Windmill request
 * @param state what the caller already knows about the readiness of the side input
 * @param scopedReadStateSupplier supplies the sampler scope that is held open for the
 *     duration of the server call
 * @return the {@code value} field of the cache entry that was found or loaded
 * @throws RuntimeException wrapping any exception raised while consulting the cache or
 *     fetching from the server
 */
public <T, SideWindowT extends BoundedWindow> T fetchSideInput(final PCollectionView<T> view,
    final SideWindowT sideWindow, final String stateFamily, SideInputState state,
    final Supplier<StateSampler.ScopedState> scopedReadStateSupplier) {
  final SideInputId cacheKey = new SideInputId(view.getTagInternal(), sideWindow);

  // Loader handed to the cache: performs the actual Windmill round trip for this key.
  Callable<SideInputCacheEntry> loader = new Callable<SideInputCacheEntry>() {
    @Override
    public SideInputCacheEntry call() throws Exception {
      @SuppressWarnings("unchecked")
      WindowingStrategy<?, SideWindowT> strategy =
          (WindowingStrategy<?, SideWindowT>) view.getWindowingStrategyInternal();

      // The encoded window is sent as the "version" of the global data id.
      Coder<SideWindowT> coderForWindow = strategy.getWindowFn().windowCoder();
      ByteString.Output encodedWindow = ByteString.newOutput();
      coderForWindow.encode(sideWindow, encodedWindow, Coder.Context.OUTER);

      Windmill.GlobalDataId dataId =
          Windmill.GlobalDataId.newBuilder()
              .setTag(view.getTagInternal().getId())
              .setVersion(encodedWindow.toByteString())
              .build();

      // NOTE(review): this suppression does not appear to cover any unchecked operation;
      // kept as in the original -- confirm before removing.
      @SuppressWarnings("unchecked")
      Windmill.GlobalDataRequest globalDataRequest =
          Windmill.GlobalDataRequest.newBuilder()
              .setDataId(dataId)
              .setStateFamily(stateFamily)
              // The trigger's watermark is in milliseconds; the request field takes micros.
              .setExistenceWatermarkDeadline(
                  TimeUnit.MILLISECONDS.toMicros(
                      strategy.getTrigger().getSpec()
                          .getWatermarkThatGuaranteesFiring(sideWindow)
                          .getMillis()))
              .build();

      Windmill.GetDataResponse fetched;
      // The scope is only held open around the server call; it is closed automatically.
      try (StateSampler.ScopedState readScope = scopedReadStateSupplier.get()) {
        // The same data id is added both as a full fetch request and as a bare id.
        fetched = server.getSideInputData(
            Windmill.GetDataRequest.newBuilder()
                .addGlobalDataFetchRequests(globalDataRequest)
                .addGlobalDataToFetch(globalDataRequest.getDataId())
                .build());
      }

      // NOTE(review): assumes the response carries at least one global data item -- confirm.
      Windmill.GlobalData globalData = fetched.getGlobalData(0);
      bytesRead += globalData.getSerializedSize();

      if (!globalData.getIsReady()) {
        return SideInputCacheEntry.notReady();
      }

      ByteString payload = globalData.getData();
      Iterable<WindowedValue<?>> decoded;
      if (payload.size() > 0) {
        decoded = view.getCoderInternal().decode(payload.newInput(), Coder.Context.OUTER);
      } else {
        // A ready side input with no bytes is treated as an empty collection.
        decoded = Collections.emptyList();
      }
      return new SideInputCacheEntry(view.fromIterableInternal(decoded), payload.size());
    }
  };

  try {
    if (state != SideInputState.KNOWN_READY) {
      return (T) sideInputCache.get(cacheKey, loader).value;
    }

    SideInputCacheEntry cached = sideInputCache.getIfPresent(cacheKey);
    if (cached == null) {
      return (T) sideInputCache.get(cacheKey, loader).value;
    }
    if (cached.isReady()) {
      return (T) cached.value;
    }

    // The cached entry is stale (not ready) although the side input is known to be ready.
    // Drop it while holding the entry's monitor so that a ready entry installed by another
    // thread in the meantime is not the one that gets removed.
    synchronized (cached) {
      SideInputCacheEntry current = sideInputCache.getIfPresent(cacheKey);
      if (current != null && !current.isReady()) {
        sideInputCache.invalidate(cacheKey);
      }
    }
    return (T) sideInputCache.get(cacheKey, loader).value;
  } catch (Exception e) {
    LOG.error("Fetch failed: ", e);
    throw new RuntimeException("Exception while fetching side input: ", e);
  }
}