in pkg/flink/handler.go [193:229]
func flinkClusterTaskInfo(ctx context.Context, flinkCluster *flinkOp.FlinkCluster) (*pluginsCore.TaskInfo, error) {
var taskLogs []*core.TaskLog
tl, err := FlinkClusterTaskLogs(ctx, GetFlinkConfig(), FlinkTaskLogsInput{
ClusterName: flinkCluster.Name,
Namespace: flinkCluster.Namespace,
})
if err != nil {
return nil, err
}
taskLogs = append(taskLogs, tl...)
info := flinkIdl.FlinkExecutionInfo{}
components := flinkCluster.Status.Components
if jmi := components.JobManagerIngress; jmi != nil {
info.JobManager = &flinkIdl.JobManagerExecutionInfo{
IngressURLs: jmi.URLs,
}
}
if job := components.Job; job != nil {
info.Job = &flinkIdl.JobExecutionInfo{Id: job.ID}
}
customInfo := &structpb.Struct{}
err = utils.MarshalStruct(&info, customInfo)
if err != nil {
return nil, err
}
return &pluginsCore.TaskInfo{
Logs: taskLogs,
CustomInfo: customInfo,
}, nil
}