in controllers/flinkcluster/flinkcluster_converter.go [274:341]
// newJobManagerService builds the Service object that exposes the JobManager
// of flinkCluster.
//
// The Service lives in the cluster's namespace, is named by
// getJobManagerServiceName, is owned by flinkCluster, and publishes four
// ports ("rpc", "blob", "query", "ui") whose numbers come from
// Spec.JobManager.Ports. Each port's TargetPort is set by name rather than by
// number. The pod selector is the "jobmanager" component label set; the
// Service's own labels are that set merged with the revision hash labels and
// then with the user-supplied Spec.JobManager.ServiceLabels.
//
// Spec.JobManager.AccessScope selects the Service type:
//
//   - AccessScopeCluster:  ClusterIP
//   - AccessScopeVPC:      LoadBalancer plus the GKE internal load balancer
//     annotations
//   - AccessScopeExternal: LoadBalancer
//   - AccessScopeNodePort: NodePort
//   - AccessScopeHeadless: ClusterIP with ClusterIP set to "None"
//
// The function panics if AccessScope holds any other value. The port pointers
// in Spec.JobManager.Ports are dereferenced unconditionally, so a nil pointer
// there also panics.
func newJobManagerService(flinkCluster *v1beta1.FlinkCluster) *corev1.Service {
	jobManagerSpec := flinkCluster.Spec.JobManager

	// servicePort builds a port whose target is referenced by the same name
	// as the service port itself.
	servicePort := func(name string, port int32) corev1.ServicePort {
		return corev1.ServicePort{
			Name:       name,
			Port:       port,
			TargetPort: intstr.FromString(name),
		}
	}
	ports := []corev1.ServicePort{
		servicePort("rpc", *jobManagerSpec.Ports.RPC),
		servicePort("blob", *jobManagerSpec.Ports.Blob),
		servicePort("query", *jobManagerSpec.Ports.Query),
		servicePort("ui", *jobManagerSpec.Ports.UI),
	}

	serviceName := getJobManagerServiceName(flinkCluster.Name)

	// The selector uses only the component labels; the Service's own labels
	// additionally carry the revision hash and the user-supplied labels.
	selectorLabels := getComponentLabels(flinkCluster, "jobmanager")
	serviceLabels := mergeLabels(selectorLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))
	serviceLabels = mergeLabels(serviceLabels, jobManagerSpec.ServiceLabels)
	serviceAnnotations := jobManagerSpec.ServiceAnnotations

	jobManagerService := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:       flinkCluster.Namespace,
			Name:            serviceName,
			OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
			Labels:          serviceLabels,
			Annotations:     serviceAnnotations,
		},
		Spec: corev1.ServiceSpec{
			Selector: selectorLabels,
			Ports:    ports,
		},
	}

	// This implementation is specific to GKE, see details at
	// https://cloud.google.com/kubernetes-engine/docs/how-to/exposing-apps
	// https://cloud.google.com/kubernetes-engine/docs/how-to/internal-load-balancing
	switch jobManagerSpec.AccessScope {
	case v1beta1.AccessScopeCluster:
		jobManagerService.Spec.Type = corev1.ServiceTypeClusterIP
	case v1beta1.AccessScopeVPC:
		jobManagerService.Spec.Type = corev1.ServiceTypeLoadBalancer
		// Add the GKE internal load balancer annotations on top of the
		// user-supplied annotations.
		jobManagerService.Annotations = mergeLabels(serviceAnnotations,
			map[string]string{
				"networking.gke.io/load-balancer-type":                         "Internal",
				"networking.gke.io/internal-load-balancer-allow-global-access": "true",
			})
	case v1beta1.AccessScopeExternal:
		jobManagerService.Spec.Type = corev1.ServiceTypeLoadBalancer
	case v1beta1.AccessScopeNodePort:
		jobManagerService.Spec.Type = corev1.ServiceTypeNodePort
	case v1beta1.AccessScopeHeadless:
		// Headless services do not allocate any sort of VIP or LoadBalancer, and merely
		// collect a set of Pod IPs that are assumed to be independently routable:
		jobManagerService.Spec.Type = corev1.ServiceTypeClusterIP
		jobManagerService.Spec.ClusterIP = "None"
	default:
		// An unrecognised access scope is reported by panicking; no Service
		// is returned.
		panic(fmt.Sprintf(
			"Unknown service access scope: %v", jobManagerSpec.AccessScope))
	}
	return jobManagerService
}