func newConfigMap()

in controllers/flinkcluster/flinkcluster_converter.go [690:756]


func newConfigMap(flinkCluster *v1beta1.FlinkCluster) *corev1.ConfigMap {
	appVersion, _ := version.NewVersion(flinkCluster.Spec.FlinkVersion)

	var clusterNamespace = flinkCluster.Namespace
	var clusterName = flinkCluster.Name
	var flinkProperties = flinkCluster.Spec.FlinkProperties
	var jmPorts = flinkCluster.Spec.JobManager.Ports
	var tmPorts = flinkCluster.Spec.TaskManager.Ports
	var configMapName = getConfigMapName(clusterName)
	var labels = mergeLabels(
		getClusterLabels(flinkCluster),
		getRevisionHashLabels(&flinkCluster.Status.Revision))
	// Properties which should be provided from real deployed environment.
	var flinkProps = map[string]string{
		"jobmanager.rpc.address": getJobManagerServiceName(clusterName),
		"jobmanager.rpc.port":    strconv.FormatInt(int64(*jmPorts.RPC), 10),
		"blob.server.port":       strconv.FormatInt(int64(*jmPorts.Blob), 10),
		"query.server.port":      strconv.FormatInt(int64(*jmPorts.Query), 10),
		"rest.port":              strconv.FormatInt(int64(*jmPorts.UI), 10),
		"taskmanager.rpc.port":   strconv.FormatInt(int64(*tmPorts.RPC), 10),
	}

	if appVersion == nil || appVersion.LessThan(v10) {
		var flinkHeapSize = calFlinkHeapSize(flinkCluster)
		if flinkHeapSize["jobmanager.heap.size"] != "" {
			flinkProps["jobmanager.heap.size"] = flinkHeapSize["jobmanager.heap.size"]
		}
		if flinkHeapSize["taskmanager.heap.size"] != "" {
			flinkProps["taskmanager.heap.size"] = flinkHeapSize["taskmanager.heap.size"]
		}
	} else {
		var flinkProcessMemorySize = calFlinkMemoryProcessSize(flinkCluster)
		if flinkProcessMemorySize["jobmanager.memory.process.size"] != "" {
			flinkProps["jobmanager.memory.process.size"] = flinkProcessMemorySize["jobmanager.memory.process.size"]
		}
		if flinkProcessMemorySize["taskmanager.memory.process.size"] != "" {
			flinkProps["taskmanager.memory.process.size"] = flinkProcessMemorySize["taskmanager.memory.process.size"]
		}
	}

	if taskSlots, err := calTaskManagerTaskSlots(flinkCluster); err == nil {
		flinkProps["taskmanager.numberOfTaskSlots"] = strconv.Itoa(int(taskSlots))
	}

	// Add custom Flink properties.
	for k, v := range flinkProperties {
		// Do not allow to override properties from real deployment.
		if _, ok := flinkSysProps[k]; ok {
			continue
		}
		flinkProps[k] = v
	}
	var configData = getLogConf(flinkCluster.Spec)
	configData["flink-conf.yaml"] = getFlinkProperties(flinkProps)
	configData["submit-job.sh"] = submitJobScript
	var configMap = &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:       clusterNamespace,
			Name:            configMapName,
			OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
			Labels:          labels,
		},
		Data: configData,
	}

	return configMap
}