worker/starter/server.go (170 lines of code) (raw):

// Package server provides the implementation of the Scoot Worker // Server, which implements the Worker API and starts the actual worker. package starter import ( "reflect" "sync" "time" "github.com/apache/thrift/lib/go/thrift" log "github.com/sirupsen/logrus" "github.com/twitter/scoot/common/log/helpers" "github.com/twitter/scoot/common/stats" "github.com/twitter/scoot/runner" "github.com/twitter/scoot/runner/runners" "github.com/twitter/scoot/worker/domain" "github.com/twitter/scoot/worker/domain/gen-go/worker" ) // Creates a Worker Server func MakeServer( handler worker.Worker, transport thrift.TServerTransport, transportFactory thrift.TTransportFactory, protocolFactory thrift.TProtocolFactory) thrift.TServer { return thrift.NewTSimpleServer4( worker.NewWorkerProcessor(handler), transport, transportFactory, protocolFactory) } // defining StatsCollectInterval type so it can be injected via ICE // (avoid conflict with other injected integers) type StatsCollectInterval time.Duration type handler struct { stat stats.StatsReceiver run runner.Service timeLastRpc time.Time mu sync.RWMutex currentCmd *runner.Command currentRunID runner.RunID } // Creates a new Handler which combines a runner.Service to do work and a StatsReceiver func NewHandler(stat stats.StatsReceiver, run runner.Service) worker.Worker { scopedStat := stat.Scope("handler") h := &handler{stat: scopedStat, run: run, timeLastRpc: time.Now()} stats.ReportServerRestart(scopedStat, stats.WorkerServerStartedGauge, stats.DefaultStartupGaugeSpikeLen) go h.stats() return h } // Periodically output stats //TODO: runner should eventually be extended to support stats, multiple runs, etc. (replacing loop here). func (h *handler) stats() { var startTime time.Time = time.Now() var initTime time.Duration nilTime := time.Time{} initDoneTime := nilTime ticker := time.NewTicker(time.Duration(stats.StatReportIntvl)) for { select { case <-ticker.C: h.mu.Lock() processes, svcStatus, err := h.run.StatusAll() if err != nil { continue } if svcStatus.Initialized { if initDoneTime == nilTime { initDoneTime = time.Now() initTime = initDoneTime.Sub(startTime) } var numFailed int64 var numActive int64 for _, process := range processes { if process.State == runner.FAILED { numFailed++ } if !process.State.IsDone() { numActive++ } } // if its done initializing, record the final initLatency time // compute the uptime as time since init finished uptime := time.Since(initDoneTime) timeSincelastContact_ms := int64(time.Now().Sub(h.timeLastRpc) / time.Millisecond) h.stat.Gauge(stats.WorkerFinalInitLatency_ms).Update(int64(initTime / time.Millisecond)) h.stat.Gauge(stats.WorkerActiveInitLatency_ms).Update(0) h.stat.Gauge(stats.WorkerActiveRunsGauge).Update(numActive) h.stat.Gauge(stats.WorkerFailedCachedRunsGauge).Update(numFailed) h.stat.Gauge(stats.WorkerEndedCachedRunsGauge).Update(int64(len(processes)) - numActive) // TODO errata metric - remove if unused h.stat.Gauge(stats.WorkerTimeSinceLastContactGauge_ms).Update(timeSincelastContact_ms) // TODO errata metric - remove if unused uptimeMs := int64(uptime / time.Millisecond) h.stat.Gauge(stats.WorkerUptimeGauge_ms).Update(uptimeMs) } else { initTime := time.Now().Sub(startTime) h.stat.Gauge(stats.WorkerActiveInitLatency_ms).Update(int64(initTime / time.Millisecond)) } h.mu.Unlock() } } } // Convenience func (h *handler) updateTimeLastRpc() { h.mu.Lock() h.timeLastRpc = time.Now() h.mu.Unlock() } // Implements worker.thrift Worker.QueryWorker interface func (h *handler) QueryWorker() (*worker.WorkerStatus, error) { h.stat.Counter(stats.WorkerServerQueries).Inc(1) h.updateTimeLastRpc() ws := worker.NewWorkerStatus() st, svc, err := h.run.StatusAll() if err != nil { ws.Error = err.Error() } else if svc.Error != nil { ws.Error = svc.Error.Error() } ws.Initialized = svc.Initialized ws.IsHealthy = svc.IsHealthy for _, status := range st { if status.State.IsDone() { // Note: TravisCI fails when output is too long so we set full status to Debug and disable it when running in that env. if log.GetLevel() == log.DebugLevel { log.Debugf("Worker returning finished run: %v", status) } else { log.Infof("Worker returning finished run: %v", status.RunID) } } ws.Runs = append(ws.Runs, domain.DomainRunStatusToThrift(status)) } return ws, nil } // Implements worker.thrift Worker.Run interface func (h *handler) Run(cmd *worker.RunCommand) (*worker.RunStatus, error) { defer h.stat.Latency(stats.WorkerServerStartRunLatency_ms).Time().Stop() h.stat.Counter(stats.WorkerServerRuns).Inc(1) log.WithFields( log.Fields{ "argv": cmd.Argv, "env": cmd.Env, "snapshotId": helpers.CopyPointerToString(cmd.SnapshotId), "timeoutMs": helpers.CopyPointerToInt32(cmd.TimeoutMs), "jobID": helpers.CopyPointerToString(cmd.JobId), "taskID": helpers.CopyPointerToString(cmd.TaskId), "tag": helpers.CopyPointerToString(cmd.Tag), }).Info("Worker trying to run cmd") h.updateTimeLastRpc() c := domain.ThriftRunCommandToDomain(cmd) status, err := h.run.Run(c) //Check if this is a dup retry for an already running command and if so get its status. //TODO(jschiller): accept a cmd.Nonce field so we can be precise about hiccups with dup cmd resends? if err != nil && err.Error() == runners.QueueFullMsg && reflect.DeepEqual(c, h.currentCmd) { log.Infof("Worker received dup request, recovering runID: %v", h.currentRunID) status, _, err = h.run.Status(h.currentRunID) } if err != nil { // Set invalid status and nil err to indicate handleable internal err. status.Error = err.Error() status.State = runner.FAILED } else { h.currentCmd = c h.currentRunID = status.RunID } // status's stdout, stderr, taskID, jobID, and tag might not be populated yet. // h.run.Run(c) calls *runner.Invoker#run in a goroutine, and these fields are set on the fly log.WithFields( log.Fields{ "runID": status.RunID, "snapshotID": status.SnapshotID, "state": status.State, "jobID": status.JobID, "taskID": status.TaskID, "tag": status.Tag, "stdout": status.StdoutRef, "stderr": status.StderrRef, "error": status.Error, }).Info("Worker returning run status") return domain.DomainRunStatusToThrift(status), nil } // Implements worker.thrift Worker.Abort interface func (h *handler) Abort(runId string) (*worker.RunStatus, error) { h.stat.Counter(stats.WorkerServerAborts).Inc(1) h.updateTimeLastRpc() log.Infof("Worker aborting runID: %s", runId) status, err := h.run.Abort(runner.RunID(runId)) if err != nil { // Set invalid status and nil err to indicate handleable domain err. status.Error = err.Error() status.State = runner.UNKNOWN status.RunID = runner.RunID(runId) } return domain.DomainRunStatusToThrift(status), nil }