func newJobManagerService()

in controllers/flinkcluster/flinkcluster_converter.go [274:341]


// newJobManagerService builds the desired Kubernetes Service for the
// JobManager of the given FlinkCluster.
//
// The Service is placed in the cluster's namespace, named via
// getJobManagerServiceName, and carries the owner reference produced by
// ToOwnerReference(flinkCluster). It exposes the "rpc", "blob", "query" and
// "ui" ports, each targeting the container port of the same name. The
// selector is the "jobmanager" component labels; the Service labels are those
// selector labels merged with the revision hash labels and then with
// JobManager.ServiceLabels. Annotations start from
// JobManager.ServiceAnnotations.
//
// The Service type is derived from JobManager.AccessScope:
//   - AccessScopeCluster:  ClusterIP
//   - AccessScopeVPC:      LoadBalancer plus GKE internal load balancer annotations
//   - AccessScopeExternal: LoadBalancer
//   - AccessScopeNodePort: NodePort
//   - AccessScopeHeadless: ClusterIP with ClusterIP set to "None"
//
// The function panics on any other access scope. The port pointers in
// JobManager.Ports are dereferenced without a nil check, so they must be set
// before this function is called.
func newJobManagerService(flinkCluster *v1beta1.FlinkCluster) *corev1.Service {
	var jobManagerSpec = flinkCluster.Spec.JobManager

	// Every service port targets the container port that shares its name.
	namedPort := func(name string, port int32) corev1.ServicePort {
		return corev1.ServicePort{
			Name:       name,
			Port:       port,
			TargetPort: intstr.FromString(name),
		}
	}
	servicePorts := []corev1.ServicePort{
		namedPort("rpc", *jobManagerSpec.Ports.RPC),
		namedPort("blob", *jobManagerSpec.Ports.Blob),
		namedPort("query", *jobManagerSpec.Ports.Query),
		namedPort("ui", *jobManagerSpec.Ports.UI),
	}

	serviceName := getJobManagerServiceName(flinkCluster.Name)

	// The selector uses only the component labels; the Service's own labels
	// additionally carry the revision hash and any user-supplied labels.
	selectorLabels := getComponentLabels(flinkCluster, "jobmanager")
	serviceLabels := mergeLabels(selectorLabels, getRevisionHashLabels(&flinkCluster.Status.Revision))
	serviceLabels = mergeLabels(serviceLabels, jobManagerSpec.ServiceLabels)
	serviceAnnotations := jobManagerSpec.ServiceAnnotations

	service := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: flinkCluster.Namespace,
			Name:      serviceName,
			OwnerReferences: []metav1.OwnerReference{
				ToOwnerReference(flinkCluster),
			},
			Labels:      serviceLabels,
			Annotations: serviceAnnotations,
		},
		Spec: corev1.ServiceSpec{
			Selector: selectorLabels,
			Ports:    servicePorts,
		},
	}

	// This implementation is specific to GKE, see details at
	// https://cloud.google.com/kubernetes-engine/docs/how-to/exposing-apps
	// https://cloud.google.com/kubernetes-engine/docs/how-to/internal-load-balancing
	switch jobManagerSpec.AccessScope {
	case v1beta1.AccessScopeCluster:
		service.Spec.Type = corev1.ServiceTypeClusterIP
	case v1beta1.AccessScopeVPC:
		service.Spec.Type = corev1.ServiceTypeLoadBalancer
		// Add the GKE internal load balancer annotations on top of the
		// user-supplied ones.
		service.Annotations = mergeLabels(serviceAnnotations,
			map[string]string{
				"networking.gke.io/load-balancer-type":                         "Internal",
				"networking.gke.io/internal-load-balancer-allow-global-access": "true",
			})
	case v1beta1.AccessScopeExternal:
		service.Spec.Type = corev1.ServiceTypeLoadBalancer
	case v1beta1.AccessScopeNodePort:
		service.Spec.Type = corev1.ServiceTypeNodePort
	case v1beta1.AccessScopeHeadless:
		// Headless services do not allocate any sort of VIP or LoadBalancer, and merely
		// collect a set of Pod IPs that are assumed to be independently routable:
		service.Spec.Type = corev1.ServiceTypeClusterIP
		service.Spec.ClusterIP = "None"
	default:
		panic(fmt.Sprintf(
			"Unknown service access scope: %v", jobManagerSpec.AccessScope))
	}
	return service
}