scheduler/starter/modules.go (148 lines of code) (raw):
package starter
import (
"fmt"
"net"
"os"
"time"
"github.com/apache/thrift/lib/go/thrift"
log "github.com/sirupsen/logrus"
"golang.org/x/net/proxy"
"github.com/twitter/scoot/cloud/cluster"
"github.com/twitter/scoot/common"
"github.com/twitter/scoot/common/dialer"
"github.com/twitter/scoot/runner"
"github.com/twitter/scoot/runner/runners"
"github.com/twitter/scoot/saga"
"github.com/twitter/scoot/saga/sagalogs"
"github.com/twitter/scoot/scheduler/scheduler/config"
"github.com/twitter/scoot/scheduler/server"
"github.com/twitter/scoot/worker/client"
)
const DEFAULT_SOCKS_PORT = "50001"
const DEFAULT_SOCKS_ADDR = "localhost:" + DEFAULT_SOCKS_PORT
func GetWorkerRunnerServiceFn(workers client.WorkersClientJSONConfig, thriftTransportFactory thrift.TTransportFactory, binaryProtocolFactory thrift.TProtocolFactory) (func(cluster.Node) runner.Service, error) {
var rf func(cluster.Node) runner.Service
var err error
if workers.Type == "socks" {
rf, err = GetSocksWorker()
} else {
rf, err = client.NewWorkerThriftClient(thriftTransportFactory, binaryProtocolFactory, common.DefaultClientTimeout, workers)
}
if err != nil {
return nil, fmt.Errorf("error creating worker thrift client (runner.Service). %s", err)
}
return rf, nil
}
func GetSocksWorker() (server.RunnerFactory, error) {
transportFactory, err := NewTTransportFactory()
if err != nil {
return nil, err
}
protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()
wf := func(node cluster.Node) runner.Service {
di := dialer.NewSimpleDialer(transportFactory, protocolFactory, common.DefaultClientTimeout)
cl, _ := client.NewSimpleClient(di, string(node.Id()))
return runners.NewPollingService(cl, cl, 250*time.Millisecond)
}
return wf, nil
}
func GetProxyAddr() string {
port := os.Getenv("SOCKS_PORT")
if port != "" {
return "localhost:" + port
}
return ""
}
func GetProxyDialer(proxyAddr string) (proxy.Dialer, error) {
return proxy.SOCKS5("tcp", proxyAddr, nil, proxy.Direct)
}
func NewTTransportFactory() (thrift.TTransportFactory, error) {
addr := GetProxyAddr()
if addr == "" {
addr = DEFAULT_SOCKS_ADDR
}
dialer, err := GetProxyDialer(addr)
if err != nil {
return nil, err
}
return &socksTransportFactory{dialer}, nil
}
type socksTransportFactory struct {
dialer proxy.Dialer
}
func (f *socksTransportFactory) GetTransport(trans thrift.TTransport) thrift.TTransport {
// If this is a SOCKS-able connection, then return a TTransport that uses SOCKS.
// If it's not, return trans unmodified.
tsock, ok := trans.(*thrift.TSocket)
if !ok {
// Don't know how to wrap anything other than a TSocket
return trans
}
if tsock.IsOpen() {
// Transport Factory should be called before Open
return trans
}
addr, ok := tsock.Addr().(*net.TCPAddr)
if !ok {
return trans
}
ip := addr.IP.To4()
if ip == nil {
return trans
}
// An Address is Socks-able if it starts with 10, because Twitter's production network lives in
// 10/8 in IPv4 space.
if ip[0] != 10 {
return trans
}
if f.dialer == nil {
// We want to use SOCKS, but can't, because we have no dialer
log.Info("transport will not use SOCKS proxy because SOCKS_PORT is unset", addr.String())
return trans
}
return &socksSocket{
addr: addr.String(),
timeout: 0, // thrift.TSocket gives us no way to extract the timeout
dialer: f.dialer,
}
}
// socksSocket creates a socket via the proxy dialer.
// (Ideally, there would be a form of TSocket that took a Dialer instead of a net.Conn)
type socksSocket struct {
*thrift.TSocket
addr string
timeout time.Duration
dialer proxy.Dialer
}
func (s *socksSocket) Open() error {
if s.IsOpen() {
return s.Open()
}
conn, err := s.dialer.Dial("tcp", s.addr)
if err != nil {
log.Infof("Failed connecting to %s via proxy, trying directly\n", s.addr)
conn, err = net.DialTimeout("tcp", s.addr, time.Duration(10*time.Second))
if err != nil {
return err
}
}
s.TSocket = thrift.NewTSocketFromConnTimeout(conn, 0)
return nil
}
func StartCluster(clusterJSON config.ClusterJSONConfig) (chan []cluster.NodeUpdate, error) {
var uc chan []cluster.NodeUpdate
var err error
if clusterJSON.Type == "inMemory" {
cmc := &config.ClusterMemoryConfig{
Count: clusterJSON.Count,
}
uc, err = cmc.Create()
} else {
clc := &config.ClusterLocalConfig{}
uc, err = clc.Create()
}
if err != nil {
return nil, fmt.Errorf("error creating cluster. Scheduler not started. %s", err)
}
return uc, nil
}
// MakeSagaLog - TODO remove saga or refactor it so this function can be moved into saga or sagalog
// the current organization leads to cyclic dependency when moving to saga or sagalogs
func MakeSagaLog(config config.SagaLogJSONConfig) (saga.SagaLog, error) {
if config.Type == "memory" {
return sagalogs.MakeInMemorySagaLog(time.Duration(config.ExpirationSec)*time.Second, time.Duration(config.GCIntervalSec)*time.Second), nil
}
if config.Type == "file" {
return sagalogs.MakeFileSagaLog(config.Directory)
}
return nil, fmt.Errorf("unsupported sagalog type: %s. No sagalog created", config.Type)
}