scheduler/client/client.go (205 lines of code) (raw):

package client // client provides client side access to the scoot services. It is used by the // command line binaries to submit (thrift) requests to the scoot services. import ( "fmt" "github.com/twitter/scoot/common/dialer" "github.com/twitter/scoot/scheduler/api/thrift/gen-go/scoot" ) // A struct that supports establishing and maintaining a // connection for making requests to a CloudScootClient. // This client can only serve one request at a time. type CloudScootClient struct { addr string dialer dialer.Dialer client *scoot.CloudScootClient } // Parameters to configure a CloudScootClient connection type CloudScootClientConfig struct { Addr string //Address to connect to Dialer dialer.Dialer // dialer to use to connect to address } // Creates a CloudScootClient. Returns a client object which can // be used to execute calls to Scoot Cloud Exec. func NewCloudScootClient(config CloudScootClientConfig) *CloudScootClient { return &CloudScootClient{ addr: config.Addr, dialer: config.Dialer, client: nil, } } // RunJob API. Schedules a Job to run asynchronously via CloudExecScoot based on //the specified job. If successful the jobId is returned if not an error. func (c *CloudScootClient) RunJob(jobDef *scoot.JobDefinition) (r *scoot.JobId, err error) { err = c.checkForClient() if err != nil { return nil, err } jobId, err := c.client.RunJob(jobDef) // if an error occurred reset the connection, could be a broken pipe or other // unrecoverable error. reset connection so a new clean one gets created // on the next request if err != nil { // this could cause an error when closing transport // but we don't care do our best effort and move on c.closeConnection() } return jobId, err } // GetStatus API. Returns the JobStatus of the specified jobId if successful, // otherwise an erorr. func (c *CloudScootClient) GetStatus(jobId string) (r *scoot.JobStatus, err error) { err = c.checkForClient() if err != nil { return nil, err } jobStatus, err := c.client.GetStatus(jobId) // if an error occurred reset the connection, could be a broken pipe or other // unrecoverable error. reset connection so a new clean one gets created // on the next request if err != nil { // this could cause an error when closing transport // but we don't care do our best effort and move on c.closeConnection() } return jobStatus, err } // Close any open Transport associated with this ScootClient func (c *CloudScootClient) Close() error { if c.client != nil { return c.closeConnection() } return nil } func (c *CloudScootClient) KillJob(jobId string) (r *scoot.JobStatus, err error) { err = c.checkForClient() if err != nil { return nil, err } jobStatus, err := c.client.KillJob(jobId) // if an error occurred reset the connection, could be a broken pipe or other // unrecoverable error. reset connection so a new clean one gets created // on the next request if err != nil { // this could cause an error when closing transport // but we don't care do our best effort and move on c.closeConnection() } return jobStatus, err } func (c *CloudScootClient) OfflineWorker(req *scoot.OfflineWorkerReq) error { err := c.checkForClient() if err != nil { return err } err = c.client.OfflineWorker(req) // if an error occurred reset the connection, could be a broken pipe or other // unrecoverable error. reset connection so a new clean one gets created // on the next request if err != nil { // this could cause an error when closing transport // but we don't care do our best effort and move on c.closeConnection() } return err } func (c *CloudScootClient) ReinstateWorker(req *scoot.ReinstateWorkerReq) error { err := c.checkForClient() if err != nil { return err } err = c.client.ReinstateWorker(req) // if an error occurred reset the connection, could be a broken pipe or other // unrecoverable error. reset connection so a new clean one gets created // on the next request if err != nil { // this could cause an error when closing transport // but we don't care do our best effort and move on c.closeConnection() } return err } func (c *CloudScootClient) SetSchedulerStatus(maxTasks int32) error { // validation is also implemented in sched/definitions.go. We cannot use it here because it // causes a circular dependency. The two implementations can be consolidated when the code // is restructured if maxTasks < -1 { return fmt.Errorf("invlid max tasks value:%d. Must be >= -1.", maxTasks) } err := c.checkForClient() if err != nil { return err } return c.client.SetSchedulerStatus(maxTasks) } func (c *CloudScootClient) GetSchedulerStatus() (*scoot.SchedulerStatus, error) { err := c.checkForClient() if err != nil { return nil, err } schedulerStatus, err := c.client.GetSchedulerStatus() // if an error occurred reset the connection, could be a broken pipe or other // unrecoverable error. reset connection so a new clean one gets created // on the next request if err != nil { // this could cause an error when closing transport // but we don't care do our best effort and move on c.closeConnection() } return schedulerStatus, err } // GetClassLoadPercents get the target load pcts for the classes func (c *CloudScootClient) GetClassLoadPercents() (map[string]int32, error) { if err := c.checkForClient(); err != nil { return nil, err } classLoadPercents, err := c.client.GetClassLoadPercents() // if an error occurred reset the connection, could be a broken pipe or other // unrecoverable error. reset connection so a new clean one gets created // on the next request if err != nil { // this could cause an error when closing transport // but we don't care do our best effort and move on c.closeConnection() } return classLoadPercents, err } // SetClassLoadPercents set the target worker load % for each job class func (c *CloudScootClient) SetClassLoadPercents(classLoads map[string]int32) error { if err := c.checkForClient(); err != nil { return err } err := c.client.SetClassLoadPercents(classLoads) // if an error occurred reset the connection, could be a broken pipe or other // unrecoverable error. reset connection so a new clean one gets created // on the next request if err != nil { // this could cause an error when closing transport // but we don't care do our best effort and move on c.closeConnection() } return err } // GetRequestorToClassMap get map of requestor (reg exp) to class load pct func (c *CloudScootClient) GetRequestorToClassMap() (map[string]string, error) { if err := c.checkForClient(); err != nil { return nil, err } requestorToClassMap, err := c.client.GetRequestorToClassMap() // if an error occurred reset the connection, could be a broken pipe or other // unrecoverable error. reset connection so a new clean one gets created // on the next request if err != nil { // this could cause an error when closing transport // but we don't care do our best effort and move on c.closeConnection() } return requestorToClassMap, err } // SetRequestorToClassMap set the map of requestor (requestor value is reg exp) to class name func (c *CloudScootClient) SetRequestorToClassMap(requestorToClassMap map[string]string) error { if err := c.checkForClient(); err != nil { return err } err := c.client.SetRequestorToClassMap(requestorToClassMap) // if an error occurred reset the connection, could be a broken pipe or other // unrecoverable error. reset connection so a new clean one gets created // on the next request if err != nil { // this could cause an error when closing transport // but we don't care do our best effort and move on c.closeConnection() } return err } // GetRebalanceMinimumDuration get the minimum time the scheduler load % must we over the re-balance // threshold before re-balancing func (c *CloudScootClient) GetRebalanceMinimumDuration() (int32, error) { if err := c.checkForClient(); err != nil { return -1, err } m, err := c.client.GetRebalanceMinimumDuration() // if an error occurred reset the connection, could be a broken pipe or other // unrecoverable error. reset connection so a new clean one gets created // on the next request if err != nil { // this could cause an error when closing transport // but we don't care do our best effort and move on c.closeConnection() } return m, err } // SetRebalanceMinimumDuration set the minimum time the scheduler load % must we over the re-balance // threshold before re-balancing func (c *CloudScootClient) SetRebalanceMinimumDuration(durationMin int32) error { if err := c.checkForClient(); err != nil { return err } err := c.client.SetRebalanceMinimumDuration(durationMin) // if an error occurred reset the connection, could be a broken pipe or other // unrecoverable error. reset connection so a new clean one gets created // on the next request if err != nil { // this could cause an error when closing transport // but we don't care do our best effort and move on c.closeConnection() } return err } // GetRebalanceThreshold get the minimum difference between under/over allocated %s that will trigger // re-balancing func (c *CloudScootClient) GetRebalanceThreshold() (int32, error) { if err := c.checkForClient(); err != nil { return -1, err } rt, err := c.client.GetRebalanceThreshold() // if an error occurred reset the connection, could be a broken pipe or other // unrecoverable error. reset connection so a new clean one gets created // on the next request if err != nil { // this could cause an error when closing transport // but we don't care do our best effort and move on c.closeConnection() } return rt, err } // SetRebalanceThreshold set the minimum difference between under/over allocated %s that will trigger // re-balancing func (c *CloudScootClient) SetRebalanceThreshold(threshold int32) error { if err := c.checkForClient(); err != nil { return err } err := c.client.SetRebalanceThreshold(threshold) // if an error occurred reset the connection, could be a broken pipe or other // unrecoverable error. reset connection so a new clean one gets created // on the next request if err != nil { // this could cause an error when closing transport // but we don't care do our best effort and move on c.closeConnection() } return err } // helper method to check for a non-nil client / create one func (c *CloudScootClient) checkForClient() (err error) { if c.client == nil { c.client, err = createClient(c.addr, c.dialer) if err != nil { return err } } return nil } // helper method to create a scoot.CloudScootClient func createClient(addr string, dialer dialer.Dialer) (*scoot.CloudScootClient, error) { transport, protocolFactory, err := dialer.Dial(addr) if err != nil { return nil, fmt.Errorf("Error dialing to set up client connection: %v", err) } return scoot.NewCloudScootClientFactory(transport, protocolFactory), nil } // helper method to close the connection and reset the // struct field to nil so it will get recreated next func (c *CloudScootClient) closeConnection() error { err := c.client.Transport.Close() c.client = nil return err }