scheduler/api/server.go (93 lines of code) (raw):

package api // these functions are the service side entry points for the thrift protocol // (they are call from cloudscoot.go) import ( "time" "github.com/apache/thrift/lib/go/thrift" log "github.com/sirupsen/logrus" "github.com/twitter/scoot/common/stats" "github.com/twitter/scoot/saga" schedthrift "github.com/twitter/scoot/scheduler/api/thrift" "github.com/twitter/scoot/scheduler/api/thrift/gen-go/scoot" "github.com/twitter/scoot/scheduler/server" ) // Creates and returns a new server Handler, which combines the scheduler, // saga coordinator and stats receivers. func NewHandler(scheduler server.Scheduler, sc saga.SagaCoordinator, stat stats.StatsReceiver) scoot.CloudScoot { handler := &Handler{scheduler: scheduler, sagaCoord: sc, stat: stat} go stats.StartUptimeReporting(stat, stats.SchedUptime_ms, stats.SchedServerStartedGauge, stats.DefaultStartupGaugeSpikeLen) return handler } // Creates a Thrift server given a Handler and Thrift connection information func MakeServer(handler scoot.CloudScoot, transport thrift.TServerTransport, transportFactory thrift.TTransportFactory, protocolFactory thrift.TProtocolFactory) thrift.TServer { return thrift.NewTSimpleServer4( scoot.NewCloudScootProcessor(handler), transport, transportFactory, protocolFactory) } // Wrapping type that combines a scheduler, saga coordinator and stat receiver into a server type Handler struct { scheduler server.Scheduler sagaCoord saga.SagaCoordinator stat stats.StatsReceiver } // Implements RunJob Cloud Scoot API func (h *Handler) RunJob(def *scoot.JobDefinition) (*scoot.JobId, error) { defer h.stat.Latency(stats.SchedServerRunJobLatency_ms).Time().Stop() // TODO errata metric - remove if unused h.stat.Counter(stats.SchedServerRunJobCounter).Inc(1) // TODO errata metric - remove if unused return schedthrift.RunJob(h.scheduler, def, h.stat) } // Implements GetStatus Cloud Scoot API func (h *Handler) GetStatus(jobId string) (*scoot.JobStatus, error) { defer h.stat.Latency(stats.SchedServerJobStatusLatency_ms).Time().Stop() h.stat.Counter(stats.SchedServerJobStatusCounter).Inc(1) return schedthrift.GetJobStatus(jobId, h.sagaCoord) } // Implements KillJob Cloud Scoot API func (h *Handler) KillJob(jobId string) (*scoot.JobStatus, error) { defer h.stat.Latency(stats.SchedServerJobKillLatency_ms).Time().Stop() h.stat.Counter(stats.SchedServerJobKillCounter).Inc(1) return schedthrift.KillJob(jobId, h.scheduler, h.sagaCoord) } // Implements OfflineWorker Cloud Scoot API func (h *Handler) OfflineWorker(req *scoot.OfflineWorkerReq) error { return schedthrift.OfflineWorker(req, h.scheduler) } // Implements ReinstateWorker Cloud Scoot API func (h *Handler) ReinstateWorker(req *scoot.ReinstateWorkerReq) error { return schedthrift.ReinstateWorker(req, h.scheduler) } // Implements GetSchedulerStatus Cloud Scoot API func (h *Handler) GetSchedulerStatus() (*scoot.SchedulerStatus, error) { return schedthrift.GetSchedulerStatus(h.scheduler) } // Implements SetSchedulerStatus Cloud Scoot API func (h *Handler) SetSchedulerStatus(maxNumTasks int32) error { return schedthrift.SetSchedulerStatus(h.scheduler, maxNumTasks) } // GetClassLoadPercents Implements GetClassLoadPercents Cloud Scoot API func (h *Handler) GetClassLoadPercents() (map[string]int32, error) { clp, err := schedthrift.GetClassLoadPercents(h.scheduler) log.Infof("GetClassLoadPercents returning: %v, err:%v", clp, err) return clp, err } // SetClassLoadPercents Implements SetClassLoadPercents Cloud Scoot API func (h *Handler) SetClassLoadPercents(classLoadPercents map[string]int32) error { log.Infof("SetClassLoadPercents to %v", classLoadPercents) return schedthrift.SetClassLoadPercents(h.scheduler, classLoadPercents) } // GetRequestorToClassMap Implements GetRequestorToClassMap Cloud Scoot API func (h *Handler) GetRequestorToClassMap() (map[string]string, error) { rm, err := schedthrift.GetRequestorToClassMap(h.scheduler) log.Infof("GetClassLoadPercents returning: %v, err:%v", rm, err) return rm, err } // SetRequestorToClassMap Implements SetRequestorToClassMap Cloud Scoot API func (h *Handler) SetRequestorToClassMap(requestToClassMap map[string]string) error { log.Infof("SetRequestorToClassMap to %v", requestToClassMap) return schedthrift.SetRequestorToClassMap(h.scheduler, requestToClassMap) } // GetRebalanceMinimumDuration get the duration(minutes) that the scheduler needs to be exceeding // the rebalance threshold before rebalancing. <= 0 implies no rebalancing func (h *Handler) GetRebalanceMinimumDuration() (int32, error) { d, err := schedthrift.GetRebalanceMinimumDuration(h.scheduler) log.Infof("GetRebalanceMinimumDuration returning: %d, err:%v", d, err) return int32(d.Minutes()), err } // SetRebalanceMinimumDuration set the duration(minutes) that the scheduler needs to be exceeding // the rebalance threshold before rebalancing. <= 0 implies no rebalancing func (h *Handler) SetRebalanceMinimumDuration(durationMinimum int32) error { log.Infof("SetRebalanceMinimumDuration to %d", durationMinimum) d := time.Duration(durationMinimum) * time.Minute return schedthrift.SetRebalanceMinimumDuration(h.scheduler, d) } // GetRebalanceThreshold the % spread threshold that must be exceeded to trigger rebalance // <= 0 implies no rebalancing func (h *Handler) GetRebalanceThreshold() (int32, error) { t, err := schedthrift.GetRebalanceThreshold(h.scheduler) log.Infof("GetRebalanceThreshold returning: %d, err:%v", t, err) return t, err } // SetRebalanceThreshold the % spread threshold that must be exceeded to trigger rebalance // <= 0 implies no rebalancing func (h *Handler) SetRebalanceThreshold(threshold int32) error { log.Infof("SetRebalanceThreshold to %d", threshold) return schedthrift.SetRebalanceThreshold(h.scheduler, threshold) }