in sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java [1295:1535]
/**
 * Classifies a single extra parameter of a {@link DoFn} method and returns the matching
 * {@link Parameter} descriptor.
 *
 * <p>The branches are evaluated in a fixed order: parameter annotations that identify the
 * parameter kind are generally consulted before the parameter's raw type, and the first matching
 * branch wins. Validation failures are reported through {@code methodErrors} or a
 * parameter-scoped reporter derived from it.
 *
 * @param methodErrors reporter scoped to the method being analyzed
 * @param fnContext analysis state of the enclosing DoFn; supplies the declared timers, timer
 *     families and state cells
 * @param methodContext analysis state of the current method; used to detect duplicate parameters
 * @param fnClass type of the DoFn being analyzed (not consulted by this method)
 * @param param the parameter to classify
 * @param inputT the DoFn's input element type
 * @param outputT the DoFn's output element type
 * @return the descriptor for the recognized parameter kind
 */
private static Parameter analyzeExtraParameter(
    ErrorReporter methodErrors,
    FnAnalysisContext fnContext,
    MethodAnalysisContext methodContext,
    TypeDescriptor<? extends DoFn<?, ?>> fnClass,
    ParameterDescription param,
    TypeDescriptor<?> inputT,
    TypeDescriptor<?> outputT) {
  // Fully parameterized context types that a context parameter is required to match exactly.
  TypeDescriptor<?> expectedProcessContextT = doFnProcessContextTypeOf(inputT, outputT);
  TypeDescriptor<?> expectedStartBundleContextT = doFnStartBundleContextTypeOf(inputT, outputT);
  TypeDescriptor<?> expectedFinishBundleContextT = doFnFinishBundleContextTypeOf(inputT, outputT);
  TypeDescriptor<?> expectedOnTimerContextT = doFnOnTimerContextTypeOf(inputT, outputT);
  TypeDescriptor<?> expectedOnWindowExpirationContextT =
      doFnOnWindowExpirationContextTypeOf(inputT, outputT);

  TypeDescriptor<?> paramT = param.getType();
  Class<?> rawType = paramT.getRawType();

  ErrorReporter paramErrors = methodErrors.forParameter(param);

  String fieldAccessString = getFieldAccessId(param.getAnnotations());
  if (fieldAccessString != null) {
    return Parameter.schemaElementParameter(paramT, fieldAccessString, param.getIndex());
  } else if (hasAnnotation(DoFn.Element.class, param.getAnnotations())) {
    // An @Element whose type differs from the input type is treated as a schema element.
    return (paramT.equals(inputT))
        ? Parameter.elementParameter(paramT)
        : Parameter.schemaElementParameter(paramT, null, param.getIndex());
  } else if (hasAnnotation(DoFn.Restriction.class, param.getAnnotations())) {
    return Parameter.restrictionParameter(paramT);
  } else if (hasAnnotation(DoFn.WatermarkEstimatorState.class, param.getAnnotations())) {
    return Parameter.watermarkEstimatorState(paramT);
  } else if (hasAnnotation(DoFn.Timestamp.class, param.getAnnotations())) {
    methodErrors.checkArgument(
        rawType.equals(Instant.class),
        "@Timestamp argument must have type org.joda.time.Instant.");
    return Parameter.timestampParameter();
  } else if (hasAnnotation(DoFn.Key.class, param.getAnnotations())) {
    methodErrors.checkArgument(
        KV.class.equals(inputT.getRawType()),
        "@Key argument is expected to be use with input element of type KV.");

    // The key type is the first type argument of the KV input type.
    Type keyType = ((ParameterizedType) inputT.getType()).getActualTypeArguments()[0];
    methodErrors.checkArgument(
        TypeDescriptor.of(keyType).equals(paramT),
        "@Key argument is expected to be type of %s, but found %s.",
        keyType,
        rawType);
    return Parameter.keyT(paramT);
  } else if (rawType.equals(TimeDomain.class)) {
    return Parameter.timeDomainParameter();
  } else if (hasAnnotation(DoFn.SideInput.class, param.getAnnotations())) {
    String sideInputId = getSideInputId(param.getAnnotations());
    // The message takes exactly one argument; the previous template had two placeholders for a
    // single argument and could not be rendered.
    paramErrors.checkArgument(
        sideInputId != null, "missing %s annotation", format(SideInput.class));
    return Parameter.sideInputParameter(paramT, sideInputId);
  } else if (rawType.equals(PaneInfo.class)) {
    return Parameter.paneInfoParameter();
  } else if (rawType.equals(DoFn.BundleFinalizer.class)) {
    return Parameter.bundleFinalizer();
  } else if (rawType.equals(DoFn.ProcessContext.class)) {
    paramErrors.checkArgument(
        paramT.equals(expectedProcessContextT),
        "ProcessContext argument must have type %s",
        format(expectedProcessContextT));
    return Parameter.processContext();
  } else if (rawType.equals(DoFn.StartBundleContext.class)) {
    // Report the StartBundleContext type that was actually required, not the ProcessContext one.
    paramErrors.checkArgument(
        paramT.equals(expectedStartBundleContextT),
        "StartBundleContext argument must have type %s",
        format(expectedStartBundleContextT));
    return Parameter.startBundleContext();
  } else if (rawType.equals(DoFn.FinishBundleContext.class)) {
    // Report the FinishBundleContext type that was actually required, not the ProcessContext one.
    paramErrors.checkArgument(
        paramT.equals(expectedFinishBundleContextT),
        "FinishBundleContext argument must have type %s",
        format(expectedFinishBundleContextT));
    return Parameter.finishBundleContext();
  } else if (rawType.equals(DoFn.OnTimerContext.class)) {
    paramErrors.checkArgument(
        paramT.equals(expectedOnTimerContextT),
        "OnTimerContext argument must have type %s",
        format(expectedOnTimerContextT));
    return Parameter.onTimerContext();
  } else if (rawType.equals(DoFn.OnWindowExpirationContext.class)) {
    paramErrors.checkArgument(
        paramT.equals(expectedOnWindowExpirationContextT),
        "OnWindowExpirationContext argument must have type %s",
        format(expectedOnWindowExpirationContextT));
    return Parameter.onWindowExpirationContext();
  } else if (BoundedWindow.class.isAssignableFrom(rawType)) {
    methodErrors.checkArgument(
        !methodContext.hasParameter(WindowParameter.class),
        "Multiple %s parameters",
        format(BoundedWindow.class));
    // The isAssignableFrom check above establishes that paramT describes a BoundedWindow subtype.
    return Parameter.boundedWindow((TypeDescriptor<? extends BoundedWindow>) paramT);
  } else if (rawType.equals(OutputReceiver.class)) {
    // It's a schema row receiver if it's an OutputReceiver<Row> _and_ the output type is not
    // already Row.
    boolean schemaRowReceiver =
        paramT.equals(outputReceiverTypeOf(TypeDescriptor.of(Row.class)))
            && !outputT.equals(TypeDescriptor.of(Row.class));
    if (!schemaRowReceiver) {
      TypeDescriptor<?> expectedReceiverT = outputReceiverTypeOf(outputT);
      paramErrors.checkArgument(
          paramT.equals(expectedReceiverT),
          "OutputReceiver should be parameterized by %s",
          outputT);
    }
    return Parameter.outputReceiverParameter(schemaRowReceiver);
  } else if (rawType.equals(MultiOutputReceiver.class)) {
    return Parameter.taggedOutputReceiverParameter();
  } else if (PipelineOptions.class.equals(rawType)) {
    methodErrors.checkArgument(
        !methodContext.hasParameter(PipelineOptionsParameter.class),
        "Multiple %s parameters",
        format(PipelineOptions.class));
    return Parameter.pipelineOptions();
  } else if (RestrictionTracker.class.isAssignableFrom(rawType)) {
    methodErrors.checkArgument(
        !methodContext.hasParameter(RestrictionTrackerParameter.class),
        "Multiple %s parameters",
        format(RestrictionTracker.class));
    return Parameter.restrictionTracker(paramT);
  } else if (WatermarkEstimator.class.isAssignableFrom(rawType)) {
    methodErrors.checkArgument(
        !methodContext.hasParameter(WatermarkEstimatorParameter.class),
        "Multiple %s parameters",
        format(WatermarkEstimator.class));
    return Parameter.watermarkEstimator(paramT);
  } else if (rawType.equals(Timer.class)) {
    // m.getParameters() is not available until Java 8
    String id = getTimerId(param.getAnnotations());

    paramErrors.checkArgument(
        id != null, "%s missing %s annotation", format(Timer.class), format(TimerId.class));

    paramErrors.checkArgument(
        !methodContext.getTimerParameters().containsKey(id),
        "duplicate %s: \"%s\"",
        format(TimerId.class),
        id);

    TimerDeclaration timerDecl = fnContext.getTimerDeclarations().get(id);
    paramErrors.checkArgument(
        timerDecl != null, "reference to undeclared %s: \"%s\"", format(TimerId.class), id);

    // The timer must be declared in the same class that declares the method using it.
    paramErrors.checkArgument(
        timerDecl.field().getDeclaringClass().equals(getDeclaringClass(param.getMethod())),
        "%s %s declared in a different class %s."
            + " Timers may be referenced only in the lexical scope where they are declared.",
        format(TimerId.class),
        id,
        timerDecl.field().getDeclaringClass().getName());

    return Parameter.timerParameter(timerDecl);
  } else if (hasAnnotation(DoFn.TimerId.class, param.getAnnotations())) {
    // A @TimerId on a non-Timer parameter is only accepted as a String when the DoFn declares at
    // least one timer family.
    boolean isValidTimerIdForTimerFamily =
        fnContext.getTimerFamilyDeclarations().size() > 0 && rawType.equals(String.class);
    paramErrors.checkArgument(
        isValidTimerIdForTimerFamily, "%s not allowed here", format(DoFn.TimerId.class));
    return Parameter.timerIdParameter();
  } else if (rawType.equals(TimerMap.class)) {
    String id = getTimerFamilyId(param.getAnnotations());

    paramErrors.checkArgument(
        id != null,
        "%s missing %s annotation",
        format(TimerMap.class),
        format(DoFn.TimerFamily.class));

    paramErrors.checkArgument(
        !methodContext.getTimerFamilyParameters().containsKey(id),
        "duplicate %s: \"%s\"",
        format(DoFn.TimerFamily.class),
        id);

    TimerFamilyDeclaration timerDecl = fnContext.getTimerFamilyDeclarations().get(id);
    paramErrors.checkArgument(
        timerDecl != null,
        "reference to undeclared %s: \"%s\"",
        format(DoFn.TimerFamily.class),
        id);

    // The timer family must be declared in the same class that declares the method using it.
    paramErrors.checkArgument(
        timerDecl.field().getDeclaringClass().equals(getDeclaringClass(param.getMethod())),
        "%s %s declared in a different class %s."
            + " Timers may be referenced only in the lexical scope where they are declared.",
        format(DoFn.TimerFamily.class),
        id,
        timerDecl.field().getDeclaringClass().getName());

    return Parameter.timerFamilyParameter(timerDecl);
  } else if (State.class.isAssignableFrom(rawType)) {
    // m.getParameters() is not available until Java 8
    String id = getStateId(param.getAnnotations());
    paramErrors.checkArgument(id != null, "missing %s annotation", format(DoFn.StateId.class));

    paramErrors.checkArgument(
        !methodContext.getStateParameters().containsKey(id),
        "duplicate %s: \"%s\"",
        format(DoFn.StateId.class),
        id);

    // By static typing this is already a well-formed State subclass
    TypeDescriptor<? extends State> stateType = (TypeDescriptor<? extends State>) param.getType();

    StateDeclaration stateDecl = fnContext.getStateDeclarations().get(id);
    paramErrors.checkArgument(
        stateDecl != null, "reference to undeclared %s: \"%s\"", format(DoFn.StateId.class), id);

    // The parameter may be declared with a supertype of the declared state type.
    paramErrors.checkArgument(
        stateDecl.stateType().isSubtypeOf(stateType),
        "data type of reference to %s %s must be a supertype of %s",
        format(StateId.class),
        id,
        format(stateDecl.stateType()));

    paramErrors.checkArgument(
        stateDecl.field().getDeclaringClass().equals(getDeclaringClass(param.getMethod())),
        "%s %s declared in a different class %s."
            + " State may be referenced only in the class where it is declared.",
        format(StateId.class),
        id,
        stateDecl.field().getDeclaringClass().getName());

    boolean alwaysFetched = getStateAlwaysFetched(param.getAnnotations());
    if (alwaysFetched) {
      paramErrors.checkArgument(
          ReadableState.class.isAssignableFrom(rawType),
          "@AlwaysFetched can only be used on ReadableStates. It cannot be used on %s",
          format(stateDecl.stateType()));
    }
    return Parameter.stateParameter(stateDecl, alwaysFetched);
  } else {
    paramErrors.throwIllegalArgument("%s is not a valid context parameter.", format(paramT));
    // Unreachable
    return null;
  }
}