in controllers/flinkcluster/flinkcluster_converter.go [479:520]
// newTaskManagerStatefulSet assembles the StatefulSet object describing the
// TaskManager component of flinkCluster.
//
// The pod labels are produced by passing the "taskmanager" component labels
// and Spec.TaskManager.PodLabels through mergeLabels; the same label set is
// used for the pod template and for the StatefulSet selector. The labels on
// the StatefulSet itself are those pod labels further combined with the
// result of getRevisionHashLabels for the cluster's status revision.
//
// Each entry of Spec.TaskManager.VolumeClaimTemplates is copied into the
// StatefulSet with its OwnerReferences replaced by a single reference to
// flinkCluster. A nil template list yields nil VolumeClaimTemplates.
func newTaskManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.StatefulSet {
	tmSpec := flinkCluster.Spec.TaskManager
	name := getTaskManagerName(flinkCluster.Name)

	labels := mergeLabels(getComponentLabels(flinkCluster, "taskmanager"), tmSpec.PodLabels)
	stsLabels := mergeLabels(labels, getRevisionHashLabels(&flinkCluster.Status.Revision))

	container := newTaskManagerContainer(flinkCluster)
	podSpec := newTaskManagerPodSpec(container, flinkCluster)

	// Keep claims nil when no templates were specified; otherwise build a
	// non-nil slice (possibly empty) of owner-stamped copies.
	var claims []corev1.PersistentVolumeClaim
	if templates := tmSpec.VolumeClaimTemplates; templates != nil {
		claims = make([]corev1.PersistentVolumeClaim, 0, len(templates))
		for _, claim := range templates {
			// claim is a per-iteration copy, so overwriting the field does
			// not touch the template stored in the cluster spec.
			claim.OwnerReferences = []metav1.OwnerReference{ToOwnerReference(flinkCluster)}
			claims = append(claims, claim)
		}
	}

	meta := metav1.ObjectMeta{
		Namespace:       flinkCluster.Namespace,
		Name:            name,
		OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
		Labels:          stsLabels,
	}

	podTemplate := corev1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			Labels:      labels,
			Annotations: tmSpec.PodAnnotations,
		},
		Spec: *podSpec,
	}

	return &appsv1.StatefulSet{
		ObjectMeta: meta,
		Spec: appsv1.StatefulSetSpec{
			Replicas:             tmSpec.Replicas,
			Selector:             &metav1.LabelSelector{MatchLabels: labels},
			ServiceName:          name,
			VolumeClaimTemplates: claims,
			PodManagementPolicy:  "Parallel",
			Template:             podTemplate,
		},
	}
}