in pkg/flink/resources.go [178:215]
// updateTaskManagerSpec applies the task-level settings carried by taskCtx to
// the cluster's TaskManager spec.
//
// In order, it:
//   - combines taskCtx.Annotations / taskCtx.Labels with the existing pod
//     annotations / labels through utils.UnionMaps (which side wins on a key
//     collision is decided by that helper — verify there);
//   - sets the CPU request and the memory limit when the job's TaskManager
//     resource declares a non-zero quantity for them;
//   - sets the replica count when the job asks for more than zero replicas;
//   - when a persistent volume is requested, adds the corresponding volume
//     claim template and mount, and sets flinkIoTmpDirsProperty in the
//     cluster's Flink properties to volumeClaimMountPath.
//
// A nil fc.Spec.TaskManager is replaced by a fresh spec that is stored back on
// the cluster, and every map is allocated before it is written to.
//
// The function panics if the CPU or memory string is not a valid Kubernetes
// quantity, because resource.MustParse is used and there is no error return.
func (fc *FlinkCluster) updateTaskManagerSpec(taskCtx FlinkTaskContext) {
	out := fc.Spec.TaskManager
	if out == nil {
		out = &flinkOp.TaskManagerSpec{}
		// Store the new spec back; otherwise every change below would be
		// applied to a throwaway value and lost.
		fc.Spec.TaskManager = out
	}

	out.PodAnnotations = utils.UnionMaps(taskCtx.Annotations, out.PodAnnotations)
	out.PodLabels = utils.UnionMaps(taskCtx.Labels, out.PodLabels)

	tm := taskCtx.Job.TaskManager

	if cpu := tm.GetResource().GetCpu(); cpu != nil {
		if quantity := resource.MustParse(cpu.GetString_()); !quantity.IsZero() {
			// Writing to a nil map panics, so allocate on first use.
			if out.Resources.Requests == nil {
				out.Resources.Requests = corev1.ResourceList{}
			}
			out.Resources.Requests[corev1.ResourceCPU] = quantity
		}
	}

	if memory := tm.GetResource().GetMemory(); memory != nil {
		if quantity := resource.MustParse(memory.GetString_()); !quantity.IsZero() {
			// Writing to a nil map panics, so allocate on first use.
			if out.Resources.Limits == nil {
				out.Resources.Limits = corev1.ResourceList{}
			}
			out.Resources.Limits[corev1.ResourceMemory] = quantity
		}
	}

	if replicas := tm.GetReplicas(); replicas > 0 {
		out.Replicas = &replicas
	}

	if pv := tm.GetResource().GetPersistentVolume(); pv != nil {
		claim := getPersistentVolumeClaim(taskManagerVolumeClaim, pv)
		out.VolumeClaimTemplates, out.VolumeMounts = addPersistentVolumeClaim(
			out.VolumeClaimTemplates,
			out.VolumeMounts,
			claim,
			volumeClaimMountPath,
		)
		// Guard against a cluster spec that was created without any Flink
		// properties; writing to a nil map would panic.
		if fc.Spec.FlinkProperties == nil {
			fc.Spec.FlinkProperties = map[string]string{}
		}
		fc.Spec.FlinkProperties[flinkIoTmpDirsProperty] = volumeClaimMountPath
	}
}