func _SetJobManagerDefault()

in apis/flinkcluster/v1beta1/flinkcluster_default.go [53:106]


func _SetJobManagerDefault(jmSpec *JobManagerSpec, flinkVersion *version.Version) {
	if jmSpec == nil {
		return
	}

	if flinkVersion == nil || flinkVersion.LessThan(v10) {
		if jmSpec.MemoryOffHeapMin.Format == "" {
			jmSpec.MemoryOffHeapMin = *resource.NewScaledQuantity(600, 6) // 600MB
		}
		if jmSpec.MemoryOffHeapRatio == nil {
			jmSpec.MemoryOffHeapRatio = new(int32)
			*jmSpec.MemoryOffHeapRatio = 25
		}
	} else {
		if jmSpec.MemoryProcessRatio == nil {
			jmSpec.MemoryProcessRatio = new(int32)
			*jmSpec.MemoryProcessRatio = 80
		}
	}

	if jmSpec.Ports.RPC != nil {
		var livenessProbe = corev1.Probe{
			ProbeHandler: corev1.ProbeHandler{
				TCPSocket: &corev1.TCPSocketAction{
					Port: intstr.FromInt(int(*jmSpec.Ports.RPC)),
				},
			},
			TimeoutSeconds:      10,
			InitialDelaySeconds: 5,
			PeriodSeconds:       60,
			FailureThreshold:    5,
		}
		if jmSpec.LivenessProbe != nil {
			mergo.Merge(&livenessProbe, jmSpec.LivenessProbe, mergo.WithOverride)
		}
		jmSpec.LivenessProbe = &livenessProbe

		var readinessProbe = corev1.Probe{
			ProbeHandler: corev1.ProbeHandler{
				TCPSocket: &corev1.TCPSocketAction{
					Port: intstr.FromInt(int(*jmSpec.Ports.RPC)),
				},
			},
			TimeoutSeconds:      10,
			InitialDelaySeconds: 5,
			PeriodSeconds:       5,
			FailureThreshold:    60,
		}
		if jmSpec.ReadinessProbe != nil {
			mergo.Merge(&readinessProbe, jmSpec.ReadinessProbe, mergo.WithOverride)
		}
		jmSpec.ReadinessProbe = &readinessProbe
	}
}