in controllers/flinkcluster/flinkcluster_converter.go [690:756]
// newConfigMap assembles the ConfigMap holding the Flink configuration of the
// given cluster. Its data consists of the entries returned by getLogConf plus
// the rendered "flink-conf.yaml" and the "submit-job.sh" script. The object is
// created in the cluster's namespace, carries an owner reference to the
// cluster, and is labelled with the merged cluster and revision-hash labels.
func newConfigMap(flinkCluster *v1beta1.FlinkCluster) *corev1.ConfigMap {
	// The parse error is discarded; a nil result is handled when choosing the
	// memory settings below.
	appVersion, _ := version.NewVersion(flinkCluster.Spec.FlinkVersion)

	jm := flinkCluster.Spec.JobManager.Ports
	tm := flinkCluster.Spec.TaskManager.Ports
	name := getConfigMapName(flinkCluster.Name)
	labels := mergeLabels(
		getClusterLabels(flinkCluster),
		getRevisionHashLabels(&flinkCluster.Status.Revision))

	// Settings derived from the deployment itself (service name and ports).
	conf := map[string]string{
		"jobmanager.rpc.address": getJobManagerServiceName(flinkCluster.Name),
		"jobmanager.rpc.port":    strconv.FormatInt(int64(*jm.RPC), 10),
		"blob.server.port":       strconv.FormatInt(int64(*jm.Blob), 10),
		"query.server.port":      strconv.FormatInt(int64(*jm.Query), 10),
		"rest.port":              strconv.FormatInt(int64(*jm.UI), 10),
		"taskmanager.rpc.port":   strconv.FormatInt(int64(*tm.RPC), 10),
	}

	// copyIfSet transfers the listed keys from src into conf, skipping keys
	// whose value is empty (or absent).
	copyIfSet := func(src map[string]string, keys ...string) {
		for _, key := range keys {
			if value := src[key]; value != "" {
				conf[key] = value
			}
		}
	}

	// Versions at or above v10 use the process-memory settings; an unknown
	// (nil) version or anything below v10 falls back to the heap-size settings.
	// NOTE(review): v10 is declared elsewhere — presumably Flink 1.10; confirm.
	if appVersion != nil && !appVersion.LessThan(v10) {
		copyIfSet(calFlinkMemoryProcessSize(flinkCluster),
			"jobmanager.memory.process.size",
			"taskmanager.memory.process.size")
	} else {
		copyIfSet(calFlinkHeapSize(flinkCluster),
			"jobmanager.heap.size",
			"taskmanager.heap.size")
	}

	// The task-slot count is only written when it could be calculated.
	slots, err := calTaskManagerTaskSlots(flinkCluster)
	if err == nil {
		conf["taskmanager.numberOfTaskSlots"] = strconv.Itoa(int(slots))
	}

	// Merge user-supplied properties, except for keys listed in flinkSysProps,
	// which must not be overridden.
	for key, value := range flinkCluster.Spec.FlinkProperties {
		if _, reserved := flinkSysProps[key]; !reserved {
			conf[key] = value
		}
	}

	data := getLogConf(flinkCluster.Spec)
	data["flink-conf.yaml"] = getFlinkProperties(conf)
	data["submit-job.sh"] = submitJobScript

	return &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:       flinkCluster.Namespace,
			Name:            name,
			OwnerReferences: []metav1.OwnerReference{ToOwnerReference(flinkCluster)},
			Labels:          labels,
		},
		Data: data,
	}
}