private static DoFnSignature parseSignature()

in sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java [489:844]


  private static DoFnSignature parseSignature(Class<? extends DoFn<?, ?>> fnClass) {
    DoFnSignature.Builder signatureBuilder = DoFnSignature.builder();

    ErrorReporter errors = new ErrorReporter(null, fnClass.getName());
    errors.checkArgument(DoFn.class.isAssignableFrom(fnClass), "Must be subtype of DoFn");
    signatureBuilder.setFnClass(fnClass);

    TypeDescriptor<? extends DoFn<?, ?>> fnT = TypeDescriptor.of(fnClass);

    // Extract the input and output type, and whether the fn is bounded.
    TypeDescriptor<?> inputT = null;
    TypeDescriptor<?> outputT = null;
    for (TypeDescriptor<?> supertype : fnT.getTypes()) {
      if (!supertype.getRawType().equals(DoFn.class)) {
        continue;
      }
      Type[] args = ((ParameterizedType) supertype.getType()).getActualTypeArguments();
      inputT = TypeDescriptor.of(args[0]);
      outputT = TypeDescriptor.of(args[1]);
    }
    errors.checkNotNull(inputT, "Unable to determine input type");

    // Find the state and timer declarations in advance of validating
    // method parameter lists
    FnAnalysisContext fnContext = FnAnalysisContext.create();
    fnContext.addStateDeclarations(analyzeStateDeclarations(errors, fnClass).values());
    fnContext.addTimerDeclarations(analyzeTimerDeclarations(errors, fnClass).values());
    fnContext.addTimerFamilyDeclarations(analyzeTimerFamilyDeclarations(errors, fnClass).values());
    fnContext.addFieldAccessDeclarations(analyzeFieldAccessDeclaration(errors, fnClass).values());

    Method processElementMethod =
        findAnnotatedMethod(errors, DoFn.ProcessElement.class, fnClass, true);
    Method startBundleMethod = findAnnotatedMethod(errors, DoFn.StartBundle.class, fnClass, false);
    Method finishBundleMethod =
        findAnnotatedMethod(errors, DoFn.FinishBundle.class, fnClass, false);
    Method setupMethod = findAnnotatedMethod(errors, DoFn.Setup.class, fnClass, false);
    Method teardownMethod = findAnnotatedMethod(errors, DoFn.Teardown.class, fnClass, false);
    Method onWindowExpirationMethod =
        findAnnotatedMethod(errors, DoFn.OnWindowExpiration.class, fnClass, false);
    Method getInitialRestrictionMethod =
        findAnnotatedMethod(errors, DoFn.GetInitialRestriction.class, fnClass, false);
    Method splitRestrictionMethod =
        findAnnotatedMethod(errors, DoFn.SplitRestriction.class, fnClass, false);
    Method truncateRestrictionMethod =
        findAnnotatedMethod(errors, TruncateRestriction.class, fnClass, false);
    Method getRestrictionCoderMethod =
        findAnnotatedMethod(errors, DoFn.GetRestrictionCoder.class, fnClass, false);
    Method newTrackerMethod = findAnnotatedMethod(errors, DoFn.NewTracker.class, fnClass, false);
    Method getSizeMethod = findAnnotatedMethod(errors, DoFn.GetSize.class, fnClass, false);
    Method getWatermarkEstimatorStateCoderMethod =
        findAnnotatedMethod(errors, DoFn.GetWatermarkEstimatorStateCoder.class, fnClass, false);
    Method getInitialWatermarkEstimatorStateMethod =
        findAnnotatedMethod(errors, DoFn.GetInitialWatermarkEstimatorState.class, fnClass, false);
    Method newWatermarkEstimatorMethod =
        findAnnotatedMethod(errors, DoFn.NewWatermarkEstimator.class, fnClass, false);

    Collection<Method> onTimerMethods =
        ReflectHelpers.declaredMethodsWithAnnotation(DoFn.OnTimer.class, fnClass, DoFn.class);
    HashMap<String, DoFnSignature.OnTimerMethod> onTimerMethodMap =
        Maps.newHashMapWithExpectedSize(onTimerMethods.size());
    for (Method onTimerMethod : onTimerMethods) {
      String id = TimerDeclaration.PREFIX + onTimerMethod.getAnnotation(DoFn.OnTimer.class).value();
      errors.checkArgument(
          fnContext.getTimerDeclarations().containsKey(id),
          "Callback %s is for undeclared timer %s",
          onTimerMethod,
          id);

      TimerDeclaration timerDecl = fnContext.getTimerDeclarations().get(id);
      errors.checkArgument(
          timerDecl.field().getDeclaringClass().equals(getDeclaringClass(onTimerMethod)),
          "Callback %s is for timer %s declared in a different class %s."
              + " Timer callbacks must be declared in the same lexical scope as their timer",
          onTimerMethod,
          id,
          timerDecl.field().getDeclaringClass().getCanonicalName());

      onTimerMethodMap.put(
          id, analyzeOnTimerMethod(errors, fnT, onTimerMethod, id, inputT, outputT, fnContext));
    }
    signatureBuilder.setOnTimerMethods(onTimerMethodMap);

    // Check for TimerFamily
    Collection<Method> onTimerFamilyMethods =
        ReflectHelpers.declaredMethodsWithAnnotation(DoFn.OnTimerFamily.class, fnClass, DoFn.class);
    HashMap<String, DoFnSignature.OnTimerFamilyMethod> onTimerFamilyMethodMap =
        Maps.newHashMapWithExpectedSize(onTimerFamilyMethods.size());

    for (Method onTimerFamilyMethod : onTimerFamilyMethods) {
      String id =
          TimerFamilyDeclaration.PREFIX
              + onTimerFamilyMethod.getAnnotation(DoFn.OnTimerFamily.class).value();
      errors.checkArgument(
          fnContext.getTimerFamilyDeclarations().containsKey(id),
          "Callback %s is for undeclared timerFamily %s",
          onTimerFamilyMethod,
          id);

      TimerFamilyDeclaration timerDecl = fnContext.getTimerFamilyDeclarations().get(id);
      errors.checkArgument(
          timerDecl.field().getDeclaringClass().equals(getDeclaringClass(onTimerFamilyMethod)),
          "Callback %s is for timerFamily %s declared in a different class %s."
              + " TimerFamily callbacks must be declared in the same lexical scope as their timer",
          onTimerFamilyMethod,
          id,
          timerDecl.field().getDeclaringClass().getCanonicalName());

      onTimerFamilyMethodMap.put(
          id,
          analyzeOnTimerFamilyMethod(
              errors, fnT, onTimerFamilyMethod, id, inputT, outputT, fnContext));
    }
    signatureBuilder.setOnTimerFamilyMethods(onTimerFamilyMethodMap);

    // Check the converse - that all timers have a callback. This could be relaxed to only
    // those timers used in methods, once method parameter lists support timers.
    for (TimerDeclaration decl : fnContext.getTimerDeclarations().values()) {
      errors.checkArgument(
          onTimerMethodMap.containsKey(decl.id()),
          "No callback registered via %s for timer %s",
          format(DoFn.OnTimer.class),
          decl.id());
    }

    // Check the converse - that all timer family have a callback.

    for (TimerFamilyDeclaration decl : fnContext.getTimerFamilyDeclarations().values()) {
      errors.checkArgument(
          onTimerFamilyMethodMap.containsKey(decl.id()),
          "No callback registered via %s for timerFamily %s",
          format(DoFn.OnTimerFamily.class),
          decl.id());
    }

    ErrorReporter processElementErrors =
        errors.forMethod(DoFn.ProcessElement.class, processElementMethod);
    DoFnSignature.ProcessElementMethod processElement =
        analyzeProcessElementMethod(
            processElementErrors, fnT, processElementMethod, inputT, outputT, fnContext);
    signatureBuilder.setProcessElement(processElement);

    if (startBundleMethod != null) {
      ErrorReporter startBundleErrors = errors.forMethod(DoFn.StartBundle.class, startBundleMethod);
      signatureBuilder.setStartBundle(
          analyzeStartBundleMethod(
              startBundleErrors, fnT, startBundleMethod, inputT, outputT, fnContext));
    }

    if (finishBundleMethod != null) {
      ErrorReporter finishBundleErrors =
          errors.forMethod(DoFn.FinishBundle.class, finishBundleMethod);
      signatureBuilder.setFinishBundle(
          analyzeFinishBundleMethod(
              finishBundleErrors, fnT, finishBundleMethod, inputT, outputT, fnContext));
    }

    if (setupMethod != null) {
      ErrorReporter setupErrors = errors.forMethod(DoFn.Setup.class, setupMethod);
      signatureBuilder.setSetup(
          analyzeSetupMethod(setupErrors, fnT, setupMethod, inputT, outputT, fnContext));
    }

    if (teardownMethod != null) {
      signatureBuilder.setTeardown(
          analyzeShutdownMethod(
              errors.forMethod(DoFn.Teardown.class, teardownMethod), teardownMethod));
    }

    if (onWindowExpirationMethod != null) {
      signatureBuilder.setOnWindowExpiration(
          analyzeOnWindowExpirationMethod(
              errors, fnT, onWindowExpirationMethod, inputT, outputT, fnContext));
    }

    if (processElement.isSplittable()) {
      ErrorReporter getInitialRestrictionErrors =
          errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestrictionMethod);
      getInitialRestrictionErrors.checkNotNull(
          getInitialRestrictionMethod,
          "Splittable, but does not define the required @%s method.",
          DoFnSignatures.format(DoFn.GetInitialRestriction.class));

      GetInitialRestrictionMethod initialRestrictionMethod =
          analyzeGetInitialRestrictionMethod(
              errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestrictionMethod),
              fnT,
              getInitialRestrictionMethod,
              inputT,
              outputT,
              fnContext);

      signatureBuilder.setGetInitialRestriction(initialRestrictionMethod);
      TypeDescriptor<?> restrictionT = initialRestrictionMethod.restrictionT();

      TypeDescriptor<?> watermarkEstimatorStateT = TypeDescriptors.voids();
      if (getInitialWatermarkEstimatorStateMethod != null) {
        GetInitialWatermarkEstimatorStateMethod initialWatermarkEstimatorStateMethod =
            analyzeGetInitialWatermarkEstimatorStateMethod(
                errors.forMethod(
                    DoFn.GetInitialWatermarkEstimatorState.class,
                    getInitialWatermarkEstimatorStateMethod),
                fnT,
                getInitialWatermarkEstimatorStateMethod,
                inputT,
                outputT,
                fnContext);
        watermarkEstimatorStateT = initialWatermarkEstimatorStateMethod.watermarkEstimatorStateT();
        signatureBuilder.setGetInitialWatermarkEstimatorState(initialWatermarkEstimatorStateMethod);
      }

      if (newTrackerMethod != null) {
        signatureBuilder.setNewTracker(
            analyzeNewTrackerMethod(
                errors.forMethod(DoFn.NewTracker.class, newTrackerMethod),
                fnT,
                newTrackerMethod,
                inputT,
                outputT,
                restrictionT,
                fnContext));
      } else {
        errors
            .forMethod(DoFn.NewTracker.class, null)
            .checkArgument(
                restrictionT.isSubtypeOf(TypeDescriptor.of(HasDefaultTracker.class)),
                "Splittable, either @%s method must be defined or %s must implement %s.",
                format(DoFn.NewTracker.class),
                format(restrictionT),
                format(HasDefaultTracker.class));
      }

      if (splitRestrictionMethod != null) {
        signatureBuilder.setSplitRestriction(
            analyzeSplitRestrictionMethod(
                errors.forMethod(DoFn.SplitRestriction.class, splitRestrictionMethod),
                fnT,
                splitRestrictionMethod,
                inputT,
                outputT,
                restrictionT,
                fnContext));
      }

      if (truncateRestrictionMethod != null) {
        signatureBuilder.setTruncateRestriction(
            analyzeTruncateRestrictionMethod(
                errors.forMethod(TruncateRestriction.class, truncateRestrictionMethod),
                fnT,
                truncateRestrictionMethod,
                inputT,
                restrictionT,
                fnContext));
      }

      if (getSizeMethod != null) {
        signatureBuilder.setGetSize(
            analyzeGetSizeMethod(
                errors.forMethod(DoFn.GetSize.class, getSizeMethod),
                fnT,
                getSizeMethod,
                inputT,
                outputT,
                restrictionT,
                fnContext));
      }

      if (getRestrictionCoderMethod != null) {
        signatureBuilder.setGetRestrictionCoder(
            analyzeGetRestrictionCoderMethod(
                errors.forMethod(DoFn.GetRestrictionCoder.class, getRestrictionCoderMethod),
                fnT,
                getRestrictionCoderMethod));
      }

      if (getWatermarkEstimatorStateCoderMethod != null) {
        signatureBuilder.setGetWatermarkEstimatorStateCoder(
            analyzeGetWatermarkEstimatorStateCoderMethod(
                errors.forMethod(
                    DoFn.GetWatermarkEstimatorStateCoder.class,
                    getWatermarkEstimatorStateCoderMethod),
                fnT,
                getWatermarkEstimatorStateCoderMethod));
      }

      if (newWatermarkEstimatorMethod != null) {
        signatureBuilder.setNewWatermarkEstimator(
            analyzeNewWatermarkEstimatorMethod(
                errors.forMethod(DoFn.NewWatermarkEstimator.class, newWatermarkEstimatorMethod),
                fnT,
                newWatermarkEstimatorMethod,
                inputT,
                outputT,
                restrictionT,
                watermarkEstimatorStateT,
                fnContext));
      } else if (getInitialWatermarkEstimatorStateMethod != null) {
        errors
            .forMethod(DoFn.NewWatermarkEstimator.class, null)
            .checkArgument(
                watermarkEstimatorStateT.isSubtypeOf(
                    TypeDescriptor.of(HasDefaultWatermarkEstimator.class)),
                "Splittable, either @%s method must be defined or %s must implement %s.",
                format(DoFn.NewWatermarkEstimator.class),
                format(watermarkEstimatorStateT),
                format(HasDefaultWatermarkEstimator.class));
      }
    } else {
      // Validate that none of the splittable DoFn only methods have been declared.
      List<String> forbiddenMethods = new ArrayList<>();
      if (getInitialRestrictionMethod != null) {
        forbiddenMethods.add("@" + format(DoFn.GetInitialRestriction.class));
      }
      if (splitRestrictionMethod != null) {
        forbiddenMethods.add("@" + format(DoFn.SplitRestriction.class));
      }
      if (truncateRestrictionMethod != null) {
        forbiddenMethods.add("@" + format(TruncateRestriction.class));
      }
      if (newTrackerMethod != null) {
        forbiddenMethods.add("@" + format(DoFn.NewTracker.class));
      }
      if (getRestrictionCoderMethod != null) {
        forbiddenMethods.add("@" + format(DoFn.GetRestrictionCoder.class));
      }
      if (getSizeMethod != null) {
        forbiddenMethods.add("@" + format(DoFn.GetSize.class));
      }
      if (getInitialWatermarkEstimatorStateMethod != null) {
        forbiddenMethods.add("@" + format(DoFn.GetInitialWatermarkEstimatorState.class));
      }
      if (getWatermarkEstimatorStateCoderMethod != null) {
        forbiddenMethods.add("@" + format(DoFn.GetWatermarkEstimatorStateCoder.class));
      }
      if (newWatermarkEstimatorMethod != null) {
        forbiddenMethods.add("@" + format(DoFn.NewWatermarkEstimator.class));
      }
      errors.checkArgument(
          forbiddenMethods.isEmpty(), "Non-splittable, but defines methods: %s", forbiddenMethods);
    }

    signatureBuilder.setIsBoundedPerElement(inferBoundedness(fnT, processElement, errors));

    signatureBuilder.setStateDeclarations(fnContext.getStateDeclarations());
    signatureBuilder.setTimerDeclarations(fnContext.getTimerDeclarations());
    signatureBuilder.setTimerFamilyDeclarations(fnContext.getTimerFamilyDeclarations());
    signatureBuilder.setFieldAccessDeclarations(fnContext.getFieldAccessDeclarations());

    DoFnSignature signature = signatureBuilder.build();

    // Additional validation for splittable DoFn's.
    if (processElement.isSplittable()) {
      verifySplittableMethods(signature, errors);
    }

    return signature;
  }