func()

in pkg/flink/resources.go [143:176]


func (fc *FlinkCluster) updateJobManagerSpec(taskCtx FlinkTaskContext) {
	out := fc.Spec.JobManager
	if out == nil {
		out = &flinkOp.JobManagerSpec{}
	}

	out.PodAnnotations = utils.UnionMaps(taskCtx.Annotations, out.PodAnnotations)
	out.PodLabels = utils.UnionMaps(taskCtx.Labels, out.PodLabels)

	jm := taskCtx.Job.JobManager

	if cpu := jm.GetResource().GetCpu(); cpu != nil {
		if quantity := resource.MustParse(cpu.GetString_()); !quantity.IsZero() {
			out.Resources.Requests[corev1.ResourceCPU] = quantity
		}
	}

	if memory := jm.GetResource().GetMemory(); memory != nil {
		if quantity := resource.MustParse(memory.GetString_()); !quantity.IsZero() {
			out.Resources.Limits[corev1.ResourceMemory] = quantity
		}
	}

	if pv := jm.GetResource().GetPersistentVolume(); pv != nil {
		claim := getPersistentVolumeClaim(jobManagerVolumeClaim, pv)
		out.VolumeClaimTemplates, out.VolumeMounts = addPersistentVolumeClaim(
			out.VolumeClaimTemplates,
			out.VolumeMounts,
			claim,
			volumeClaimMountPath,
		)
		fc.Spec.FlinkProperties[flinkIoTmpDirsProperty] = volumeClaimMountPath
	}
}