func()

in controllers/flinkcluster/flinkcluster_reconciler.go [799:830]


func (reconciler *ClusterReconciler) triggerSavepoint(
	ctx context.Context,
	jobID string,
	triggerReason v1beta1.SavepointReason,
	cancel bool) (*v1beta1.SavepointStatus, error) {
	log := logr.FromContextOrDiscard(ctx)
	var cluster = reconciler.observed.cluster
	var apiBaseURL = getFlinkAPIBaseURL(reconciler.observed.cluster)
	var triggerSuccess bool
	var savepointTriggerID *flink.SavepointTriggerID
	var triggerID string
	var message string
	var err error

	log.Info(fmt.Sprintf("Trigger savepoint for %s", triggerReason), "jobID", jobID)
	savepointTriggerID, err = reconciler.flinkClient.TriggerSavepoint(apiBaseURL, jobID, *cluster.Spec.Job.SavepointsDir, cancel)
	if err != nil {
		// limit message size to 1KiB
		if message = err.Error(); len(message) > 1024 {
			message = message[:1024] + "..."
		}
		triggerSuccess = false
		log.Info("Failed to trigger savepoint", "jobID", jobID, "triggerID", triggerID, "error", err)
	} else {
		triggerSuccess = true
		triggerID = savepointTriggerID.RequestID
		log.Info("Successfully savepoint triggered", "jobID", jobID, "triggerID", triggerID)
	}
	newSavepointStatus := reconciler.getNewSavepointStatus(triggerID, triggerReason, message, triggerSuccess)

	return newSavepointStatus, err
}