func()

in runner/runners/invoke.go [74:423]


func (inv *Invoker) run(cmd *runner.Command, id runner.RunID, abortCh chan struct{}, memCh chan execer.ProcessStatus, updateCh chan runner.RunStatus) (r runner.RunStatus) {
	log.WithFields(
		log.Fields{
			"runID":  id,
			"tag":    cmd.Tag,
			"jobID":  cmd.JobID,
			"taskID": cmd.TaskID,
		}).Info("*Invoker.run()")
	inv.stat.Gauge(stats.WorkerRunningTask).Update(1)
	defer inv.stat.Gauge(stats.WorkerRunningTask).Update(0)

	taskTimer := inv.stat.Latency(stats.WorkerTaskLatency_ms).Time()

	defer func() {
		taskTimer.Stop()
		updateCh <- r
		close(updateCh)
	}()

	start := time.Now()

	// Records various stages of the run
	// TODO opporunity for consolidation with existing timers and metrics as part of larger refactor
	rts := &runTimes{}
	rts.invokeStart = stamp()

	var co snapshot.Checkout
	checkoutCh := make(chan error)

	// set up pre/postprocessors
	for _, pp := range inv.preprocessors {
		log.Info("running preprocessor")
		if err := pp(); err != nil {
			log.Errorf("Error running preprocessor %s", err)
		}
	}
	defer func() {
		for _, pp := range inv.postprocessors {
			log.Infof("running postprocessor")
			if err := pp(); err != nil {
				log.Errorf("Error running postprocessor %s", err)
			}
		}
	}()

	// Determine RunType from Command SnapshotID
	// This invoker supports RunTypeScoot
	var runType runner.RunType = runner.RunTypeScoot
	if _, ok := inv.filerMap[runType]; !ok {
		return runner.FailedStatus(id,
			errors.NewError(fmt.Errorf("Invoker does not have filer for command of RunType: %s", runType), errors.PreProcessingFailureExitCode),
			tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
	}

	rts.inputStart = stamp()

	// if we are checking out a snapshot, start the timer outside of go routine
	var downloadTimer stats.Latency
	if cmd.SnapshotID != "" {
		downloadTimer = inv.stat.Latency(stats.WorkerDownloadLatency_ms).Time()
		inv.stat.Counter(stats.WorkerDownloads).Inc(1)
	}

	// update local workspace with snapshot
	go func() {
		if cmd.SnapshotID == "" {
			// TODO: we don't want this logic to live here, these decisions should be made at a higher level.
			if len(cmd.Argv) > 0 && cmd.Argv[0] != execers.UseSimExecerArg {
				log.WithFields(
					log.Fields{
						"runID":  id,
						"tag":    cmd.Tag,
						"jobID":  cmd.JobID,
						"taskID": cmd.TaskID,
					}).Info("No snapshotID! Using a nop-checkout initialized with tmpDir")
			}
			if tmp, err := ioutil.TempDir("", "invoke_nop_checkout"); err != nil {
				checkoutCh <- err
			} else {
				co = snapshot.NewNopCheckout(string(id), tmp)
				checkoutCh <- nil
			}
		} else {
			log.WithFields(
				log.Fields{
					"runID":      id,
					"tag":        cmd.Tag,
					"jobID":      cmd.JobID,
					"taskID":     cmd.TaskID,
					"snapshotID": cmd.SnapshotID,
				}).Info("Checking out snapshotID")
			var err error
			co, err = inv.filerMap[runType].Filer.Checkout(cmd.SnapshotID)
			checkoutCh <- err
		}
	}()

	// wait for checkout to finish (or abort signal)
	select {
	case <-abortCh:
		if err := inv.filerMap[runType].Filer.CancelCheckout(); err != nil {
			log.Errorf("Error canceling checkout: %s", err)
		}
		if err := <-checkoutCh; err != nil {
			log.Errorf("Checkout errored: %s", err)
			// If there was an error there should be no lingering gitdb locks, so return
			// In addition, co should be nil, so failing to return and calling co.Release()
			// will result in a nil pointer dereference
		} else {
			// If there was no error then we need to release this checkout.
			co.Release()
		}
		return runner.AbortStatus(id,
			tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
	case err := <-checkoutCh:
		// stop the timer
		// note: aborted runs don't stop the timer - the reported download time should remain 0
		// successful and erroring downloads will report time values
		if cmd.SnapshotID != "" {
			downloadTimer.Stop()
		}
		if err != nil {
			var failedStatus runner.RunStatus
			codeErr, ok := err.(*errors.ExitCodeError)
			switch ok {
			case true:
				// err is of type github.com/twitter/scoot/common/errors.Error
				failedStatus = runner.FailedStatus(id, codeErr,
					tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
			default:
				// err is not of type github.com/twitter/scoot/common/errors.Error
				failedStatus = runner.FailedStatus(id, errors.NewError(err, errors.GenericCheckoutFailureExitCode),
					tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
			}

			return failedStatus
		}
		// Checkout is ok, continue with run and when finished release checkout.
		defer co.Release()
		rts.inputEnd = stamp()
	}
	log.WithFields(
		log.Fields{
			"runID":    id,
			"tag":      cmd.Tag,
			"jobID":    cmd.JobID,
			"taskID":   cmd.TaskID,
			"checkout": co.Path(),
		}).Info("Checkout done")

	// setup stdout,stderr output
	stdout, err := inv.output.Create(fmt.Sprintf("%s-stdout", id))
	if err != nil {
		msg := fmt.Sprintf("could not create stdout: %s", err)
		failedStatus := runner.FailedStatus(id, errors.NewError(e.New(msg), errors.LogRefCreationFailureExitCode),
			tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
		return failedStatus
	}
	defer stdout.Close()

	stderr, err := inv.output.Create(fmt.Sprintf("%s-stderr", id))
	if err != nil {
		msg := fmt.Sprintf("could not create stderr: %s", err)
		failedStatus := runner.FailedStatus(id, errors.NewError(e.New(msg), errors.LogRefCreationFailureExitCode),
			tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
		return failedStatus
	}
	defer stderr.Close()

	stdlog, err := inv.output.Create(fmt.Sprintf("%s-stdlog", id))
	if err != nil {
		msg := fmt.Sprintf("could not create combined stdout/stderr: %s", err)
		failedStatus := runner.FailedStatus(id, errors.NewError(e.New(msg), errors.LogRefCreationFailureExitCode),
			tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
		return failedStatus
	}
	defer stdlog.Close()

	marker := "###########################################\n###########################################\n"
	format := "%s\n\nDate: %v\nOut: %s\tErr: %s\tOutErr: %s\tCmd:\n%v\n\n%s\n\n\nSCOOT_CMD_LOG\n"
	header := fmt.Sprintf(format, marker, time.Now(), stdout.URI(), stderr.URI(), stdlog.URI(), cmd, marker)
	// If we wanted to allow optionally, a switch for this would come either at the Worker level
	// (via Invoker -> QueueRunner construction), or the Command level (job requestor specifies in e.g. a PlatformProperty)

	// Processing/setup post checkout before execution
	switch runType {
	case runner.RunTypeScoot:
		stdout.Write([]byte(header))
		stderr.Write([]byte(header))
		stdlog.Write([]byte(header))
	}

	inv.dirMonitor.GetStartSizes() // start monitoring directory sizes

	// start running the command
	log.WithFields(
		log.Fields{
			"runID":  id,
			"tag":    cmd.Tag,
			"jobID":  cmd.JobID,
			"taskID": cmd.TaskID,
			"stdout": stdout.AsFile(),
			"stderr": stderr.AsFile(),
			"stdlog": stdlog.AsFile(),
		}).Debug("Stdout/Stderr output")
	rts.execStart = stamp() // candidate for availability via Execer
	p, err := inv.exec.Exec(execer.Command{
		Argv:    cmd.Argv,
		EnvVars: cmd.EnvVars,
		Dir:     co.Path(),
		Stdout:  io.MultiWriter(stdout, stdlog),
		Stderr:  io.MultiWriter(stderr, stdlog),
		MemCh:   memCh,
		LogTags: cmd.LogTags,
	})
	if err != nil {
		msg := fmt.Sprintf("could not exec: %s", err)
		failedStatus := runner.FailedStatus(id, errors.NewError(e.New(msg), errors.CouldNotExecExitCode),
			tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
		return failedStatus
	}

	var timeoutCh <-chan time.Time
	if cmd.Timeout > 0 { // Timeout if applicable
		elapsed := time.Now().Sub(start)
		timeout := time.NewTimer(cmd.Timeout - elapsed)
		timeoutCh = timeout.C
		defer timeout.Stop()
	}

	updateCh <- runner.RunningStatus(id, stdout.URI(), stderr.URI(),
		tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})

	processCh := make(chan execer.ProcessStatus, 1)
	go func() { processCh <- p.Wait() }()
	var runStatus runner.RunStatus

	// Wait for process to complete (or cancel if we're told to)
	select {
	case <-abortCh:
		stdout.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\nTask aborted: %v", marker, cmd.String())))
		stderr.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\nTask aborted: %v", marker, cmd.String())))
		stdlog.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\nTask aborted: %v", marker, cmd.String())))
		p.Abort()
		return runner.AbortStatus(id,
			tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
	case <-timeoutCh:
		stdout.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\nTask exceeded timeout %v: %v", marker, cmd.Timeout, cmd.String())))
		stderr.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\nTask exceeded timeout %v: %v", marker, cmd.Timeout, cmd.String())))
		stdlog.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\nTask exceeded timeout %v: %v", marker, cmd.Timeout, cmd.String())))
		inv.stat.Counter(stats.WorkerTimeouts).Inc(1)
		p.Abort()
		log.WithFields(
			log.Fields{
				"cmd":    cmd.String(),
				"tag":    cmd.Tag,
				"jobID":  cmd.JobID,
				"taskID": cmd.TaskID,
			}).Error("Run timedout")
		runStatus = runner.TimeoutStatus(id,
			tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
	case st := <-memCh:
		stdout.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\n%v", marker, st.Error)))
		stderr.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\n%v", marker, st.Error)))
		stdlog.Write([]byte(fmt.Sprintf("\n\n%s\n\nFAILED\n\n%v", marker, st.Error)))
		log.WithFields(
			log.Fields{
				"runID":    id,
				"cmd":      cmd.String(),
				"tag":      cmd.Tag,
				"jobID":    cmd.JobID,
				"taskID":   cmd.TaskID,
				"status":   st,
				"checkout": co.Path(),
			}).Errorf(st.Error)
		inv.stat.Counter(stats.WorkerMemoryCapExceeded).Inc(1)
		runStatus = getPostExecRunStatus(st, id, cmd)
		runStatus.Error = st.Error
	case st := <-processCh:
		// Process has completed
		log.WithFields(
			log.Fields{
				"runID":    id,
				"cmd":      cmd.String(),
				"tag":      cmd.Tag,
				"jobID":    cmd.JobID,
				"taskID":   cmd.TaskID,
				"status":   st,
				"checkout": co.Path(),
			}).Info("Run done")
		runStatus = getPostExecRunStatus(st, id, cmd)
		if runStatus.State == runner.FAILED {
			return runStatus
		}
	}

	// record command's disk usage for the monitored directories
	inv.dirMonitor.GetEndSizes()
	inv.dirMonitor.RecordSizeStats(inv.stat)

	// the command is no longer running, post process the results
	rts.execEnd = stamp()
	rts.outputStart = stamp()
	var stderrUrl, stdoutUrl string
	// only upload logs to a permanent location if a log uploader is initialized
	if inv.uploader != nil {
		defer inv.stat.Latency(stats.WorkerUploadLatency_ms).Time().Stop()

		// generate a unique id that's appended to the job id to create a unique identifier for logs
		logUid, _ := uuid.NewV4()
		stdlogName := "stdlog"
		stderrName := "stderr"
		stdoutName := "stdout"

		// upload stdlog
		logId := fmt.Sprintf("%s_%s/%s", cmd.JobID, logUid, stdlogName)
		_, isAborted := inv.uploadLog(logId, stdlog.AsFile(), abortCh)
		if isAborted {
			return runner.AbortStatus(id, tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})

		}

		// upload stderr
		// TODO: remove when we transition to using only stdlog in run status
		logId = fmt.Sprintf("%s_%s/%s", cmd.JobID, logUid, stderrName)
		stderrUrl, isAborted = inv.uploadLog(logId, stderr.AsFile(), abortCh)
		if isAborted {
			return runner.AbortStatus(id, tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
		}

		// upload stdout
		// TODO: remove when we transition to using only stdlog in run status
		logId = fmt.Sprintf("%s_%s/%s", cmd.JobID, logUid, stdoutName)
		stdoutUrl, isAborted = inv.uploadLog(logId, stdout.AsFile(), abortCh)
		if isAborted {
			return runner.AbortStatus(id, tags.LogTags{JobID: cmd.JobID, TaskID: cmd.TaskID, Tag: cmd.Tag})
		}
		// Note: stdout/stderr refs are only modified when logs are successfully uploaded to storage
		runStatus.StderrRef = stderrUrl
		runStatus.StdoutRef = stdoutUrl
	} else {
		log.Infof("log uploader not initialized, skipping logs upload to storage")
		// Return local std stream refs if log uploader is not initialized
		runStatus.StderrRef = stderr.URI()
		runStatus.StdoutRef = stdout.URI()
	}
	rts.outputEnd = stamp()
	rts.invokeEnd = stamp()
	return runStatus
}