func flinkClusterTaskInfo()

in pkg/flink/handler.go [193:229]


// flinkClusterTaskInfo assembles the TaskInfo reported for a Flink cluster.
//
// The result carries the task logs returned by FlinkClusterTaskLogs for the
// cluster's name and namespace, plus a FlinkExecutionInfo marshalled into
// CustomInfo. The execution info includes the job manager ingress URLs and
// the job ID only when the corresponding status components are non-nil.
//
// An error from collecting the logs or from marshalling the execution info is
// returned as-is together with a nil TaskInfo.
func flinkClusterTaskInfo(ctx context.Context, flinkCluster *flinkOp.FlinkCluster) (*pluginsCore.TaskInfo, error) {
	cfg := GetFlinkConfig()
	logsInput := FlinkTaskLogsInput{
		ClusterName: flinkCluster.Name,
		Namespace:   flinkCluster.Namespace,
	}

	clusterLogs, err := FlinkClusterTaskLogs(ctx, cfg, logsInput)
	if err != nil {
		return nil, err
	}

	// Append onto a nil slice: the returned logs never alias the slice handed
	// back by FlinkClusterTaskLogs, and stay nil when there are no entries.
	var logs []*core.TaskLog
	logs = append(logs, clusterLogs...)

	execInfo := &flinkIdl.FlinkExecutionInfo{}

	// Both components are optional in the cluster status; only report the
	// ones that are present.
	if ingress := flinkCluster.Status.Components.JobManagerIngress; ingress != nil {
		execInfo.JobManager = &flinkIdl.JobManagerExecutionInfo{
			IngressURLs: ingress.URLs,
		}
	}
	if clusterJob := flinkCluster.Status.Components.Job; clusterJob != nil {
		execInfo.Job = &flinkIdl.JobExecutionInfo{Id: clusterJob.ID}
	}

	custom := new(structpb.Struct)
	if err := utils.MarshalStruct(execInfo, custom); err != nil {
		return nil, err
	}

	result := &pluginsCore.TaskInfo{
		Logs:       logs,
		CustomInfo: custom,
	}
	return result, nil
}