in apis/flinkcluster/v1beta1/flinkcluster_default.go [108:160]
// _SetTaskManagerDefault fills in unset fields of tmSpec with default values.
//
// A nil tmSpec is a no-op. When flinkVersion is nil or compares less than the
// package-level v10, the off-heap memory settings are defaulted
// (MemoryOffHeapMin to 600M, MemoryOffHeapRatio to 25); otherwise
// MemoryProcessRatio is defaulted to 80. When the RPC port is set, liveness
// and readiness probes are built as a TCP check against that port, and any
// probe already present on the spec is layered over those defaults.
func _SetTaskManagerDefault(tmSpec *TaskManagerSpec, flinkVersion *version.Version) {
	if tmSpec == nil {
		return
	}

	if flinkVersion == nil || flinkVersion.LessThan(v10) {
		// Format is empty on a zero-value Quantity; it is used here as the
		// "not set" check.
		if tmSpec.MemoryOffHeapMin.Format == "" {
			tmSpec.MemoryOffHeapMin = *resource.NewScaledQuantity(600, 6) // 600MB
		}
		if tmSpec.MemoryOffHeapRatio == nil {
			tmSpec.MemoryOffHeapRatio = new(int32)
			*tmSpec.MemoryOffHeapRatio = 25
		}
	} else {
		if tmSpec.MemoryProcessRatio == nil {
			tmSpec.MemoryProcessRatio = new(int32)
			*tmSpec.MemoryProcessRatio = 80
		}
	}

	// Probes default to a TCP check on the RPC port, so without a port there
	// is nothing to default and any user-supplied probes are left untouched.
	if tmSpec.Ports.RPC == nil {
		return
	}
	rpcPort := intstr.FromInt(int(*tmSpec.Ports.RPC))

	// Liveness: checked every 60s, 5 consecutive failures tolerated.
	tmSpec.LivenessProbe = taskManagerProbeWithDefaults(rpcPort, 60, 5, tmSpec.LivenessProbe)
	// Readiness: checked every 5s, 60 consecutive failures tolerated.
	tmSpec.ReadinessProbe = taskManagerProbeWithDefaults(rpcPort, 5, 60, tmSpec.ReadinessProbe)
}

// taskManagerProbeWithDefaults returns a probe that performs a TCP check
// against port with the given period and failure threshold (timeout 10s,
// initial delay 5s), overlaid with the non-zero fields of override. A nil
// override yields the plain default probe.
func taskManagerProbeWithDefaults(port intstr.IntOrString, periodSeconds, failureThreshold int32, override *corev1.Probe) *corev1.Probe {
	probe := &corev1.Probe{
		ProbeHandler: corev1.ProbeHandler{
			TCPSocket: &corev1.TCPSocketAction{Port: port},
		},
		TimeoutSeconds:      10,
		InitialDelaySeconds: 5,
		PeriodSeconds:       periodSeconds,
		FailureThreshold:    failureThreshold,
	}
	if override == nil {
		return probe
	}

	if err := mergo.Merge(probe, override, mergo.WithOverride); err != nil {
		// There is no error channel out of the defaulting path; keep the
		// probe exactly as the user wrote it rather than a half-merged one.
		return override
	}

	// The merge only overwrites with non-empty values, so it never clears the
	// default TCPSocket. A probe may carry only one handler type, so drop the
	// default when the user chose a different handler and no TCP socket.
	custom := override.ProbeHandler
	if custom.TCPSocket == nil && (custom.Exec != nil || custom.HTTPGet != nil || custom.GRPC != nil) {
		probe.TCPSocket = nil
	}
	return probe
}