in controllers/flinkcluster/flinkcluster_reconciler.go [406:566]
// reconcileJob drives the observed Flink job toward the desired spec. It handles,
// in order: a user-requested forced cancel, deploying a new job submitter (fresh
// job, update, or restart), managing an active job (waiting on deployment,
// suspending/cancelling for an update, triggering savepoints), and tearing down
// the job plus its submitter once the job is finished or removed from the spec.
// Savepoint and control status changes are accumulated in locals and flushed by
// the deferred updateStatus call.
func (reconciler *ClusterReconciler) reconcileJob(ctx context.Context) (ctrl.Result, error) {
	log := logr.FromContextOrDiscard(ctx)
	var desiredJob = reconciler.desired.Job
	var observed = reconciler.observed
	var recorded = observed.cluster.Status
	var jobSpec = observed.cluster.Spec.Job
	var job = recorded.Components.Job
	var err error
	var jobID = reconciler.getFlinkJobID()
	// Update status changed via job reconciliation.
	// NOTE: pointers to the locals are captured at defer time, so assignments to
	// newSavepointStatus/newControlStatus anywhere below are visible to updateStatus.
	var newSavepointStatus *v1beta1.SavepointStatus
	var newControlStatus *v1beta1.FlinkClusterControlStatus
	defer reconciler.updateStatus(ctx, &newSavepointStatus, &newControlStatus)
	observedSubmitter := observed.flinkJobSubmitter.job
	// A desired job whose recorded state is terminal (per the spec's restart policy)
	// needs no reconciliation.
	if desiredJob != nil && job.IsTerminated(jobSpec) {
		return ctrl.Result{}, nil
	}
	// Forced teardown path: a job-cancel control was requested by the user.
	if wasJobCancelRequested(observed.cluster.Status.Control) {
		log.Info("Force tearing down the job")
		userControl := getNewControlRequest(observed.cluster)
		if userControl == v1beta1.ControlNameJobCancel {
			newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress)
		}
		// cancel all running jobs
		// ResourceExpired is tolerated — presumably the job is already gone; verify
		// against cancelRunningJobs' error contract.
		if job.IsActive() {
			if err := reconciler.cancelRunningJobs(ctx, true /* takeSavepoint */); err != nil && !errors.IsResourceExpired(err) {
				return requeueResult, err
			}
		}
		// kill job submitter pod
		if observedSubmitter != nil {
			if err := reconciler.deleteJob(ctx, observedSubmitter); err != nil {
				return requeueResult, err
			}
		}
	}
	// Create new Flink job submitter when starting new job, updating job or restarting job in failure.
	if desiredJob != nil && !job.IsActive() {
		log.Info("Deploying Flink job")
		// TODO: Record event or introduce Condition in CRD status to notify update state pended.
		// https://github.com/kubernetes/apimachinery/blob/57f2a0733447cfd41294477d833cce6580faaca3/pkg/apis/meta/v1/types.go#L1376
		var unexpectedJobs = observed.flinkJob.unexpected
		if len(unexpectedJobs) > 0 {
			// This is an exceptional situation.
			// There should be no jobs because all jobs are terminated in the previous iterations.
			// In this case user should identify the problem so that the job is not executed multiple times unintentionally
			// cause of Flink error, Flink operator error or other unknown error.
			// If user want to proceed, unexpected jobs should be terminated.
			log.Error(errors.NewInternalError(fmt.Errorf("unexpected jobs found")), "Failed to create job submitter", "unexpected jobs", unexpectedJobs)
			return ctrl.Result{}, nil
		}
		// Create Flink job submitter
		log.Info("Updating job status to proceed creating new job submitter")
		// Job status must be updated before creating a job submitter to ensure the observed job is the job submitted by the operator.
		err = reconciler.updateJobDeployStatus(ctx)
		if err != nil {
			log.Info("Failed to update the job status for job submission")
			return requeueResult, err
		}
		// An existing submitter must be dealt with before a new one can be created:
		// delete it if it belongs to the current revision (stale) or has failed;
		// otherwise wait for it to become active or fail.
		cr := getCurrentRevisionName(&observed.cluster.Status.Revision)
		if observedSubmitter != nil {
			if observedSubmitter.Labels[RevisionNameLabel] == cr {
				log.Info("Found old job submitter")
				err = reconciler.deleteJob(ctx, observedSubmitter)
				if err != nil {
					return requeueResult, err
				}
			} else if observedSubmitter.Status.Failed >= 1 {
				log.Info("Found failed job submitter")
				err = reconciler.deleteJob(ctx, observedSubmitter)
				if err != nil {
					return requeueResult, err
				}
			} else {
				log.Info("Found job submitter, wait for it to be active or failed")
				return requeueResult, nil
			}
		} else {
			err = reconciler.createJob(ctx, desiredJob)
		}
		// Always requeue: either the submitter was just created/deleted and the next
		// iteration observes the result, or err carries the failure.
		return requeueResult, err
	}
	// Active job path: wait out deployment, prepare updates, or take savepoints.
	if desiredJob != nil && job.IsActive() {
		if job.State == v1beta1.JobStateDeploying {
			log.Info("Job submitter is deployed, wait until completed")
			return requeueResult, nil
		}
		// Suspend or stop job to proceed update.
		if recorded.Revision.IsUpdateTriggered() && !isScaleUpdate(observed.revisions, observed.cluster) {
			log.Info("Preparing job update")
			// TakeSavepointOnUpdate defaults to true when unset; suspend (savepoint +
			// stop) only when no explicit FromSavepoint overrides the restore source.
			var takeSavepoint = jobSpec.TakeSavepointOnUpdate == nil || *jobSpec.TakeSavepointOnUpdate
			var shouldSuspend = takeSavepoint && util.IsBlank(jobSpec.FromSavepoint)
			if shouldSuspend {
				newSavepointStatus, err = reconciler.trySuspendJob(ctx)
			} else if shouldUpdateJob(&observed) {
				err = reconciler.cancelJob(ctx)
			}
			return requeueResult, err
		}
		// Trigger savepoint if required.
		if len(jobID) > 0 {
			var savepointReason = reconciler.shouldTakeSavepoint()
			if savepointReason != "" {
				newSavepointStatus, err = reconciler.triggerSavepoint(ctx, jobID, savepointReason, false)
			}
			// Get new control status when the savepoint reason matches the requested control.
			var userControl = getNewControlRequest(observed.cluster)
			if userControl == v1beta1.ControlNameSavepoint && savepointReason == v1beta1.SavepointReasonUserRequested {
				newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress)
			}
			return requeueResult, err
		}
		log.Info("Job is not finished yet, no action", "jobID", jobID)
		return requeueResult, nil
	}
	// Job finished. Stop Flink job and kill job-submitter.
	if desiredJob == nil && (!job.IsStopped() || observedSubmitter != nil) {
		if job.IsActive() {
			userControl := getNewControlRequest(observed.cluster)
			if userControl == v1beta1.ControlNameJobCancel {
				newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress)
			}
			log.Info("Stopping job", "jobID", jobID)
			if err := reconciler.cancelRunningJobs(ctx, true /* takeSavepoint */); err != nil {
				return requeueResult, err
			}
		} else if job.IsStopped() && observedSubmitter != nil {
			// -1 appears to be the "not yet exited" sentinel for the submitter exit
			// code — TODO confirm where SubmitterExitCode is set.
			if observed.cluster.Status.Components.Job.SubmitterExitCode == -1 {
				log.Info("Job submitter has not finished yet")
				// NOTE(review): err is always nil here; equivalent to returning nil.
				return requeueResult, err
			}
			if err := reconciler.deleteJob(ctx, observedSubmitter); err != nil {
				return requeueResult, err
			}
		}
		// to make sure the job is stopped
		return requeueResult, nil
	}
	if job.IsStopped() {
		log.Info("Job has finished, no action")
	}
	return ctrl.Result{}, nil
}