func newJobManagerStatefulSet()

in controllers/flinkcluster/flinkcluster_converter.go [231:271]


// newJobManagerStatefulSet assembles the StatefulSet object for the JobManager
// component of the given FlinkCluster and returns it.
//
// The StatefulSet is placed in the cluster's namespace, named via
// getJobManagerName (the same name is used as its ServiceName), and carries an
// owner reference to the FlinkCluster. Pod labels are the "jobmanager"
// component labels merged with JobManager.PodLabels; they serve both as the
// pod template labels and as the selector's match labels. The StatefulSet's
// own labels are those pod labels merged with the revision hash labels taken
// from the cluster status.
func newJobManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.StatefulSet {
	spec := flinkCluster.Spec.JobManager
	name := getJobManagerName(flinkCluster.Name)

	podLabels := mergeLabels(getComponentLabels(flinkCluster, "jobmanager"), spec.PodLabels)
	// Revision hash labels go on the StatefulSet metadata only; the selector
	// and the pod template below use podLabels without them.
	stsLabels := mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))

	podSpec := newJobManagerPodSpec(newJobManagerContainer(flinkCluster), flinkCluster)

	// Copy every volume claim template and attach an owner reference to the
	// FlinkCluster on the copy. The slice stays nil when the spec declares no
	// templates at all.
	var claims []corev1.PersistentVolumeClaim
	if templates := spec.VolumeClaimTemplates; templates != nil {
		claims = make([]corev1.PersistentVolumeClaim, len(templates))
		for i := range templates {
			claims[i] = templates[i]
			claims[i].OwnerReferences = []metav1.OwnerReference{ToOwnerReference(flinkCluster)}
		}
	}

	objectMeta := metav1.ObjectMeta{
		Namespace:       flinkCluster.Namespace,
		Name:            name,
		OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
		Labels:          stsLabels,
	}

	podTemplate := corev1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			Labels:      podLabels,
			Annotations: spec.PodAnnotations,
		},
		Spec: *podSpec,
	}

	return &appsv1.StatefulSet{
		ObjectMeta: objectMeta,
		Spec: appsv1.StatefulSetSpec{
			Replicas:             spec.Replicas,
			Selector:             &metav1.LabelSelector{MatchLabels: podLabels},
			ServiceName:          name,
			VolumeClaimTemplates: claims,
			Template:             podTemplate,
		},
	}
}