func()

in apis/flinkcluster/v1beta1/flinkcluster_validate.go [392:452]


// validateTaskManager checks the TaskManager section of a cluster spec.
//
// A nil tmSpec is accepted and yields nil. Otherwise the checks below run in
// order and the first failure is returned:
//   - pod annotations and pod labels (reported under spec.taskManager),
//   - the rpc, data and query ports together with ExtraPorts, via checkDupPorts,
//   - resource requirements, via validateResourceRequirements,
//   - memory settings, whose permitted fields depend on flinkVersion.
//
// When flinkVersion is nil or less than v10, MemoryProcessRatio must be unset
// and MemoryOffHeapRatio / MemoryOffHeapMin are validated; otherwise the
// off-heap fields must be unset and MemoryProcessRatio is validated.
func (v *Validator) validateTaskManager(flinkVersion *version.Version, tmSpec *TaskManagerSpec) error {
	if tmSpec == nil {
		return nil
	}

	fp := field.NewPath("spec.taskManager")
	// The aggregated message is data, not a format string: it can contain text
	// taken from the spec, so it is passed as an argument. Using it as the
	// format would misrender any '%' it contains.
	if errs := validation.ValidateAnnotations(tmSpec.PodAnnotations, fp.Child("podAnnotations")); len(errs) > 0 {
		return fmt.Errorf("%s", errs.ToAggregate().Error())
	}
	if errs := v1validation.ValidateLabels(tmSpec.PodLabels, fp.Child("podLabels")); len(errs) > 0 {
		return fmt.Errorf("%s", errs.ToAggregate().Error())
	}

	// Ports. The three built-in ports are pointers; dereferencing a nil one
	// would panic, so an unset port is reported as a validation error instead.
	if tmSpec.Ports.RPC == nil || tmSpec.Ports.Data == nil || tmSpec.Ports.Query == nil {
		return fmt.Errorf("taskmanager rpc, data and query ports must all be specified")
	}
	ports := []NamedPort{
		{Name: "rpc", ContainerPort: *tmSpec.Ports.RPC},
		{Name: "data", ContainerPort: *tmSpec.Ports.Data},
		{Name: "query", ContainerPort: *tmSpec.Ports.Query},
	}
	ports = append(ports, tmSpec.ExtraPorts...)
	if err := v.checkDupPorts(ports, "taskmanager"); err != nil {
		return err
	}

	if err := v.validateResourceRequirements(tmSpec.Resources, "taskmanager"); err != nil {
		return err
	}

	// NOTE(review): both messages below cite 1.11 while the comparison is
	// against v10 — confirm which boundary is intended. The message text is
	// left unchanged because callers or tests may match on it.
	if flinkVersion == nil || flinkVersion.LessThan(v10) {
		if tmSpec.MemoryProcessRatio != nil {
			return fmt.Errorf("MemoryProcessRatio config cannot be used with flinkVersion < 1.11', use " +
				"memoryOffHeapRatio instead")
		}

		// MemoryOffHeapRatio
		if err := v.validateRatio(tmSpec.MemoryOffHeapRatio, "taskmanager", "memoryOffHeapRatio"); err != nil {
			return err
		}

		// MemoryOffHeapMin
		if err := v.validateMemoryOffHeapMin(&tmSpec.MemoryOffHeapMin, tmSpec.GetResources().Memory(), "taskmanager"); err != nil {
			return err
		}
	} else {
		if tmSpec.MemoryOffHeapRatio != nil || !tmSpec.MemoryOffHeapMin.IsZero() {
			return fmt.Errorf("MemoryOffHeapRatio or MemoryOffHeapMin config cannot be used with flinkVersion >= 1.11'; " +
				"use memoryProcessRatio istead")
		}

		// MemoryProcessRatio
		if err := v.validateRatio(tmSpec.MemoryProcessRatio, "taskmanager", "memoryProcessRatio"); err != nil {
			return err
		}
	}

	return nil
}