func newTaskManagerService()

in controllers/flinkcluster/flinkcluster_converter.go [646:687]


func newTaskManagerService(flinkCluster *v1beta1.FlinkCluster) *corev1.Service {
	var tmSpec = flinkCluster.Spec.TaskManager
	if tmSpec == nil {
		return nil
	}
	var clusterNamespace = flinkCluster.Namespace
	var clusterName = flinkCluster.Name
	// Service name matches the service name defined in the TM StatefulSet spec
	var tmSvcName = getTaskManagerName(clusterName)
	selectorLabels := getComponentLabels(flinkCluster, "taskmanager")
	serviceLabels := mergeLabels(selectorLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))

	var tmSvcPorts = []corev1.ServicePort{
		{
			Name: "data",
			Port: *tmSpec.Ports.Data,
		},
		{
			Name: "rpc",
			Port: *tmSpec.Ports.RPC,
		},
		{
			Name: "query",
			Port: *tmSpec.Ports.Query,
		},
	}

	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:       clusterNamespace,
			Name:            tmSvcName,
			OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
			Labels:          serviceLabels,
		},
		Spec: corev1.ServiceSpec{
			Selector:  selectorLabels,
			ClusterIP: corev1.ClusterIPNone,
			Type:      corev1.ServiceTypeClusterIP,
			Ports:     tmSvcPorts,
		},
	}
}