in apis/flinkcluster/v1beta1/flinkcluster_validate.go [392:452]
// validateTaskManager checks the TaskManager section of a cluster spec.
//
// A nil tmSpec is accepted and yields no error. Otherwise the checks run in
// this order and the first failure is returned:
//
//  1. pod annotations and pod labels,
//  2. presence of the rpc/data/query ports and absence of duplicates among
//     them and tmSpec.ExtraPorts,
//  3. resource requirements,
//  4. memory settings, where the allowed fields depend on flinkVersion.
//
// A nil flinkVersion is handled the same way as a version below v10.
func (v *Validator) validateTaskManager(flinkVersion *version.Version, tmSpec *TaskManagerSpec) error {
	if tmSpec == nil {
		return nil
	}

	// Return the aggregate directly instead of feeding its text to fmt.Errorf
	// as a format string: the text echoes user-supplied keys/values, so a '%'
	// in them would be parsed as a formatting verb and garble the message.
	fp := field.NewPath("spec.taskManager")
	if errs := validation.ValidateAnnotations(tmSpec.PodAnnotations, fp.Child("podAnnotations")); len(errs) > 0 {
		return errs.ToAggregate()
	}
	if errs := v1validation.ValidateLabels(tmSpec.PodLabels, fp.Child("podLabels")); len(errs) > 0 {
		return errs.ToAggregate()
	}

	// Ports. The three built-in ports are pointers and are dereferenced below,
	// so reject a spec that left any of them unset rather than panicking.
	if tmSpec.Ports.RPC == nil || tmSpec.Ports.Data == nil || tmSpec.Ports.Query == nil {
		return fmt.Errorf("taskmanager rpc, data and query ports must all be specified")
	}
	ports := []NamedPort{
		{Name: "rpc", ContainerPort: *tmSpec.Ports.RPC},
		{Name: "data", ContainerPort: *tmSpec.Ports.Data},
		{Name: "query", ContainerPort: *tmSpec.Ports.Query},
	}
	ports = append(ports, tmSpec.ExtraPorts...)
	if err := v.checkDupPorts(ports, "taskmanager"); err != nil {
		return err
	}

	if err := v.validateResourceRequirements(tmSpec.Resources, "taskmanager"); err != nil {
		return err
	}

	// NOTE(review): the guard compares against v10 (defined outside this
	// function) while both messages below mention 1.11; the messages also
	// carry a stray quote and the typo "istead". The strings are left
	// untouched because callers/tests may match on them — confirm which
	// version boundary is intended.
	if flinkVersion == nil || flinkVersion.LessThan(v10) {
		// Older memory model: only the off-heap settings are allowed.
		if tmSpec.MemoryProcessRatio != nil {
			return fmt.Errorf("MemoryProcessRatio config cannot be used with flinkVersion < 1.11', use " +
				"memoryOffHeapRatio instead")
		}
		// MemoryOffHeapRatio
		if err := v.validateRatio(tmSpec.MemoryOffHeapRatio, "taskmanager", "memoryOffHeapRatio"); err != nil {
			return err
		}
		// MemoryOffHeapMin
		return v.validateMemoryOffHeapMin(&tmSpec.MemoryOffHeapMin, tmSpec.GetResources().Memory(), "taskmanager")
	}

	// Newer memory model: only the process ratio is allowed.
	if tmSpec.MemoryOffHeapRatio != nil || !tmSpec.MemoryOffHeapMin.IsZero() {
		return fmt.Errorf("MemoryOffHeapRatio or MemoryOffHeapMin config cannot be used with flinkVersion >= 1.11'; " +
			"use memoryProcessRatio istead")
	}
	// MemoryProcessRatio
	return v.validateRatio(tmSpec.MemoryProcessRatio, "taskmanager", "memoryProcessRatio")
}