func newTaskManagerStatefulSet()

in controllers/flinkcluster/flinkcluster_converter.go [479:520]


// newTaskManagerStatefulSet assembles the TaskManager StatefulSet object for
// the given FlinkCluster.
//
// The pod labels (component labels combined with the spec's PodLabels via
// mergeLabels) are used both as the selector and as the pod template labels.
// The StatefulSet's own labels are those pod labels combined with the
// revision hash labels derived from the cluster status. The StatefulSet and
// each volume claim template carry an owner reference produced by
// ToOwnerReference for the cluster.
func newTaskManagerStatefulSet(flinkCluster *v1beta1.FlinkCluster) *appsv1.StatefulSet {
	tmSpec := flinkCluster.Spec.TaskManager
	stsName := getTaskManagerName(flinkCluster.Name)

	// The selector must match the pod template, so both use podLabels; the
	// revision hash labels are only added to the StatefulSet's own metadata.
	podLabels := mergeLabels(getComponentLabels(flinkCluster, "taskmanager"), tmSpec.PodLabels)
	stsLabels := mergeLabels(podLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))

	podSpec := newTaskManagerPodSpec(newTaskManagerContainer(flinkCluster), flinkCluster)

	// Leave claimTemplates nil when the spec declares none; otherwise copy
	// each template and stamp the copy with an owner reference.
	var claimTemplates []corev1.PersistentVolumeClaim
	if templates := tmSpec.VolumeClaimTemplates; templates != nil {
		claimTemplates = make([]corev1.PersistentVolumeClaim, 0, len(templates))
		for _, claim := range templates {
			// claim is a per-iteration copy, so the spec's element is not modified.
			claim.OwnerReferences = []metav1.OwnerReference{ToOwnerReference(flinkCluster)}
			claimTemplates = append(claimTemplates, claim)
		}
	}

	meta := metav1.ObjectMeta{
		Namespace:       flinkCluster.Namespace,
		Name:            stsName,
		OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
		Labels:          stsLabels,
	}

	podTemplate := corev1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			Labels:      podLabels,
			Annotations: tmSpec.PodAnnotations,
		},
		Spec: *podSpec,
	}

	return &appsv1.StatefulSet{
		ObjectMeta: meta,
		Spec: appsv1.StatefulSetSpec{
			Replicas: tmSpec.Replicas,
			Selector: &metav1.LabelSelector{MatchLabels: podLabels},
			// The governing service name is the same as the StatefulSet name.
			ServiceName:          stsName,
			VolumeClaimTemplates: claimTemplates,
			PodManagementPolicy:  "Parallel",
			Template:             podTemplate,
		},
	}
}