controllers/flinkcluster/flinkcluster_util.go
/*
Copyright 2019 Google LLC.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flinkcluster
import (
"bytes"
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/spotify/flink-on-k8s-operator/internal/flink"
"github.com/spotify/flink-on-k8s-operator/internal/util"
"sigs.k8s.io/controller-runtime/pkg/client"
v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1"
"github.com/spotify/flink-on-k8s-operator/internal/controllers/history"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
)
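// Keys and labels used by the controller for user-control retries, revision
// tracking, job IDs, and savepoint retry timing.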
const (
ControlRetries = "retries"
ControlMaxRetries = "3"
RevisionNameLabel = "flinkoperator.k8s.io/revision-name"
JobIdLabel = "flinkoperator.k8s.io/job-id"
SavepointRetryIntervalSeconds = 10
)
var (
jobIdRegexp = regexp.MustCompile("JobID (.*)\n")
)
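// UpdateState represents the progress of a cluster update; JobSubmitState
// represents the state of a Flink job deployment.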
type UpdateState string
type JobSubmitState string
const (
UpdateStateNoUpdate UpdateState = "NoUpdate"
UpdateStatePreparing UpdateState = "Preparing"
UpdateStateInProgress UpdateState = "InProgress"
UpdateStateFinished UpdateState = "Finished"
JobDeployStateInProgress = "InProgress"
JobDeployStateSucceeded = "Succeeded"
JobDeployStateFailed = "Failed"
JobDeployStateUnknown = "Unknown"
)
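// objectForPatch defines the minimal object structure used to patch annotations.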
type objectForPatch struct {
Metadata objectMetaForPatch `json:"metadata"`
}
// objectMetaForPatch defines the object meta struct for patch operations
type objectMetaForPatch struct {
Annotations map[string]interface{} `json:"annotations"`
}
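// getFlinkAPIBaseURL builds the in-cluster JobManager REST API URL from the
// JobManager service name, namespace, cluster domain (the CLUSTER_DOMAIN env
// var, defaulting to "cluster.local"), and the configured UI port.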
func getFlinkAPIBaseURL(cluster *v1beta1.FlinkCluster) string {
clusterDomain := os.Getenv("CLUSTER_DOMAIN")
if clusterDomain == "" {
clusterDomain = "cluster.local"
}
return fmt.Sprintf(
"http://%s.%s.svc.%s:%d",
getJobManagerServiceName(cluster.Name),
cluster.Namespace,
clusterDomain,
*cluster.Spec.JobManager.Ports.UI)
}
// Gets ConfigMap name
func getConfigMapName(clusterName string) string {
return clusterName + "-configmap"
}
// Gets PodDisruptionBudget name
func getPodDisruptionBudgetName(clusterName string) string {
return "flink-" + clusterName
}
// Gets HorizontalPodAutoscaler name
func getHorizontalPodAutoscalerName(clusterName string) string {
return "flink-" + clusterName
}
// Gets JobManager StatefulSet name
func getJobManagerName(clusterName string) string {
return clusterName + "-jobmanager"
}
// Gets JobManager service name
func getJobManagerServiceName(clusterName string) string {
return clusterName + "-jobmanager"
}
// Gets JobManager ingress name
func getJobManagerIngressName(clusterName string) string {
return clusterName + "-jobmanager"
}
// Gets TaskManager StatefulSet name
func getTaskManagerName(clusterName string) string {
return clusterName + "-taskmanager"
}
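// Gets JobManager Job name (same as the JobManager StatefulSet name)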
func getJobManagerJobName(clusterName string) string {
return getJobManagerName(clusterName)
}
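// Gets job submitter Job name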
func getSubmitterJobName(clusterName string) string {
return clusterName + "-job-submitter"
}
// Checks whether it is possible to take a savepoint.
func canTakeSavepoint(cluster *v1beta1.FlinkCluster) bool {
var jobSpec = cluster.Spec.Job
var savepointStatus = cluster.Status.Savepoint
var job = cluster.Status.Components.Job
return jobSpec != nil && jobSpec.SavepointsDir != nil &&
!job.IsStopped() &&
(savepointStatus == nil || savepointStatus.State != v1beta1.SavepointStateInProgress)
}
// Checks if the job should be stopped because a job-cancel was requested
func shouldStopJob(cluster *v1beta1.FlinkCluster) bool {
var userControl = cluster.Annotations[v1beta1.ControlAnnotation]
var cancelRequested = cluster.Spec.Job.CancelRequested
return userControl == v1beta1.ControlNameJobCancel ||
(cancelRequested != nil && *cancelRequested)
}
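// getFromSavepoint returns the savepoint path passed to the submitter job via
// the --fromSavepoint argument, or an empty string if none was given.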
func getFromSavepoint(jobSpec batchv1.JobSpec) string {
var jobArgs = jobSpec.Template.Spec.Containers[0].Args
for i, arg := range jobArgs {
if arg == "--fromSavepoint" && i < len(jobArgs)-1 {
return jobArgs[i+1]
}
}
return ""
}
// newRevision generates a FlinkClusterSpec patch and creates a new child ControllerRevision resource from it.
func newRevision(cluster *v1beta1.FlinkCluster, revision int64, collisionCount *int32) (*appsv1.ControllerRevision, error) {
patch, err := newRevisionDataPatch(cluster)
if err != nil {
return nil, err
}
cr, err := history.NewControllerRevision(cluster,
controllerKind,
cluster.Labels,
runtime.RawExtension{Raw: patch},
revision,
collisionCount)
if err != nil {
return nil, err
}
if cr.Annotations == nil {
cr.Annotations = make(map[string]string)
}
for key, value := range cluster.Annotations {
cr.Annotations[key] = value
}
cr.SetNamespace(cluster.GetNamespace())
cr.GetLabels()[history.ControllerRevisionManagedByLabel] = cluster.GetName()
return cr, nil
}
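// newRevisionDataPatch serializes the cluster spec into a replace-style JSON
// patch, dropping fields that do not affect the rendered resources (cleanup
// policy, restart policy, cancel requests, savepoint generation).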
func newRevisionDataPatch(cluster *v1beta1.FlinkCluster) ([]byte, error) {
// Ignore fields that do not affect the rendered resources.
var c *v1beta1.FlinkCluster
if cluster.Spec.Job != nil {
c = cluster.DeepCopy()
c.Spec.Job.CleanupPolicy = nil
c.Spec.Job.RestartPolicy = nil
c.Spec.Job.CancelRequested = nil
c.Spec.Job.SavepointGeneration = 0
} else {
c = cluster
}
str := &bytes.Buffer{}
err := unstructured.UnstructuredJSONScheme.Encode(c, str)
if err != nil {
return nil, err
}
var raw map[string]interface{}
if err := json.Unmarshal(str.Bytes(), &raw); err != nil {
return nil, err
}
objCopy := make(map[string]interface{})
spec := raw["spec"].(map[string]interface{})
objCopy["spec"] = spec
spec["$patch"] = "replace"
// backward compatibility fix
if c.Spec.Job != nil {
job := spec["job"].(map[string]interface{})
job["restartPolicy"] = nil
}
patch, err := json.Marshal(objCopy)
return patch, err
}
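// getCurrentRevisionName strips the trailing hash suffix ("-<hash>") from the current revision name.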
func getCurrentRevisionName(r *v1beta1.RevisionStatus) string {
return r.CurrentRevision[:strings.LastIndex(r.CurrentRevision, "-")]
}
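// getNextRevisionName strips the trailing hash suffix ("-<hash>") from the next revision name.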
func getNextRevisionName(r *v1beta1.RevisionStatus) string {
return r.NextRevision[:strings.LastIndex(r.NextRevision, "-")]
}
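// getRetryCount returns the incremented retry count from the control data,
// starting at "1" when no count has been recorded yet.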
func getRetryCount(data map[string]string) (string, error) {
retries, ok := data["retries"]
if !ok {
return "1", nil
}
retryCount, err := strconv.Atoi(retries)
if err != nil {
return retries, err
}
return strconv.Itoa(retryCount + 1), nil
}
// getNewControlRequest returns the newly requested control, or an empty string if a control is already in progress.
func getNewControlRequest(cluster *v1beta1.FlinkCluster) string {
var userControl = cluster.Annotations[v1beta1.ControlAnnotation]
var recorded = cluster.Status
if recorded.Control == nil || recorded.Control.State != v1beta1.ControlStateInProgress {
return userControl
}
return ""
}
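// getControlStatus returns a new control status with the given name and state,
// stamped with the current time.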
func getControlStatus(controlName string, state string) *v1beta1.FlinkClusterControlStatus {
var controlStatus = new(v1beta1.FlinkClusterControlStatus)
controlStatus.Name = controlName
controlStatus.State = state
util.SetTimestamp(&controlStatus.UpdateTime)
return controlStatus
}
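// controlStatusChanged returns true when the requested control differs from the
// one recorded in the status; an empty control name never counts as a change.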
func controlStatusChanged(cluster *v1beta1.FlinkCluster, controlName string) bool {
if controlName == "" {
return false
}
var recorded = cluster.Status
if recorded.Control == nil || recorded.Control.Name != controlName {
return true
}
return false
}
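// getControlEvent maps a control status to the Kubernetes event type, reason, and message to record.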
func getControlEvent(status v1beta1.FlinkClusterControlStatus) (eventType string, eventReason string, eventMessage string) {
var msg = status.Message
if len(msg) > 100 {
msg = msg[:100] + "..."
}
switch status.State {
case v1beta1.ControlStateRequested:
eventType = corev1.EventTypeNormal
eventReason = "ControlRequested"
eventMessage = fmt.Sprintf("Requested new user control %v", status.Name)
case v1beta1.ControlStateInProgress:
eventType = corev1.EventTypeNormal
eventReason = "ControlInProgress"
eventMessage = fmt.Sprintf("In progress user control %v", status.Name)
case v1beta1.ControlStateSucceeded:
eventType = corev1.EventTypeNormal
eventReason = "ControlSucceeded"
eventMessage = fmt.Sprintf("Succesfully completed user control %v", status.Name)
case v1beta1.ControlStateFailed:
eventType = corev1.EventTypeWarning
eventReason = "ControlFailed"
if status.Message != "" {
eventMessage = fmt.Sprintf("User control %v failed: %v", status.Name, msg)
} else {
eventMessage = fmt.Sprintf("User control %v failed", status.Name)
}
}
return
}
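// getSavepointEvent maps a savepoint status to the Kubernetes event type, reason, and message to record.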
func getSavepointEvent(status v1beta1.SavepointStatus) (eventType string, eventReason string, eventMessage string) {
var msg = status.Message
if len(msg) > 100 {
msg = msg[:100] + "..."
}
var triggerReason = status.TriggerReason
if triggerReason == v1beta1.SavepointReasonJobCancel || triggerReason == v1beta1.SavepointReasonUpdate {
triggerReason = "for " + triggerReason
}
switch status.State {
case v1beta1.SavepointStateTriggerFailed:
eventType = corev1.EventTypeWarning
eventReason = "SavepointFailed"
eventMessage = fmt.Sprintf("Failed to trigger savepoint %v: %v", triggerReason, msg)
case v1beta1.SavepointStateInProgress:
eventType = corev1.EventTypeNormal
eventReason = "SavepointTriggered"
eventMessage = fmt.Sprintf("Triggered savepoint %v: triggerID %v.", triggerReason, status.TriggerID)
case v1beta1.SavepointStateSucceeded:
eventType = corev1.EventTypeNormal
eventReason = "SavepointCreated"
eventMessage = "Successfully savepoint created"
case v1beta1.SavepointStateFailed:
eventType = corev1.EventTypeWarning
eventReason = "SavepointFailed"
eventMessage = fmt.Sprintf("Savepoint creation failed: %v", msg)
}
return
}
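// isUserControlFinished returns true when the user control has either succeeded or failed.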
func isUserControlFinished(controlStatus *v1beta1.FlinkClusterControlStatus) bool {
return controlStatus.State == v1beta1.ControlStateSucceeded ||
controlStatus.State == v1beta1.ControlStateFailed
}
// Checks whether intervalSec seconds have elapsed since the given time.
func hasTimeElapsed(timeToCheckStr string, now time.Time, intervalSec int) bool {
tc := &util.TimeConverter{}
timeToCheck := tc.FromString(timeToCheckStr)
intervalPassedTime := timeToCheck.Add(time.Duration(int64(intervalSec) * int64(time.Second)))
return now.After(intervalPassedTime)
}
// isComponentUpdated checks whether a component has been updated to the next revision.
// If the component is observed and its label `flinkoperator.k8s.io/revision-name` matches
// the revision name derived from status.nextRevision, it is already updated.
// If the component is not observed but is required, it is not updated yet.
// If the component is not observed and is optional, it counts as updated only when it is
// not specified in the spec.
func isComponentUpdated(component client.Object, cluster *v1beta1.FlinkCluster) bool {
if !cluster.Status.Revision.IsUpdateTriggered() {
return true
}
switch o := component.(type) {
case *appsv1.Deployment:
if o == nil {
return false
}
case *appsv1.StatefulSet:
if o == nil {
return false
}
case *corev1.ConfigMap:
if o == nil {
return false
}
case *policyv1.PodDisruptionBudget:
if o == nil {
return false
}
case *corev1.Service:
if o == nil {
return false
}
case *batchv1.Job:
if o == nil {
return cluster.Spec.Job == nil
}
case *networkingv1.Ingress:
if o == nil {
jm := cluster.Spec.JobManager
return jm == nil || jm.Ingress == nil
}
case *autoscalingv2.HorizontalPodAutoscaler:
if o == nil {
return false
}
}
labels := component.GetLabels()
nextRevisionName := getNextRevisionName(&cluster.Status.Revision)
return labels[RevisionNameLabel] == nextRevisionName
}
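// areComponentsUpdated returns true only if every given component is updated to the next revision.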
func areComponentsUpdated(components []client.Object, cluster *v1beta1.FlinkCluster) bool {
for _, c := range components {
if !isComponentUpdated(c, cluster) {
return false
}
}
return true
}
// isClusterUpdateToDate checks whether all cluster components have been updated to the next revision.
func isClusterUpdateToDate(observed *ObservedClusterState) bool {
if !observed.cluster.Status.Revision.IsUpdateTriggered() {
return true
}
components := []client.Object{
observed.configMap,
observed.tmService,
observed.jmService,
}
if !IsApplicationModeCluster(observed.cluster) {
components = append(components, observed.jmStatefulSet)
}
if observed.cluster.Spec.PodDisruptionBudget != nil {
components = append(components, observed.podDisruptionBudget)
}
if observed.cluster.Spec.TaskManager.HorizontalPodAutoscaler != nil {
components = append(components, observed.horizontalPodAutoscaler)
}
switch observed.cluster.Spec.TaskManager.DeploymentType {
case v1beta1.DeploymentTypeDeployment:
components = append(components, observed.tmDeployment)
case v1beta1.DeploymentTypeStatefulSet:
components = append(components, observed.tmStatefulSet)
}
return areComponentsUpdated(components, observed.cluster)
}
// isFlinkAPIReady checks whether the cluster is ready to accept job submissions.
func isFlinkAPIReady(list *flink.JobsOverview) bool {
// If the observed Flink job list is not nil (it may be an empty list),
// the Flink REST API server is up and running. That is the source of
// truth for whether we can submit a job.
return list != nil
}
// finalSavepointRequested returns true if a savepoint was requested to preserve the job state
// (for an update or job-cancel) so that the job can be resumed later.
func finalSavepointRequested(jobID string, s *v1beta1.SavepointStatus) bool {
return s != nil && s.JobID == jobID &&
(s.TriggerReason == v1beta1.SavepointReasonUpdate ||
s.TriggerReason == v1beta1.SavepointReasonJobCancel)
}
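// getUpdateState derives the current update phase: NoUpdate when no update has been
// triggered, Preparing until the job is ready to be updated, InProgress until all
// components are at the next revision, then Finished.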
func getUpdateState(observed *ObservedClusterState) UpdateState {
if observed.cluster == nil {
return UpdateStateNoUpdate
}
clusterStatus := observed.cluster.Status
if !clusterStatus.Revision.IsUpdateTriggered() {
return UpdateStateNoUpdate
}
jobStatus := clusterStatus.Components.Job
switch {
case !isScaleUpdate(observed.revisions, observed.cluster) &&
!jobStatus.UpdateReady(observed.cluster.Spec.Job, observed.observeTime):
return UpdateStatePreparing
case !isClusterUpdateToDate(observed):
return UpdateStateInProgress
}
return UpdateStateFinished
}
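// revisionDiff sorts the revisions and diffs the specs of the two most recent ones,
// returning the top-level fields that changed.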
func revisionDiff(revisions []*appsv1.ControllerRevision) map[string]util.DiffValue {
if len(revisions) < 2 {
return map[string]util.DiffValue{}
}
patchSpec := func(data []byte) map[string]any {
var raw map[string]any
json.Unmarshal(data, &raw)
return raw["spec"].(map[string]any)
}
history.SortControllerRevisions(revisions)
a, b := revisions[len(revisions)-2], revisions[len(revisions)-1]
aSpec := patchSpec(a.Data.Raw)
bSpec := patchSpec(b.Data.Raw)
return util.MapDiff(aSpec, bSpec)
}
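// isScaleUpdate returns true when the only change between the last two revisions is the
// taskManager replica count. Clusters without a job spec never count as scale updates.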
func isScaleUpdate(revisions []*appsv1.ControllerRevision, cluster *v1beta1.FlinkCluster) bool {
if cluster != nil && cluster.Spec.Job == nil {
return false
}
diff := revisionDiff(revisions)
tmDiff, ok := diff["taskManager"]
if len(diff) != 1 || !ok {
return false
}
left := tmDiff.Left.(map[string]any)["replicas"]
right := tmDiff.Right.(map[string]any)["replicas"]
return left != right
}
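// shouldUpdateJob returns true when an update is in progress and it is not a TaskManager-only scale change.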
func shouldUpdateJob(observed *ObservedClusterState) bool {
return observed.updateState == UpdateStateInProgress && !isScaleUpdate(observed.revisions, observed.cluster)
}
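// shouldUpdateCluster returns true when an update is in progress and the Flink job is no longer active.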
func shouldUpdateCluster(observed *ObservedClusterState) bool {
var job = observed.cluster.Status.Components.Job
return !job.IsActive() && observed.updateState == UpdateStateInProgress
}
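// shouldRecreateOnUpdate returns true when recreateOnUpdate is enabled and the update is not a pure scale change.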
func shouldRecreateOnUpdate(observed *ObservedClusterState) bool {
ru := observed.cluster.Spec.RecreateOnUpdate
return ru != nil && *ru && !isScaleUpdate(observed.revisions, observed.cluster)
}
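// getFlinkJobDeploymentState maps a Flink REST API job state string to the corresponding v1beta1 job state.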
func getFlinkJobDeploymentState(flinkJobState string) v1beta1.JobState {
switch flinkJobState {
case "INITIALIZING", "CREATED", "RUNNING", "FAILING", "CANCELLING", "RESTARTING", "RECONCILING", "SUSPENDED":
return v1beta1.JobStateRunning
case "FINISHED":
return v1beta1.JobStateSucceeded
case "CANCELED":
return v1beta1.JobStateCancelled
case "FAILED":
return v1beta1.JobStateFailed
default:
return v1beta1.JobStateUnknown
}
}
// getFlinkJobSubmitLog extracts logs from the job submitter pod.
func getFlinkJobSubmitLog(clientset *kubernetes.Clientset, observedPod *corev1.Pod) (*SubmitterLog, error) {
log, err := util.GetPodLogs(clientset, observedPod)
if err != nil {
return nil, err
}
return getFlinkJobSubmitLogFromString(log), nil
}
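// getFlinkJobSubmitLogFromString extracts the Flink job ID from the submitter pod log, if present.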
func getFlinkJobSubmitLogFromString(podLog string) *SubmitterLog {
if result := jobIdRegexp.FindStringSubmatch(podLog); len(result) > 1 {
return &SubmitterLog{jobID: result[1], message: podLog}
} else {
return &SubmitterLog{jobID: "", message: podLog}
}
}
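// IsApplicationModeCluster returns true when the cluster's job is configured to run in application mode.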
func IsApplicationModeCluster(cluster *v1beta1.FlinkCluster) bool {
jobSpec := cluster.Spec.Job
return jobSpec != nil && jobSpec.Mode != nil && *jobSpec.Mode == v1beta1.JobModeApplication
}
// wasJobCancelRequested checks whether a job-cancel control was requested.
func wasJobCancelRequested(controlStatus *v1beta1.FlinkClusterControlStatus) bool {
return controlStatus != nil && controlStatus.Name == v1beta1.ControlNameJobCancel
}
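// GenJobId returns the job ID recorded in the status if present; otherwise it derives a
// deterministic ID from the MD5 hash of the next revision name.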
func GenJobId(cluster *v1beta1.FlinkCluster) (string, error) {
if cluster != nil && cluster.Status.Components.Job != nil && cluster.Status.Components.Job.ID != "" {
return cluster.Status.Components.Job.ID, nil
}
if cluster == nil || len(cluster.Status.Revision.NextRevision) == 0 {
return "", fmt.Errorf("error generating job id: cluster or next revision is nil")
}
hash := md5.Sum([]byte(cluster.Status.Revision.NextRevision))
return hex.EncodeToString(hash[:]), nil
}
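// isJobInitialising returns true while the submitter Job has no active, succeeded, or failed pods yet.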
func isJobInitialising(jobStatus batchv1.JobStatus) bool {
return jobStatus.Active == 0 && jobStatus.Succeeded == 0 && jobStatus.Failed == 0
}