func _SetTaskManagerDefault()

in apis/flinkcluster/v1beta1/flinkcluster_default.go [108:160]


func _SetTaskManagerDefault(tmSpec *TaskManagerSpec, flinkVersion *version.Version) {
	if tmSpec == nil {
		return
	}
	if flinkVersion == nil || flinkVersion.LessThan(v10) {
		if tmSpec.MemoryOffHeapMin.Format == "" {
			tmSpec.MemoryOffHeapMin = *resource.NewScaledQuantity(600, 6) // 600MB
		}
		if tmSpec.MemoryOffHeapRatio == nil {
			tmSpec.MemoryOffHeapRatio = new(int32)
			*tmSpec.MemoryOffHeapRatio = 25
		}
	} else {
		if tmSpec.MemoryProcessRatio == nil {
			tmSpec.MemoryProcessRatio = new(int32)
			*tmSpec.MemoryProcessRatio = 80
		}
	}

	if tmSpec.Ports.RPC != nil {
		var livenessProbe = corev1.Probe{
			ProbeHandler: corev1.ProbeHandler{
				TCPSocket: &corev1.TCPSocketAction{
					Port: intstr.FromInt(int(*tmSpec.Ports.RPC)),
				},
			},
			TimeoutSeconds:      10,
			InitialDelaySeconds: 5,
			PeriodSeconds:       60,
			FailureThreshold:    5,
		}
		if tmSpec.LivenessProbe != nil {
			mergo.Merge(&livenessProbe, tmSpec.LivenessProbe, mergo.WithOverride)
		}
		tmSpec.LivenessProbe = &livenessProbe

		var readinessProbe = corev1.Probe{
			ProbeHandler: corev1.ProbeHandler{
				TCPSocket: &corev1.TCPSocketAction{
					Port: intstr.FromInt(int(*tmSpec.Ports.RPC)),
				},
			},
			TimeoutSeconds:      10,
			InitialDelaySeconds: 5,
			PeriodSeconds:       5,
			FailureThreshold:    60,
		}
		if tmSpec.ReadinessProbe != nil {
			mergo.Merge(&readinessProbe, tmSpec.ReadinessProbe, mergo.WithOverride)
		}
		tmSpec.ReadinessProbe = &readinessProbe
	}
}