worker/client/client.go (153 lines of code) (raw):
// Package client provides an interface and implementation for the
// Scoot Worker API, as well as a CLI client that wraps it.
package client
import (
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"github.com/twitter/scoot/common/dialer"
"github.com/twitter/scoot/runner"
"github.com/twitter/scoot/runner/runners"
"github.com/twitter/scoot/worker/domain"
"github.com/twitter/scoot/worker/domain/gen-go/worker"
)
// WorkersClientJSONConfig holds the parameters for configuring connections
// to remote workers.
type WorkersClientJSONConfig struct {
	// Transport type: rpc.
	Type string // transport type: rpc
	// Polling period, written as a string; defaults to 250ms.
	PollingPeriod string // polling period default to 250ms
}
// Client is the interface for interacting with a Scoot worker. It covers
// connection management and a worker status query, and embeds the runner
// Controller, StatusQueryNower and LegacyStatusReader interfaces.
type Client interface {
	// Connection functions
	Dial() error
	Close() error
	// Worker API Interactions
	QueryWorker() (domain.WorkerStatus, error)
	runner.Controller
	runner.StatusQueryNower
	runner.LegacyStatusReader
}
// simpleClient is the basic Client implementation returned by NewSimpleClient.
type simpleClient struct {
	// addr is the worker address handed to the dialer. dial() replaces an
	// empty addr with domain.DefaultWorker_Thrift.
	addr string
	// dialer produces the transport and protocol factory for addr.
	dialer dialer.Dialer
	// workerClient is built lazily by dial(); it is nil while no client
	// has been built.
	workerClient *worker.WorkerClient
}
// NewSimpleClient creates the basic implementation of the Client interface for
// interacting with the Scoot worker API. No connection is opened here; the
// returned error is always nil.
func NewSimpleClient(di dialer.Dialer, addr string) (Client, error) {
	return &simpleClient{
		addr:   addr,
		dialer: di,
	}, nil
}
// Dial sets up the connection to the worker if one has not been set up yet,
// discarding the client handle and reporting only the error.
func (c *simpleClient) Dial() error {
	if _, err := c.dial(); err != nil {
		return err
	}
	return nil
}
// dial returns the worker client, building it on first use. An empty address
// is replaced by domain.DefaultWorker_Thrift before dialing. Once built, the
// client is cached on the receiver and returned by later calls.
func (c *simpleClient) dial() (*worker.WorkerClient, error) {
	// Reuse the cached client when one already exists.
	if c.workerClient != nil {
		return c.workerClient, nil
	}

	if c.addr == "" {
		c.addr = domain.DefaultWorker_Thrift
	}

	transport, protocolFactory, dialErr := c.dialer.Dial(c.addr)
	if dialErr != nil {
		return nil, fmt.Errorf("Error dialing to set up client connection: %v", dialErr)
	}

	c.workerClient = worker.NewWorkerClientFactory(transport, protocolFactory)
	return c.workerClient, nil
}
// Close closes the transport of the current worker client, if there is one,
// and returns the error from closing it. Calling Close when no client has
// been built is a no-op that returns nil.
//
// The cached client is cleared whether or not closing succeeded, so that the
// next dial() builds a fresh connection instead of handing back a client
// whose transport has already been closed.
func (c *simpleClient) Close() error {
	if c.workerClient == nil {
		return nil
	}
	err := c.workerClient.Transport.Close()
	// Forget the closed client; dial() only reconnects when this is nil.
	c.workerClient = nil
	return err
}
// Run implements the Scoot Worker API: it sends cmd to the worker, dialing
// first if needed, and returns the resulting run status converted to the
// domain type. On a dial or RPC error it returns an empty RunStatus and
// that error.
func (c *simpleClient) Run(cmd *runner.Command) (runner.RunStatus, error) {
	var none runner.RunStatus

	wc, err := c.dial()
	if err != nil {
		return none, err
	}

	thriftStatus, err := wc.Run(domain.DomainRunCommandToThrift(cmd))
	if err != nil {
		return none, err
	}
	return domain.ThriftRunStatusToDomain(thriftStatus), nil
}
// Abort implements the Scoot Worker API: it asks the worker to abort the run
// identified by runID, dialing first if needed, and returns the resulting run
// status converted to the domain type. On a dial or RPC error it returns an
// empty RunStatus and that error.
func (c *simpleClient) Abort(runID runner.RunID) (runner.RunStatus, error) {
	var none runner.RunStatus

	wc, err := c.dial()
	if err != nil {
		return none, err
	}

	thriftStatus, err := wc.Abort(string(runID))
	if err != nil {
		return none, err
	}
	return domain.ThriftRunStatusToDomain(thriftStatus), nil
}
// Release frees local resources by closing the client connection.
func (c *simpleClient) Release() {
	// Release has no way to report a failure, so the error from Close is
	// deliberately discarded.
	_ = c.Close()
}
// QueryWorker implements the Scoot Worker API: it fetches the worker's status,
// dialing first if needed, and returns it converted to the domain type. On a
// dial or RPC error it returns an empty WorkerStatus and that error.
func (c *simpleClient) QueryWorker() (domain.WorkerStatus, error) {
	var none domain.WorkerStatus

	wc, err := c.dial()
	if err != nil {
		return none, err
	}

	thriftStatus, err := wc.QueryWorker()
	if err != nil {
		return none, err
	}
	return domain.ThriftWorkerStatusToDomain(thriftStatus), nil
}
// Status implements the Scoot Worker API: it returns the status of the run
// identified by id together with the worker's service status.
//
// If the worker cannot be queried, both statuses are empty and the query
// error is returned. If the worker reports no run with the given id, the
// run status is empty, the service status is still populated, and a
// "no such process" error is returned.
func (c *simpleClient) Status(id runner.RunID) (runner.RunStatus, runner.ServiceStatus, error) {
	// StatusAll performs the worker query and builds the service status.
	runs, svc, err := c.StatusAll()
	if err != nil {
		return runner.RunStatus{}, runner.ServiceStatus{}, err
	}

	for i := range runs {
		if runs[i].RunID == id {
			return runs[i], svc, nil
		}
	}
	return runner.RunStatus{}, svc, fmt.Errorf("no such process %v", id)
}
// StatusAll implements the Scoot Worker API: it returns the status of every
// run the worker reports, together with the worker's service status. A
// non-empty error string from the worker is surfaced as ServiceStatus.Error.
// If the worker cannot be queried, it returns nil runs, an empty service
// status, and the query error.
func (c *simpleClient) StatusAll() ([]runner.RunStatus, runner.ServiceStatus, error) {
	workerStatus, err := c.QueryWorker()
	if err != nil {
		return nil, runner.ServiceStatus{}, err
	}

	service := runner.ServiceStatus{
		Initialized: workerStatus.Initialized,
		IsHealthy:   workerStatus.IsHealthy,
	}
	// Error stays nil unless the worker reported a problem.
	if workerStatus.Error != "" {
		service.Error = errors.New(workerStatus.Error)
	}
	return workerStatus.Runs, service, nil
}
// QueryNow fetches all run statuses from the worker and applies q to them,
// returning the result together with the worker's service status. If the
// statuses cannot be fetched, it returns nil, an empty service status, and
// that error. An error from applying the query is returned alongside the
// service status.
func (c *simpleClient) QueryNow(q runner.Query) ([]runner.RunStatus, runner.ServiceStatus, error) {
	all, svc, err := c.StatusAll()
	if err != nil {
		return nil, runner.ServiceStatus{}, err
	}

	matched, err := runners.StatusesRO(all).QueryNow(q)
	return matched, svc, err
}
// APIAddrToBundlestoreURI builds a Bundlestore URI from addr by prefixing
// "http://" and appending "/bundle/".
func APIAddrToBundlestoreURI(addr string) string {
	return fmt.Sprintf("http://%s/bundle/", addr)
}
// GetScootapiAddrPath returns the path of the file containing the address for
// scootapi to use: ".cloudscootaddr" followed by the value of $SCOOT_ID,
// joined onto $HOME.
func GetScootapiAddrPath() string {
	// SCOOT_ID is optional; it is used to connect to a different set of
	// scoot processes.
	fileName := ".cloudscootaddr" + os.Getenv("SCOOT_ID")
	return path.Join(os.Getenv("HOME"), fileName)
}
// GetScootapiAddr reads the scheduler and scootapi addresses (each as
// host:port) from the file named by GetScootapiAddrPath.
//
// The file holds the scheduler address on the first line and the api address
// on the second. A single trailing newline is tolerated, and whitespace around
// each address (including the "\r" of Windows line endings) is trimmed.
//
// Returns:
//   - sched, api: the two addresses; both empty if the file does not exist.
//   - err: nil if the file does not exist; the read error for any other read
//     failure; an error if the file does not contain exactly two addresses.
func GetScootapiAddr() (sched string, api string, err error) {
	data, err := ioutil.ReadFile(GetScootapiAddrPath())
	if err != nil {
		// A missing file just means no address has been recorded yet.
		if os.IsNotExist(err) {
			return "", "", nil
		}
		return "", "", err
	}

	addrs := strings.Split(string(data), "\n")
	// Split yields a final empty element when the data ends with the
	// separator; drop it so a newline-terminated file is accepted.
	if len(addrs) == 3 && addrs[2] == "" {
		addrs = addrs[:2]
	}
	if len(addrs) != 2 {
		return "", "", errors.New("Expected both sched and api addrs, got: " + string(data))
	}
	return strings.TrimSpace(addrs[0]), strings.TrimSpace(addrs[1]), nil
}