in apis/flinkcluster/v1beta1/flinkcluster_default.go [53:106]
func _SetJobManagerDefault(jmSpec *JobManagerSpec, flinkVersion *version.Version) {
if jmSpec == nil {
return
}
if flinkVersion == nil || flinkVersion.LessThan(v10) {
if jmSpec.MemoryOffHeapMin.Format == "" {
jmSpec.MemoryOffHeapMin = *resource.NewScaledQuantity(600, 6) // 600MB
}
if jmSpec.MemoryOffHeapRatio == nil {
jmSpec.MemoryOffHeapRatio = new(int32)
*jmSpec.MemoryOffHeapRatio = 25
}
} else {
if jmSpec.MemoryProcessRatio == nil {
jmSpec.MemoryProcessRatio = new(int32)
*jmSpec.MemoryProcessRatio = 80
}
}
if jmSpec.Ports.RPC != nil {
var livenessProbe = corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
TCPSocket: &corev1.TCPSocketAction{
Port: intstr.FromInt(int(*jmSpec.Ports.RPC)),
},
},
TimeoutSeconds: 10,
InitialDelaySeconds: 5,
PeriodSeconds: 60,
FailureThreshold: 5,
}
if jmSpec.LivenessProbe != nil {
mergo.Merge(&livenessProbe, jmSpec.LivenessProbe, mergo.WithOverride)
}
jmSpec.LivenessProbe = &livenessProbe
var readinessProbe = corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
TCPSocket: &corev1.TCPSocketAction{
Port: intstr.FromInt(int(*jmSpec.Ports.RPC)),
},
},
TimeoutSeconds: 10,
InitialDelaySeconds: 5,
PeriodSeconds: 5,
FailureThreshold: 60,
}
if jmSpec.ReadinessProbe != nil {
mergo.Merge(&readinessProbe, jmSpec.ReadinessProbe, mergo.WithOverride)
}
jmSpec.ReadinessProbe = &readinessProbe
}
}