func()

in apis/flinkcluster/v1beta1/flinkcluster_validate.go [329:390]


func (v *Validator) validateJobManager(flinkVersion *version.Version, jmSpec *JobManagerSpec) error {
	var err error
	if jmSpec == nil {
		return nil
	}

	fp := field.NewPath("spec.jobManager")
	if errors := validation.ValidateAnnotations(jmSpec.PodAnnotations, fp.Child("podAnnotations")); len(errors) > 0 {
		return fmt.Errorf(errors.ToAggregate().Error())
	}
	if errors := v1validation.ValidateLabels(jmSpec.PodLabels, fp.Child("podLabels")); len(errors) > 0 {
		return fmt.Errorf(errors.ToAggregate().Error())
	}

	// Ports.
	var ports = []NamedPort{
		{Name: "rpc", ContainerPort: *jmSpec.Ports.RPC},
		{Name: "blob", ContainerPort: *jmSpec.Ports.Blob},
		{Name: "query", ContainerPort: *jmSpec.Ports.Query},
		{Name: "ui", ContainerPort: *jmSpec.Ports.UI},
	}
	ports = append(ports, jmSpec.ExtraPorts...)
	err = v.checkDupPorts(ports, "jobmanager")
	if err != nil {
		return err
	}

	if err := v.validateResourceRequirements(jmSpec.Resources, "jobmanager"); err != nil {
		return err
	}

	if flinkVersion == nil || flinkVersion.LessThan(v10) {
		if jmSpec.MemoryProcessRatio != nil {
			return fmt.Errorf("MemoryProcessRatio config cannot be used with flinkVersion < 1.11', use " +
				"memoryOffHeapRatio instead")
		}

		// MemoryOffHeapRatio
		err = v.validateRatio(jmSpec.MemoryOffHeapRatio, "jobmanager", "memoryOffHeapRatio")
		if err != nil {
			return err
		}

		// MemoryOffHeapMin
		err = v.validateMemoryOffHeapMin(&jmSpec.MemoryOffHeapMin, jmSpec.GetResources().Memory(), "jobmanager")
		if err != nil {
			return err
		}
	} else {
		if jmSpec.MemoryOffHeapRatio != nil || !jmSpec.MemoryOffHeapMin.IsZero() {
			return fmt.Errorf("MemoryOffHeapRatio or MemoryOffHeapMin config cannot be used with flinkVersion >= 1.11'; " +
				"use memoryProcessRatio istead")
		}
		// MemoryProcessRatio
		err = v.validateRatio(jmSpec.MemoryProcessRatio, "jobmanager", "memoryProcessRatio")
		if err != nil {
			return err
		}
	}

	return nil
}