in sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java [860:1098]
static StackManipulation getExtraContextParameter(
DoFnSignature.Parameter parameter, final StackManipulation pushDelegate) {
return parameter.match(
new Cases<StackManipulation>() {
@Override
public StackManipulation dispatch(StartBundleContextParameter p) {
return new StackManipulation.Compound(
pushDelegate,
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
START_BUNDLE_CONTEXT_PARAMETER_METHOD, DoFn.class)));
}
@Override
public StackManipulation dispatch(FinishBundleContextParameter p) {
return new StackManipulation.Compound(
pushDelegate,
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
FINISH_BUNDLE_CONTEXT_PARAMETER_METHOD, DoFn.class)));
}
@Override
public StackManipulation dispatch(ProcessContextParameter p) {
return new StackManipulation.Compound(
pushDelegate,
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
PROCESS_CONTEXT_PARAMETER_METHOD, DoFn.class)));
}
@Override
public StackManipulation dispatch(ElementParameter p) {
return new StackManipulation.Compound(
pushDelegate,
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(ELEMENT_PARAMETER_METHOD, DoFn.class)),
TypeCasting.to(new TypeDescription.ForLoadedType(p.elementT().getRawType())));
}
@Override
public StackManipulation dispatch(SchemaElementParameter p) {
ForLoadedType elementType = new ForLoadedType(p.elementT().getRawType());
ForLoadedType castType =
elementType.isPrimitive()
? new ForLoadedType(Primitives.wrap(p.elementT().getRawType()))
: elementType;
StackManipulation stackManipulation =
new StackManipulation.Compound(
IntegerConstant.forValue(p.index()),
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
SCHEMA_ELEMENT_PARAMETER_METHOD, int.class)),
TypeCasting.to(castType));
if (elementType.isPrimitive()) {
stackManipulation =
new Compound(
stackManipulation,
Assigner.DEFAULT.assign(
elementType.asBoxed().asGenericType(),
elementType.asUnboxed().asGenericType(),
Typing.STATIC));
}
return stackManipulation;
}
@Override
public StackManipulation dispatch(TimestampParameter p) {
return new StackManipulation.Compound(
pushDelegate,
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
TIMESTAMP_PARAMETER_METHOD, DoFn.class)));
}
@Override
public StackManipulation dispatch(BundleFinalizerParameter p) {
return simpleExtraContextParameter(BUNDLE_FINALIZER_PARAMETER_METHOD);
}
@Override
public StackManipulation dispatch(TimeDomainParameter p) {
return new StackManipulation.Compound(
pushDelegate,
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
TIME_DOMAIN_PARAMETER_METHOD, DoFn.class)));
}
@Override
public StackManipulation dispatch(OutputReceiverParameter p) {
String method =
p.isRowReceiver() ? OUTPUT_ROW_RECEIVER_METHOD : OUTPUT_PARAMETER_METHOD;
return new StackManipulation.Compound(
pushDelegate,
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(method, DoFn.class)));
}
@Override
public StackManipulation dispatch(TaggedOutputReceiverParameter p) {
return new StackManipulation.Compound(
pushDelegate,
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
TAGGED_OUTPUT_PARAMETER_METHOD, DoFn.class)));
}
@Override
public StackManipulation dispatch(OnTimerContextParameter p) {
return new StackManipulation.Compound(
pushDelegate,
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
ON_TIMER_CONTEXT_PARAMETER_METHOD, DoFn.class)));
}
@Override
public StackManipulation dispatch(WindowParameter p) {
return new StackManipulation.Compound(
simpleExtraContextParameter(WINDOW_PARAMETER_METHOD),
TypeCasting.to(new TypeDescription.ForLoadedType(p.windowT().getRawType())));
}
@Override
public StackManipulation dispatch(PaneInfoParameter p) {
return new StackManipulation.Compound(
pushDelegate,
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
PANE_INFO_PARAMETER_METHOD, DoFn.class)));
}
@Override
public StackManipulation dispatch(RestrictionParameter p) {
// DoFnInvoker.ArgumentProvider.restriction() returns an Object,
// but the methods expect a concrete subtype of it.
// Insert a downcast.
return new StackManipulation.Compound(
simpleExtraContextParameter(RESTRICTION_PARAMETER_METHOD),
TypeCasting.to(new TypeDescription.ForLoadedType(p.restrictionT().getRawType())));
}
@Override
public StackManipulation dispatch(RestrictionTrackerParameter p) {
// DoFnInvoker.ArgumentProvider.restrictionTracker() returns a RestrictionTracker,
// but the methods expect a concrete subtype of it.
// Insert a downcast.
return new StackManipulation.Compound(
simpleExtraContextParameter(RESTRICTION_TRACKER_PARAMETER_METHOD),
TypeCasting.to(new TypeDescription.ForLoadedType(p.trackerT().getRawType())));
}
@Override
public StackManipulation dispatch(WatermarkEstimatorParameter p) {
// DoFnInvoker.ArgumentProvider.watermarkEstimator() returns a WatermarkEstimator,
// but the methods expect a concrete subtype of it.
// Insert a downcast.
return new StackManipulation.Compound(
simpleExtraContextParameter(WATERMARK_ESTIMATOR_PARAMETER_METHOD),
TypeCasting.to(new TypeDescription.ForLoadedType(p.estimatorT().getRawType())));
}
@Override
public StackManipulation dispatch(WatermarkEstimatorStateParameter p) {
// DoFnInvoker.ArgumentProvider.watermarkEstimatorState() returns an Object,
// but the methods expect a concrete subtype of it.
// Insert a downcast.
return new StackManipulation.Compound(
simpleExtraContextParameter(WATERMARK_ESTIMATOR_STATE_PARAMETER_METHOD),
TypeCasting.to(
new TypeDescription.ForLoadedType(p.estimatorStateT().getRawType())));
}
@Override
public StackManipulation dispatch(StateParameter p) {
return new StackManipulation.Compound(
new TextConstant(p.referent().id()),
IntegerConstant.forValue(p.alwaysFetched()),
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
STATE_PARAMETER_METHOD, String.class, boolean.class)),
TypeCasting.to(
new TypeDescription.ForLoadedType(p.referent().stateType().getRawType())));
}
@Override
public StackManipulation dispatch(TimerParameter p) {
return new StackManipulation.Compound(
new TextConstant(p.referent().id()),
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(TIMER_PARAMETER_METHOD, String.class)),
TypeCasting.to(new TypeDescription.ForLoadedType(Timer.class)));
}
@Override
public StackManipulation dispatch(TimerFamilyParameter p) {
return new StackManipulation.Compound(
new TextConstant(p.referent().id()),
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
TIMER_FAMILY_PARAMETER_METHOD, String.class)),
TypeCasting.to(new TypeDescription.ForLoadedType(TimerMap.class)));
}
@Override
public StackManipulation dispatch(DoFnSignature.Parameter.PipelineOptionsParameter p) {
return simpleExtraContextParameter(PIPELINE_OPTIONS_PARAMETER_METHOD);
}
@Override
public StackManipulation dispatch(SideInputParameter p) {
return new StackManipulation.Compound(
new TextConstant(p.sideInputId()),
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
SIDE_INPUT_PARAMETER_METHOD, String.class)),
TypeCasting.to(new TypeDescription.ForLoadedType(p.elementT().getRawType())));
}
@Override
public StackManipulation dispatch(DoFnSignature.Parameter.TimerIdParameter p) {
return new StackManipulation.Compound(
pushDelegate,
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
TIMER_ID_PARAMETER_METHOD, DoFn.class)));
}
@Override
public StackManipulation dispatch(DoFnSignature.Parameter.KeyParameter p) {
return new StackManipulation.Compound(
simpleExtraContextParameter(KEY_PARAMETER_METHOD),
TypeCasting.to(new TypeDescription.ForLoadedType(p.keyT().getRawType())));
}
});
}