private static Parameter analyzeExtraParameter()

in sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java [1295:1535]


  private static Parameter analyzeExtraParameter(
      ErrorReporter methodErrors,
      FnAnalysisContext fnContext,
      MethodAnalysisContext methodContext,
      TypeDescriptor<? extends DoFn<?, ?>> fnClass,
      ParameterDescription param,
      TypeDescriptor<?> inputT,
      TypeDescriptor<?> outputT) {

    TypeDescriptor<?> expectedProcessContextT = doFnProcessContextTypeOf(inputT, outputT);
    TypeDescriptor<?> expectedStartBundleContextT = doFnStartBundleContextTypeOf(inputT, outputT);
    TypeDescriptor<?> expectedFinishBundleContextT = doFnFinishBundleContextTypeOf(inputT, outputT);
    TypeDescriptor<?> expectedOnTimerContextT = doFnOnTimerContextTypeOf(inputT, outputT);
    TypeDescriptor<?> expectedOnWindowExpirationContextT =
        doFnOnWindowExpirationContextTypeOf(inputT, outputT);

    TypeDescriptor<?> paramT = param.getType();
    Class<?> rawType = paramT.getRawType();

    ErrorReporter paramErrors = methodErrors.forParameter(param);

    String fieldAccessString = getFieldAccessId(param.getAnnotations());
    if (fieldAccessString != null) {
      return Parameter.schemaElementParameter(paramT, fieldAccessString, param.getIndex());
    } else if (hasAnnotation(DoFn.Element.class, param.getAnnotations())) {
      return (paramT.equals(inputT))
          ? Parameter.elementParameter(paramT)
          : Parameter.schemaElementParameter(paramT, null, param.getIndex());
    } else if (hasAnnotation(DoFn.Restriction.class, param.getAnnotations())) {
      return Parameter.restrictionParameter(paramT);
    } else if (hasAnnotation(DoFn.WatermarkEstimatorState.class, param.getAnnotations())) {
      return Parameter.watermarkEstimatorState(paramT);
    } else if (hasAnnotation(DoFn.Timestamp.class, param.getAnnotations())) {
      methodErrors.checkArgument(
          rawType.equals(Instant.class),
          "@Timestamp argument must have type org.joda.time.Instant.");
      return Parameter.timestampParameter();
    } else if (hasAnnotation(DoFn.Key.class, param.getAnnotations())) {
      methodErrors.checkArgument(
          KV.class.equals(inputT.getRawType()),
          "@Key argument is expected to be use with input element of type KV.");

      Type keyType = ((ParameterizedType) inputT.getType()).getActualTypeArguments()[0];
      methodErrors.checkArgument(
          TypeDescriptor.of(keyType).equals(paramT),
          "@Key argument is expected to be type of %s, but found %s.",
          keyType,
          rawType);
      return Parameter.keyT(paramT);
    } else if (rawType.equals(TimeDomain.class)) {
      return Parameter.timeDomainParameter();
    } else if (hasAnnotation(DoFn.SideInput.class, param.getAnnotations())) {
      String sideInputId = getSideInputId(param.getAnnotations());
      paramErrors.checkArgument(
          sideInputId != null, "%s missing %s annotation", format(SideInput.class));
      return Parameter.sideInputParameter(paramT, sideInputId);
    } else if (rawType.equals(PaneInfo.class)) {
      return Parameter.paneInfoParameter();
    } else if (rawType.equals(DoFn.BundleFinalizer.class)) {
      return Parameter.bundleFinalizer();
    } else if (rawType.equals(DoFn.ProcessContext.class)) {
      paramErrors.checkArgument(
          paramT.equals(expectedProcessContextT),
          "ProcessContext argument must have type %s",
          format(expectedProcessContextT));
      return Parameter.processContext();
    } else if (rawType.equals(DoFn.StartBundleContext.class)) {
      paramErrors.checkArgument(
          paramT.equals(expectedStartBundleContextT),
          "StartBundleContext argument must have type %s",
          format(expectedProcessContextT));
      return Parameter.startBundleContext();
    } else if (rawType.equals(DoFn.FinishBundleContext.class)) {
      paramErrors.checkArgument(
          paramT.equals(expectedFinishBundleContextT),
          "FinishBundleContext argument must have type %s",
          format(expectedProcessContextT));
      return Parameter.finishBundleContext();
    } else if (rawType.equals(DoFn.OnTimerContext.class)) {
      paramErrors.checkArgument(
          paramT.equals(expectedOnTimerContextT),
          "OnTimerContext argument must have type %s",
          format(expectedOnTimerContextT));
      return Parameter.onTimerContext();
    } else if (rawType.equals(DoFn.OnWindowExpirationContext.class)) {
      paramErrors.checkArgument(
          paramT.equals(expectedOnWindowExpirationContextT),
          "OnWindowExpirationContext argument must have type %s",
          format(expectedOnWindowExpirationContextT));
      return Parameter.onWindowExpirationContext();
    } else if (BoundedWindow.class.isAssignableFrom(rawType)) {
      methodErrors.checkArgument(
          !methodContext.hasParameter(WindowParameter.class),
          "Multiple %s parameters",
          format(BoundedWindow.class));
      return Parameter.boundedWindow((TypeDescriptor<? extends BoundedWindow>) paramT);
    } else if (rawType.equals(OutputReceiver.class)) {
      // It's a schema row receiver if it's an OutputReceiver<Row> _and_ the output type is not
      // already Row.
      boolean schemaRowReceiver =
          paramT.equals(outputReceiverTypeOf(TypeDescriptor.of(Row.class)))
              && !outputT.equals(TypeDescriptor.of(Row.class));
      if (!schemaRowReceiver) {
        TypeDescriptor<?> expectedReceiverT = outputReceiverTypeOf(outputT);
        paramErrors.checkArgument(
            paramT.equals(expectedReceiverT),
            "OutputReceiver should be parameterized by %s",
            outputT);
      }
      return Parameter.outputReceiverParameter(schemaRowReceiver);
    } else if (rawType.equals(MultiOutputReceiver.class)) {
      return Parameter.taggedOutputReceiverParameter();
    } else if (PipelineOptions.class.equals(rawType)) {
      methodErrors.checkArgument(
          !methodContext.hasParameter(PipelineOptionsParameter.class),
          "Multiple %s parameters",
          format(PipelineOptions.class));
      return Parameter.pipelineOptions();
    } else if (RestrictionTracker.class.isAssignableFrom(rawType)) {
      methodErrors.checkArgument(
          !methodContext.hasParameter(RestrictionTrackerParameter.class),
          "Multiple %s parameters",
          format(RestrictionTracker.class));
      return Parameter.restrictionTracker(paramT);
    } else if (WatermarkEstimator.class.isAssignableFrom(rawType)) {
      methodErrors.checkArgument(
          !methodContext.hasParameter(WatermarkEstimatorParameter.class),
          "Multiple %s parameters",
          format(WatermarkEstimator.class));
      return Parameter.watermarkEstimator(paramT);
    } else if (rawType.equals(Timer.class)) {
      // m.getParameters() is not available until Java 8
      String id = getTimerId(param.getAnnotations());

      paramErrors.checkArgument(
          id != null, "%s missing %s annotation", format(Timer.class), format(TimerId.class));

      paramErrors.checkArgument(
          !methodContext.getTimerParameters().containsKey(id),
          "duplicate %s: \"%s\"",
          format(TimerId.class),
          id);

      TimerDeclaration timerDecl = fnContext.getTimerDeclarations().get(id);
      paramErrors.checkArgument(
          timerDecl != null, "reference to undeclared %s: \"%s\"", format(TimerId.class), id);

      paramErrors.checkArgument(
          timerDecl.field().getDeclaringClass().equals(getDeclaringClass(param.getMethod())),
          "%s %s declared in a different class %s."
              + " Timers may be referenced only in the lexical scope where they are declared.",
          format(TimerId.class),
          id,
          timerDecl.field().getDeclaringClass().getName());

      return Parameter.timerParameter(timerDecl);

    } else if (hasAnnotation(DoFn.TimerId.class, param.getAnnotations())) {
      boolean isValidTimerIdForTimerFamily =
          fnContext.getTimerFamilyDeclarations().size() > 0 && rawType.equals(String.class);
      paramErrors.checkArgument(
          isValidTimerIdForTimerFamily, "%s not allowed here", format(DoFn.TimerId.class));
      return Parameter.timerIdParameter();
    } else if (rawType.equals(TimerMap.class)) {
      String id = getTimerFamilyId(param.getAnnotations());

      paramErrors.checkArgument(
          id != null,
          "%s missing %s annotation",
          format(TimerMap.class),
          format(DoFn.TimerFamily.class));

      paramErrors.checkArgument(
          !methodContext.getTimerFamilyParameters().containsKey(id),
          "duplicate %s: \"%s\"",
          format(DoFn.TimerFamily.class),
          id);

      TimerFamilyDeclaration timerDecl = fnContext.getTimerFamilyDeclarations().get(id);
      paramErrors.checkArgument(
          timerDecl != null,
          "reference to undeclared %s: \"%s\"",
          format(DoFn.TimerFamily.class),
          id);

      paramErrors.checkArgument(
          timerDecl.field().getDeclaringClass().equals(getDeclaringClass(param.getMethod())),
          "%s %s declared in a different class %s."
              + " Timers may be referenced only in the lexical scope where they are declared.",
          format(DoFn.TimerFamily.class),
          id,
          timerDecl.field().getDeclaringClass().getName());

      return Parameter.timerFamilyParameter(timerDecl);
    } else if (State.class.isAssignableFrom(rawType)) {
      // m.getParameters() is not available until Java 8
      String id = getStateId(param.getAnnotations());
      paramErrors.checkArgument(id != null, "missing %s annotation", format(DoFn.StateId.class));

      paramErrors.checkArgument(
          !methodContext.getStateParameters().containsKey(id),
          "duplicate %s: \"%s\"",
          format(DoFn.StateId.class),
          id);

      // By static typing this is already a well-formed State subclass
      TypeDescriptor<? extends State> stateType = (TypeDescriptor<? extends State>) param.getType();

      StateDeclaration stateDecl = fnContext.getStateDeclarations().get(id);
      paramErrors.checkArgument(
          stateDecl != null, "reference to undeclared %s: \"%s\"", format(DoFn.StateId.class), id);

      paramErrors.checkArgument(
          stateDecl.stateType().isSubtypeOf(stateType),
          "data type of reference to %s %s must be a supertype of %s",
          format(StateId.class),
          id,
          format(stateDecl.stateType()));

      paramErrors.checkArgument(
          stateDecl.field().getDeclaringClass().equals(getDeclaringClass(param.getMethod())),
          "%s %s declared in a different class %s."
              + " State may be referenced only in the class where it is declared.",
          format(StateId.class),
          id,
          stateDecl.field().getDeclaringClass().getName());

      boolean alwaysFetched = getStateAlwaysFetched(param.getAnnotations());
      if (alwaysFetched) {
        paramErrors.checkArgument(
            ReadableState.class.isAssignableFrom(rawType),
            "@AlwaysFetched can only be used on ReadableStates. It cannot be used on %s",
            format(stateDecl.stateType()));
      }
      return Parameter.stateParameter(stateDecl, alwaysFetched);
    } else {
      paramErrors.throwIllegalArgument("%s is not a valid context parameter.", format(paramT));
      // Unreachable
      return null;
    }
  }