scheduler/client/cli/run_job.go (121 lines of code) (raw):
package cli
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"strings"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/twitter/scoot/common/client"
"github.com/twitter/scoot/scheduler/api/thrift/gen-go/scoot"
)
// runJobCmd holds the parsed flag values for the "run_job" subcommand.
// Each field is bound to a flag in RegisterFlags.
type runJobCmd struct {
	// streamName is --stream_name: used to expand a bare-SHA snapshot_id
	// into a stream-based snapshot id.
	streamName string

	// snapshotId is --snapshot_id: the checkout the args-based task runs in.
	snapshotId string

	// jobFilePath is --job_def: path to a JSON job definition (CLIJobDef).
	jobFilePath string

	// tag is --tag: requestor-supplied tag used to trace the job in logs.
	tag string
}
// RegisterFlags builds the "run_job" cobra command and binds its flags to
// the fields of c.
//
// The job is supplied either as positional command args or via --job_def,
// not both; Run rejects the combination. With --job_def, each task carries
// its own SnapshotID, so --snapshot_id and --stream_name are not consulted.
func (c *runJobCmd) RegisterFlags() *cobra.Command {
	r := &cobra.Command{
		Use:   "run_job",
		Short: "RunJob",
	}
	flags := r.Flags()
	flags.StringVar(&c.streamName, "stream_name", "sm", "If passing snapshot_id=SHA, this is the global UID for the associated repo.")
	flags.StringVar(&c.snapshotId, "snapshot_id", "", "Repo checkout id: <master-sha> OR <backend>-<kind>(-<additional information>)+")
	// The help text previously claimed combining with snapshot_id was an
	// error; the actual conflict Run checks for is with positional args.
	flags.StringVar(&c.jobFilePath, "job_def", "", "JSON file to read jobs from. Error if command args are also provided; snapshot_id and stream_name are ignored.")
	flags.StringVar(&c.tag, "tag", "", "Tag can be specified by requestor in order to more easily trace a job through logs")
	return r
}
// CLIJobDef is the shape of a --job_def JSON file. It is decoded with
// encoding/json (https://golang.org/pkg/encoding/json/#Unmarshal), which
// matches JSON object keys to these field names case-insensitively, and is
// then translated into a scoot.JobDefinition.
type CLIJobDef struct {
	// Tasks are the individual tasks that make up the job.
	Tasks []TaskDef

	// DefaultTaskTimeoutMs applies to tasks that don't set TimeoutMs.
	DefaultTaskTimeoutMs int32
	Priority             int32

	// Tag, Basis, JobType and Requestor are passed through to the
	// scheduler's job definition.
	Tag       string
	Basis     string
	JobType   string
	Requestor string
}

// TaskDef is one task entry inside a CLIJobDef.
type TaskDef struct {
	// Args is the command's argv.
	Args []string
	// EnvVars are environment variables set for the command.
	EnvVars map[string]string
	// SnapshotID is the checkout this task runs against.
	SnapshotID string
	// TimeoutMs is a per-task timeout.
	TimeoutMs int32
	// TaskID identifies the task within the job.
	TaskID string
}
// Run submits a job to Scoot and prints the new job's ID to stdout.
//
// The job comes from exactly one of two sources:
//   - positional args: a single task ("task1") whose argv is args, run
//     against --snapshot_id (a bare SHA is expanded to a stream-based id);
//   - --job_def: a JSON file decoded into CLIJobDef.
//
// Supplying both, or neither, returns an error without contacting the
// scheduler. An *scoot.InvalidRequest from RunJob is reported with its
// message; any other RunJob failure is reported with its value and type.
func (c *runJobCmd) Run(cl *client.SimpleClient, cmd *cobra.Command, args []string) error {
	log.Info("Running on scoot, args:", args)
	jobDef := scoot.NewJobDefinition()
	jobDef.Tag = &c.tag

	switch {
	case len(args) > 0 && c.jobFilePath != "":
		return errors.New("You must provide either args or a job definition, not both")
	case len(args) > 0:
		c.addArgsTask(jobDef, args)
	case c.jobFilePath != "":
		if err := c.loadJobFile(jobDef); err != nil {
			return err
		}
	default:
		// Nothing to run: reject instead of submitting a job with no tasks.
		return errors.New("You must provide either args or a job definition")
	}

	jobId, err := cl.ScootClient.RunJob(jobDef)
	if err != nil {
		switch err := err.(type) {
		case *scoot.InvalidRequest:
			return fmt.Errorf("Invalid Request: %v", err.GetMessage())
		default:
			return fmt.Errorf("Error running job: %v %T", err, err)
		}
	}
	fmt.Println(jobId.ID) // must go to std out in case caller looking in stdout for the results
	log.Infof("JobID: %s", jobId.ID)
	return nil
}

// addArgsTask sets jobDef.Tasks to a single task, "task1", whose argv is args.
//
// An empty --snapshot_id is passed through as-is, and the command then runs in
// an empty tmpdir. A snapshot_id containing no "-" is treated as a bare SHA.
// It is rewritten on c to "stream-gc-<stream_name>-<sha>".
func (c *runJobCmd) addArgsTask(jobDef *scoot.JobDefinition, args []string) {
	if c.snapshotId == "" {
		log.Info("No snapshotID provided - cmd will be run in an empty tmpdir.")
	} else if !strings.Contains(c.snapshotId, "-") {
		// Not a bundleID; assume it's a sha that's available upstream. Cf. snapshot/git/gitdb/README.md
		streamId := fmt.Sprintf("stream-gc-%s-%s", c.streamName, c.snapshotId)
		log.Infof("Converting sha to a stream-based snapshot_id: %s -> %s", c.snapshotId, streamId)
		c.snapshotId = streamId
	}
	task := scoot.NewTaskDefinition()
	taskId := "task1"
	task.Command = scoot.NewCommand()
	task.Command.Argv = args
	task.SnapshotId = &c.snapshotId
	task.TaskId = &taskId
	jobDef.Tasks = []*scoot.TaskDefinition{task}
}

// loadJobFile reads the JSON job definition at c.jobFilePath and copies it
// into jobDef.
//
// The JSON Tag overrides --tag only when it is non-empty, so --tag still
// applies to job files that omit a tag. DefaultTaskTimeoutMs and per-task
// TimeoutMs are set only when positive. Basis, JobType, Requestor and
// Priority are copied as-is, zero values included.
func (c *runJobCmd) loadJobFile(jobDef *scoot.JobDefinition) error {
	f, err := os.Open(c.jobFilePath)
	if err != nil {
		return err
	}
	// Read-only file: a Close error carries nothing actionable here.
	defer f.Close()

	asBytes, err := ioutil.ReadAll(f)
	if err != nil {
		return err
	}
	var jsonJob CLIJobDef
	if err := json.Unmarshal(asBytes, &jsonJob); err != nil {
		return fmt.Errorf("parsing job definition %s: %v", c.jobFilePath, err)
	}

	if jsonJob.DefaultTaskTimeoutMs > 0 {
		jobDef.DefaultTaskTimeoutMs = &jsonJob.DefaultTaskTimeoutMs
	}
	if jsonJob.Tag != "" {
		jobDef.Tag = &jsonJob.Tag
	}
	jobDef.Basis = &jsonJob.Basis
	jobDef.JobType = &jsonJob.JobType
	jobDef.Requestor = &jsonJob.Requestor
	jobDef.Priority = &jsonJob.Priority

	jobDef.Tasks = make([]*scoot.TaskDefinition, 0, len(jsonJob.Tasks))
	for i := range jsonJob.Tasks {
		// Point at the slice element so each task's pointer fields refer
		// to that task's own values.
		jt := &jsonJob.Tasks[i]
		taskDef := scoot.NewTaskDefinition()
		taskDef.Command = scoot.NewCommand()
		taskDef.Command.Argv = jt.Args
		taskDef.Command.EnvVars = make(map[string]string, len(jt.EnvVars))
		for k, v := range jt.EnvVars {
			taskDef.Command.EnvVars[k] = v
		}
		taskDef.SnapshotId = &jt.SnapshotID
		taskDef.TaskId = &jt.TaskID
		if jt.TimeoutMs > 0 {
			taskDef.TimeoutMs = &jt.TimeoutMs
		}
		jobDef.Tasks = append(jobDef.Tasks, taskDef)
	}
	return nil
}