in controllers/flinkcluster/flinkcluster_reconciler.go [799:830]
func (reconciler *ClusterReconciler) triggerSavepoint(
ctx context.Context,
jobID string,
triggerReason v1beta1.SavepointReason,
cancel bool) (*v1beta1.SavepointStatus, error) {
log := logr.FromContextOrDiscard(ctx)
var cluster = reconciler.observed.cluster
var apiBaseURL = getFlinkAPIBaseURL(reconciler.observed.cluster)
var triggerSuccess bool
var savepointTriggerID *flink.SavepointTriggerID
var triggerID string
var message string
var err error
log.Info(fmt.Sprintf("Trigger savepoint for %s", triggerReason), "jobID", jobID)
savepointTriggerID, err = reconciler.flinkClient.TriggerSavepoint(apiBaseURL, jobID, *cluster.Spec.Job.SavepointsDir, cancel)
if err != nil {
// limit message size to 1KiB
if message = err.Error(); len(message) > 1024 {
message = message[:1024] + "..."
}
triggerSuccess = false
log.Info("Failed to trigger savepoint", "jobID", jobID, "triggerID", triggerID, "error", err)
} else {
triggerSuccess = true
triggerID = savepointTriggerID.RequestID
log.Info("Successfully savepoint triggered", "jobID", jobID, "triggerID", triggerID)
}
newSavepointStatus := reconciler.getNewSavepointStatus(triggerID, triggerReason, message, triggerSuccess)
return newSavepointStatus, err
}