in pkg/flink/resources.go [143:176]
// updateJobManagerSpec applies the task-level settings carried by taskCtx to
// the cluster's JobManager spec, creating the spec if the cluster has none.
//
// It merges the task's annotations and labels into the pod annotations and
// labels via utils.UnionMaps, copies a non-zero CPU quantity into the resource
// requests and a non-zero memory quantity into the resource limits, and, when
// a persistent volume is requested, adds the matching volume claim and mount
// and points the Flink temp-dir property at the mount path.
//
// NOTE(review): resource.MustParse panics on a malformed quantity string;
// presumably the job definition is validated before this point — confirm.
func (fc *FlinkCluster) updateJobManagerSpec(taskCtx FlinkTaskContext) {
	out := fc.Spec.JobManager
	if out == nil {
		// Attach the new spec to the cluster; without this the updates below
		// would land on a throwaway value and be lost.
		out = &flinkOp.JobManagerSpec{}
		fc.Spec.JobManager = out
	}

	out.PodAnnotations = utils.UnionMaps(taskCtx.Annotations, out.PodAnnotations)
	out.PodLabels = utils.UnionMaps(taskCtx.Labels, out.PodLabels)

	jm := taskCtx.Job.JobManager

	// NOTE(review): CPU is written to Requests while memory is written to
	// Limits; confirm this asymmetry is intended.
	if cpu := jm.GetResource().GetCpu(); cpu != nil {
		if quantity := resource.MustParse(cpu.GetString_()); !quantity.IsZero() {
			// Writing to a nil map panics, so allocate it on first use.
			if out.Resources.Requests == nil {
				out.Resources.Requests = corev1.ResourceList{}
			}
			out.Resources.Requests[corev1.ResourceCPU] = quantity
		}
	}

	if memory := jm.GetResource().GetMemory(); memory != nil {
		if quantity := resource.MustParse(memory.GetString_()); !quantity.IsZero() {
			// Writing to a nil map panics, so allocate it on first use.
			if out.Resources.Limits == nil {
				out.Resources.Limits = corev1.ResourceList{}
			}
			out.Resources.Limits[corev1.ResourceMemory] = quantity
		}
	}

	if pv := jm.GetResource().GetPersistentVolume(); pv != nil {
		claim := getPersistentVolumeClaim(jobManagerVolumeClaim, pv)
		out.VolumeClaimTemplates, out.VolumeMounts = addPersistentVolumeClaim(
			out.VolumeClaimTemplates,
			out.VolumeMounts,
			claim,
			volumeClaimMountPath,
		)

		// Guard against a cluster spec that carries no Flink properties yet.
		if fc.Spec.FlinkProperties == nil {
			fc.Spec.FlinkProperties = map[string]string{}
		}
		fc.Spec.FlinkProperties[flinkIoTmpDirsProperty] = volumeClaimMountPath
	}
}