in controllers/flinkcluster/flinkcluster_converter.go [231:271]
func newJobManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.StatefulSet {
var jobManagerSpec = flinkCluster.Spec.JobManager
var jobManagerStatefulSetName = getJobManagerName(flinkCluster.Name)
var podLabels = getComponentLabels(flinkCluster, "jobmanager")
podLabels = mergeLabels(podLabels, jobManagerSpec.PodLabels)
var statefulSetLabels = mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))
mainContainer := newJobManagerContainer(flinkCluster)
podSpec := newJobManagerPodSpec(mainContainer, flinkCluster)
var pvcs []corev1.PersistentVolumeClaim
if jobManagerSpec.VolumeClaimTemplates != nil {
pvcs = make([]corev1.PersistentVolumeClaim, len(jobManagerSpec.VolumeClaimTemplates))
for i, pvc := range jobManagerSpec.VolumeClaimTemplates {
pvc.OwnerReferences = []metav1.OwnerReference{ToOwnerReference(flinkCluster)}
pvcs[i] = pvc
}
}
return &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: flinkCluster.Namespace,
Name: jobManagerStatefulSetName,
OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
Labels: statefulSetLabels,
},
Spec: appsv1.StatefulSetSpec{
Replicas: jobManagerSpec.Replicas,
Selector: &metav1.LabelSelector{MatchLabels: podLabels},
ServiceName: jobManagerStatefulSetName,
VolumeClaimTemplates: pvcs,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: podLabels,
Annotations: jobManagerSpec.PodAnnotations,
},
Spec: *podSpec,
},
},
}
}