in sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java [489:844]
/**
 * Builds the {@link DoFnSignature} of {@code fnClass} by reflecting over its type arguments,
 * its state/timer/timer-family/field-access declarations and its {@code DoFn} lifecycle and
 * splittable-DoFn annotated methods.
 *
 * <p>Every validation failure is reported through an {@link ErrorReporter} labelled with the
 * class name. The code relies on a failed check aborting the analysis (presumably by throwing);
 * for example, timer declarations are looked up right after their presence was checked.
 *
 * @param fnClass the {@code DoFn} subclass to analyze
 * @return the fully built signature; splittable DoFns are additionally run through
 *     {@code verifySplittableMethods} before being returned
 */
private static DoFnSignature parseSignature(Class<? extends DoFn<?, ?>> fnClass) {
  DoFnSignature.Builder signatureBuilder = DoFnSignature.builder();
  ErrorReporter errors = new ErrorReporter(null, fnClass.getName());
  errors.checkArgument(DoFn.class.isAssignableFrom(fnClass), "Must be subtype of DoFn");
  signatureBuilder.setFnClass(fnClass);
  TypeDescriptor<? extends DoFn<?, ?>> fnT = TypeDescriptor.of(fnClass);

  // Extract the input and output types from the DoFn supertype. Boundedness is inferred later,
  // once @ProcessElement has been analyzed.
  TypeDescriptor<?> inputT = null;
  TypeDescriptor<?> outputT = null;
  for (TypeDescriptor<?> supertype : fnT.getTypes()) {
    if (!supertype.getRawType().equals(DoFn.class)) {
      continue;
    }
    Type doFnType = supertype.getType();
    // A class extending the raw DoFn type carries no type arguments; report that as a
    // validation error instead of failing the cast below with a ClassCastException.
    errors.checkArgument(
        doFnType instanceof ParameterizedType,
        "Must declare the input and output type arguments of DoFn, not extend the raw type");
    Type[] args = ((ParameterizedType) doFnType).getActualTypeArguments();
    inputT = TypeDescriptor.of(args[0]);
    outputT = TypeDescriptor.of(args[1]);
    // Only one supertype can have DoFn as its raw type, so the search is complete.
    break;
  }
  errors.checkNotNull(inputT, "Unable to determine input type");

  // Find the state and timer declarations in advance of validating
  // method parameter lists
  FnAnalysisContext fnContext = FnAnalysisContext.create();
  fnContext.addStateDeclarations(analyzeStateDeclarations(errors, fnClass).values());
  fnContext.addTimerDeclarations(analyzeTimerDeclarations(errors, fnClass).values());
  fnContext.addTimerFamilyDeclarations(analyzeTimerFamilyDeclarations(errors, fnClass).values());
  fnContext.addFieldAccessDeclarations(analyzeFieldAccessDeclaration(errors, fnClass).values());

  // Locate the annotated methods; only @ProcessElement is mandatory.
  Method processElementMethod =
      findAnnotatedMethod(errors, DoFn.ProcessElement.class, fnClass, true);
  Method startBundleMethod = findAnnotatedMethod(errors, DoFn.StartBundle.class, fnClass, false);
  Method finishBundleMethod =
      findAnnotatedMethod(errors, DoFn.FinishBundle.class, fnClass, false);
  Method setupMethod = findAnnotatedMethod(errors, DoFn.Setup.class, fnClass, false);
  Method teardownMethod = findAnnotatedMethod(errors, DoFn.Teardown.class, fnClass, false);
  Method onWindowExpirationMethod =
      findAnnotatedMethod(errors, DoFn.OnWindowExpiration.class, fnClass, false);
  Method getInitialRestrictionMethod =
      findAnnotatedMethod(errors, DoFn.GetInitialRestriction.class, fnClass, false);
  Method splitRestrictionMethod =
      findAnnotatedMethod(errors, DoFn.SplitRestriction.class, fnClass, false);
  Method truncateRestrictionMethod =
      findAnnotatedMethod(errors, TruncateRestriction.class, fnClass, false);
  Method getRestrictionCoderMethod =
      findAnnotatedMethod(errors, DoFn.GetRestrictionCoder.class, fnClass, false);
  Method newTrackerMethod = findAnnotatedMethod(errors, DoFn.NewTracker.class, fnClass, false);
  Method getSizeMethod = findAnnotatedMethod(errors, DoFn.GetSize.class, fnClass, false);
  Method getWatermarkEstimatorStateCoderMethod =
      findAnnotatedMethod(errors, DoFn.GetWatermarkEstimatorStateCoder.class, fnClass, false);
  Method getInitialWatermarkEstimatorStateMethod =
      findAnnotatedMethod(errors, DoFn.GetInitialWatermarkEstimatorState.class, fnClass, false);
  Method newWatermarkEstimatorMethod =
      findAnnotatedMethod(errors, DoFn.NewWatermarkEstimator.class, fnClass, false);

  // Every @OnTimer callback must refer to a timer declared in the same class as the callback.
  Collection<Method> onTimerMethods =
      ReflectHelpers.declaredMethodsWithAnnotation(DoFn.OnTimer.class, fnClass, DoFn.class);
  HashMap<String, DoFnSignature.OnTimerMethod> onTimerMethodMap =
      Maps.newHashMapWithExpectedSize(onTimerMethods.size());
  for (Method onTimerMethod : onTimerMethods) {
    String id = TimerDeclaration.PREFIX + onTimerMethod.getAnnotation(DoFn.OnTimer.class).value();
    errors.checkArgument(
        fnContext.getTimerDeclarations().containsKey(id),
        "Callback %s is for undeclared timer %s",
        onTimerMethod,
        id);
    TimerDeclaration timerDecl = fnContext.getTimerDeclarations().get(id);
    errors.checkArgument(
        timerDecl.field().getDeclaringClass().equals(getDeclaringClass(onTimerMethod)),
        "Callback %s is for timer %s declared in a different class %s."
            + " Timer callbacks must be declared in the same lexical scope as their timer",
        onTimerMethod,
        id,
        timerDecl.field().getDeclaringClass().getCanonicalName());
    onTimerMethodMap.put(
        id, analyzeOnTimerMethod(errors, fnT, onTimerMethod, id, inputT, outputT, fnContext));
  }
  signatureBuilder.setOnTimerMethods(onTimerMethodMap);

  // Check for TimerFamily; same rules as for @OnTimer callbacks.
  Collection<Method> onTimerFamilyMethods =
      ReflectHelpers.declaredMethodsWithAnnotation(DoFn.OnTimerFamily.class, fnClass, DoFn.class);
  HashMap<String, DoFnSignature.OnTimerFamilyMethod> onTimerFamilyMethodMap =
      Maps.newHashMapWithExpectedSize(onTimerFamilyMethods.size());
  for (Method onTimerFamilyMethod : onTimerFamilyMethods) {
    String id =
        TimerFamilyDeclaration.PREFIX
            + onTimerFamilyMethod.getAnnotation(DoFn.OnTimerFamily.class).value();
    errors.checkArgument(
        fnContext.getTimerFamilyDeclarations().containsKey(id),
        "Callback %s is for undeclared timerFamily %s",
        onTimerFamilyMethod,
        id);
    TimerFamilyDeclaration timerDecl = fnContext.getTimerFamilyDeclarations().get(id);
    errors.checkArgument(
        timerDecl.field().getDeclaringClass().equals(getDeclaringClass(onTimerFamilyMethod)),
        "Callback %s is for timerFamily %s declared in a different class %s."
            + " TimerFamily callbacks must be declared in the same lexical scope as their timer",
        onTimerFamilyMethod,
        id,
        timerDecl.field().getDeclaringClass().getCanonicalName());
    onTimerFamilyMethodMap.put(
        id,
        analyzeOnTimerFamilyMethod(
            errors, fnT, onTimerFamilyMethod, id, inputT, outputT, fnContext));
  }
  signatureBuilder.setOnTimerFamilyMethods(onTimerFamilyMethodMap);

  // Check the converse - that all timers have a callback. This could be relaxed to only
  // those timers used in methods, once method parameter lists support timers.
  for (TimerDeclaration decl : fnContext.getTimerDeclarations().values()) {
    errors.checkArgument(
        onTimerMethodMap.containsKey(decl.id()),
        "No callback registered via %s for timer %s",
        format(DoFn.OnTimer.class),
        decl.id());
  }
  // Check the converse - that all timer families have a callback.
  for (TimerFamilyDeclaration decl : fnContext.getTimerFamilyDeclarations().values()) {
    errors.checkArgument(
        onTimerFamilyMethodMap.containsKey(decl.id()),
        "No callback registered via %s for timerFamily %s",
        format(DoFn.OnTimerFamily.class),
        decl.id());
  }

  // Lifecycle methods.
  ErrorReporter processElementErrors =
      errors.forMethod(DoFn.ProcessElement.class, processElementMethod);
  DoFnSignature.ProcessElementMethod processElement =
      analyzeProcessElementMethod(
          processElementErrors, fnT, processElementMethod, inputT, outputT, fnContext);
  signatureBuilder.setProcessElement(processElement);

  if (startBundleMethod != null) {
    ErrorReporter startBundleErrors = errors.forMethod(DoFn.StartBundle.class, startBundleMethod);
    signatureBuilder.setStartBundle(
        analyzeStartBundleMethod(
            startBundleErrors, fnT, startBundleMethod, inputT, outputT, fnContext));
  }

  if (finishBundleMethod != null) {
    ErrorReporter finishBundleErrors =
        errors.forMethod(DoFn.FinishBundle.class, finishBundleMethod);
    signatureBuilder.setFinishBundle(
        analyzeFinishBundleMethod(
            finishBundleErrors, fnT, finishBundleMethod, inputT, outputT, fnContext));
  }

  if (setupMethod != null) {
    ErrorReporter setupErrors = errors.forMethod(DoFn.Setup.class, setupMethod);
    signatureBuilder.setSetup(
        analyzeSetupMethod(setupErrors, fnT, setupMethod, inputT, outputT, fnContext));
  }

  if (teardownMethod != null) {
    signatureBuilder.setTeardown(
        analyzeShutdownMethod(
            errors.forMethod(DoFn.Teardown.class, teardownMethod), teardownMethod));
  }

  if (onWindowExpirationMethod != null) {
    signatureBuilder.setOnWindowExpiration(
        analyzeOnWindowExpirationMethod(
            errors, fnT, onWindowExpirationMethod, inputT, outputT, fnContext));
  }

  if (processElement.isSplittable()) {
    // A splittable DoFn must define @GetInitialRestriction; its return type fixes the
    // restriction type that the remaining restriction methods are checked against.
    ErrorReporter getInitialRestrictionErrors =
        errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestrictionMethod);
    getInitialRestrictionErrors.checkNotNull(
        getInitialRestrictionMethod,
        "Splittable, but does not define the required @%s method.",
        DoFnSignatures.format(DoFn.GetInitialRestriction.class));

    GetInitialRestrictionMethod initialRestrictionMethod =
        analyzeGetInitialRestrictionMethod(
            getInitialRestrictionErrors,
            fnT,
            getInitialRestrictionMethod,
            inputT,
            outputT,
            fnContext);
    signatureBuilder.setGetInitialRestriction(initialRestrictionMethod);
    TypeDescriptor<?> restrictionT = initialRestrictionMethod.restrictionT();

    // Without @GetInitialWatermarkEstimatorState the watermark estimator state type is Void.
    TypeDescriptor<?> watermarkEstimatorStateT = TypeDescriptors.voids();
    if (getInitialWatermarkEstimatorStateMethod != null) {
      GetInitialWatermarkEstimatorStateMethod initialWatermarkEstimatorStateMethod =
          analyzeGetInitialWatermarkEstimatorStateMethod(
              errors.forMethod(
                  DoFn.GetInitialWatermarkEstimatorState.class,
                  getInitialWatermarkEstimatorStateMethod),
              fnT,
              getInitialWatermarkEstimatorStateMethod,
              inputT,
              outputT,
              fnContext);
      watermarkEstimatorStateT = initialWatermarkEstimatorStateMethod.watermarkEstimatorStateT();
      signatureBuilder.setGetInitialWatermarkEstimatorState(initialWatermarkEstimatorStateMethod);
    }

    if (newTrackerMethod != null) {
      signatureBuilder.setNewTracker(
          analyzeNewTrackerMethod(
              errors.forMethod(DoFn.NewTracker.class, newTrackerMethod),
              fnT,
              newTrackerMethod,
              inputT,
              outputT,
              restrictionT,
              fnContext));
    } else {
      // No @NewTracker: the restriction type must supply its own default tracker.
      errors
          .forMethod(DoFn.NewTracker.class, null)
          .checkArgument(
              restrictionT.isSubtypeOf(TypeDescriptor.of(HasDefaultTracker.class)),
              "Splittable, either @%s method must be defined or %s must implement %s.",
              format(DoFn.NewTracker.class),
              format(restrictionT),
              format(HasDefaultTracker.class));
    }

    if (splitRestrictionMethod != null) {
      signatureBuilder.setSplitRestriction(
          analyzeSplitRestrictionMethod(
              errors.forMethod(DoFn.SplitRestriction.class, splitRestrictionMethod),
              fnT,
              splitRestrictionMethod,
              inputT,
              outputT,
              restrictionT,
              fnContext));
    }

    if (truncateRestrictionMethod != null) {
      signatureBuilder.setTruncateRestriction(
          analyzeTruncateRestrictionMethod(
              errors.forMethod(TruncateRestriction.class, truncateRestrictionMethod),
              fnT,
              truncateRestrictionMethod,
              inputT,
              restrictionT,
              fnContext));
    }

    if (getSizeMethod != null) {
      signatureBuilder.setGetSize(
          analyzeGetSizeMethod(
              errors.forMethod(DoFn.GetSize.class, getSizeMethod),
              fnT,
              getSizeMethod,
              inputT,
              outputT,
              restrictionT,
              fnContext));
    }

    if (getRestrictionCoderMethod != null) {
      signatureBuilder.setGetRestrictionCoder(
          analyzeGetRestrictionCoderMethod(
              errors.forMethod(DoFn.GetRestrictionCoder.class, getRestrictionCoderMethod),
              fnT,
              getRestrictionCoderMethod));
    }

    if (getWatermarkEstimatorStateCoderMethod != null) {
      signatureBuilder.setGetWatermarkEstimatorStateCoder(
          analyzeGetWatermarkEstimatorStateCoderMethod(
              errors.forMethod(
                  DoFn.GetWatermarkEstimatorStateCoder.class,
                  getWatermarkEstimatorStateCoderMethod),
              fnT,
              getWatermarkEstimatorStateCoderMethod));
    }

    if (newWatermarkEstimatorMethod != null) {
      signatureBuilder.setNewWatermarkEstimator(
          analyzeNewWatermarkEstimatorMethod(
              errors.forMethod(DoFn.NewWatermarkEstimator.class, newWatermarkEstimatorMethod),
              fnT,
              newWatermarkEstimatorMethod,
              inputT,
              outputT,
              restrictionT,
              watermarkEstimatorStateT,
              fnContext));
    } else if (getInitialWatermarkEstimatorStateMethod != null) {
      // A custom estimator state without @NewWatermarkEstimator must provide its own default.
      errors
          .forMethod(DoFn.NewWatermarkEstimator.class, null)
          .checkArgument(
              watermarkEstimatorStateT.isSubtypeOf(
                  TypeDescriptor.of(HasDefaultWatermarkEstimator.class)),
              "Splittable, either @%s method must be defined or %s must implement %s.",
              format(DoFn.NewWatermarkEstimator.class),
              format(watermarkEstimatorStateT),
              format(HasDefaultWatermarkEstimator.class));
    }
  } else {
    // Validate that none of the splittable DoFn only methods have been declared.
    List<String> forbiddenMethods = new ArrayList<>();
    if (getInitialRestrictionMethod != null) {
      forbiddenMethods.add("@" + format(DoFn.GetInitialRestriction.class));
    }
    if (splitRestrictionMethod != null) {
      forbiddenMethods.add("@" + format(DoFn.SplitRestriction.class));
    }
    if (truncateRestrictionMethod != null) {
      forbiddenMethods.add("@" + format(TruncateRestriction.class));
    }
    if (newTrackerMethod != null) {
      forbiddenMethods.add("@" + format(DoFn.NewTracker.class));
    }
    if (getRestrictionCoderMethod != null) {
      forbiddenMethods.add("@" + format(DoFn.GetRestrictionCoder.class));
    }
    if (getSizeMethod != null) {
      forbiddenMethods.add("@" + format(DoFn.GetSize.class));
    }
    if (getInitialWatermarkEstimatorStateMethod != null) {
      forbiddenMethods.add("@" + format(DoFn.GetInitialWatermarkEstimatorState.class));
    }
    if (getWatermarkEstimatorStateCoderMethod != null) {
      forbiddenMethods.add("@" + format(DoFn.GetWatermarkEstimatorStateCoder.class));
    }
    if (newWatermarkEstimatorMethod != null) {
      forbiddenMethods.add("@" + format(DoFn.NewWatermarkEstimator.class));
    }
    errors.checkArgument(
        forbiddenMethods.isEmpty(), "Non-splittable, but defines methods: %s", forbiddenMethods);
  }

  signatureBuilder.setIsBoundedPerElement(inferBoundedness(fnT, processElement, errors));
  signatureBuilder.setStateDeclarations(fnContext.getStateDeclarations());
  signatureBuilder.setTimerDeclarations(fnContext.getTimerDeclarations());
  signatureBuilder.setTimerFamilyDeclarations(fnContext.getTimerFamilyDeclarations());
  signatureBuilder.setFieldAccessDeclarations(fnContext.getFieldAccessDeclarations());

  DoFnSignature signature = signatureBuilder.build();

  // Additional validation for splittable DoFn's.
  if (processElement.isSplittable()) {
    verifySplittableMethods(signature, errors);
  }

  return signature;
}