in apis/flinkcluster/v1beta1/flinkcluster_validate.go [329:390]
// validateJobManager validates the JobManager section of a cluster spec.
//
// Checks run in this order and the first failure is returned:
//  1. pod annotations and pod labels,
//  2. the built-in rpc/blob/query/ui ports are set, then they are handed to
//     checkDupPorts together with ExtraPorts,
//  3. resource requirements,
//  4. the memory options allowed for flinkVersion.
//
// A nil jmSpec is accepted and yields nil.
func (v *Validator) validateJobManager(flinkVersion *version.Version, jmSpec *JobManagerSpec) error {
	if jmSpec == nil {
		return nil
	}

	fp := field.NewPath("spec.jobManager")
	// The aggregated message is passed as an argument, never as the format
	// string: it echoes user-supplied keys/values, and a '%' in them would
	// otherwise be interpreted as a formatting verb and garble the message.
	if errs := validation.ValidateAnnotations(jmSpec.PodAnnotations, fp.Child("podAnnotations")); len(errs) > 0 {
		return fmt.Errorf("%s", errs.ToAggregate().Error())
	}
	if errs := v1validation.ValidateLabels(jmSpec.PodLabels, fp.Child("podLabels")); len(errs) > 0 {
		return fmt.Errorf("%s", errs.ToAggregate().Error())
	}

	// Ports. The built-in ports are pointers and are dereferenced below, so
	// reject an unset one with an error instead of panicking.
	switch {
	case jmSpec.Ports.RPC == nil:
		return fmt.Errorf("jobmanager rpc port is unspecified")
	case jmSpec.Ports.Blob == nil:
		return fmt.Errorf("jobmanager blob port is unspecified")
	case jmSpec.Ports.Query == nil:
		return fmt.Errorf("jobmanager query port is unspecified")
	case jmSpec.Ports.UI == nil:
		return fmt.Errorf("jobmanager ui port is unspecified")
	}
	ports := []NamedPort{
		{Name: "rpc", ContainerPort: *jmSpec.Ports.RPC},
		{Name: "blob", ContainerPort: *jmSpec.Ports.Blob},
		{Name: "query", ContainerPort: *jmSpec.Ports.Query},
		{Name: "ui", ContainerPort: *jmSpec.Ports.UI},
	}
	ports = append(ports, jmSpec.ExtraPorts...)
	if err := v.checkDupPorts(ports, "jobmanager"); err != nil {
		return err
	}

	if err := v.validateResourceRequirements(jmSpec.Resources, "jobmanager"); err != nil {
		return err
	}

	// NOTE(review): this branch is gated by v10 while both error messages
	// below mention 1.11; v10 is defined outside this function — confirm the
	// intended version boundary.
	if flinkVersion == nil || flinkVersion.LessThan(v10) {
		// Unknown or older version: only the off-heap options are accepted.
		if jmSpec.MemoryProcessRatio != nil {
			return fmt.Errorf("MemoryProcessRatio config cannot be used with flinkVersion < 1.11', use " +
				"memoryOffHeapRatio instead")
		}
		// MemoryOffHeapRatio
		if err := v.validateRatio(jmSpec.MemoryOffHeapRatio, "jobmanager", "memoryOffHeapRatio"); err != nil {
			return err
		}
		// MemoryOffHeapMin
		return v.validateMemoryOffHeapMin(&jmSpec.MemoryOffHeapMin, jmSpec.GetResources().Memory(), "jobmanager")
	}

	// Newer version: the off-heap options are rejected, only the process
	// ratio is validated.
	if jmSpec.MemoryOffHeapRatio != nil || !jmSpec.MemoryOffHeapMin.IsZero() {
		return fmt.Errorf("MemoryOffHeapRatio or MemoryOffHeapMin config cannot be used with flinkVersion >= 1.11'; " +
			"use memoryProcessRatio istead")
	}
	// MemoryProcessRatio
	return v.validateRatio(jmSpec.MemoryProcessRatio, "jobmanager", "memoryProcessRatio")
}