in controllers/flinkcluster/flinkcluster_converter.go [646:687]
func newTaskManagerService(flinkCluster *v1beta1.FlinkCluster) *corev1.Service {
var tmSpec = flinkCluster.Spec.TaskManager
if tmSpec == nil {
return nil
}
var clusterNamespace = flinkCluster.Namespace
var clusterName = flinkCluster.Name
// Service name matches the service name defined in the TM StatefulSet spec
var tmSvcName = getTaskManagerName(clusterName)
selectorLabels := getComponentLabels(flinkCluster, "taskmanager")
serviceLabels := mergeLabels(selectorLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))
var tmSvcPorts = []corev1.ServicePort{
{
Name: "data",
Port: *tmSpec.Ports.Data,
},
{
Name: "rpc",
Port: *tmSpec.Ports.RPC,
},
{
Name: "query",
Port: *tmSpec.Ports.Query,
},
}
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: clusterNamespace,
Name: tmSvcName,
OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
Labels: serviceLabels,
},
Spec: corev1.ServiceSpec{
Selector: selectorLabels,
ClusterIP: corev1.ClusterIPNone,
Type: corev1.ServiceTypeClusterIP,
Ports: tmSvcPorts,
},
}
}