scheduler/setup/cmds.go (129 lines of code) (raw):
package setup
import (
"fmt"
"os"
"os/exec"
"os/signal"
"sync"
"syscall"
"time"
log "github.com/sirupsen/logrus"
)
// Cmds runs commands for Cloud Scoot setup.
// Over os/exec, it offers:
// *) (best-effort) clean-up (via Kill())
// *) logging when a process is started
// *) output redirection (TODO)
// *) Utility functions for Fatal logging that also Kill
//    (NOTE(review): no such helpers are visible in this file; confirm they exist elsewhere)
// *) Kills all commands when any long-lived command finishes (to allow prompt debugging)
//
// Setup code that might start long-running commands should use
// Command instead of exec.Command, and StartCmd/RunCmd instead of cmd.Start/cmd.Run.
// Run and Start are convenience methods that combine Command with RunCmd and
// StartCmd respectively.
type Cmds struct {
// commands we are watching (may be unstarted or finished)
watching []*exec.Cmd
// mu is locked by Kill, Command, StartCmd and remove around their accesses
// to watching and killed.
mu sync.Mutex
// wg is incremented once per command created by Command and decremented by
// remove; Kill waits on it to learn when every watched command is done.
wg sync.WaitGroup
// killed is set by Kill the first time it runs. Later Kill calls return
// immediately, and StartCmd refuses to start new commands once it is set.
killed bool
}
// NewCmds returns an empty Cmds: it watches no commands and installs no
// signal handler.
func NewCmds() *Cmds {
	return new(Cmds)
}
// NewSignalHandlingCmds returns a new Cmds and registers a handler for SIGINT
// and SIGTERM. When the first such signal arrives, the handler logs it, calls
// Kill on the returned Cmds, and then exits the process with status 1.
func NewSignalHandlingCmds() *Cmds {
	cmds := NewCmds()

	// Buffered so a signal delivered before the goroutine is receiving is kept.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		received := <-signals
		log.Infof("signal %s received; shutting down", received)
		cmds.Kill()
		os.Exit(1)
	}()

	return cmds
}
// Kill makes a best-effort attempt to stop every watched command that has a
// process. It sends SIGINT to each command's process group, waits up to a
// grace period for all watched commands to be removed, and then sends SIGKILL
// to whatever is still being watched.
//
// Only the first call does any work; later calls return immediately. After
// Kill has run, StartCmd refuses to start new commands.
//
// NB: we can't guarantee this is called before exiting.
// If we really want to be correct, we have to start another process that will do
// the babysitting.
func (c *Cmds) Kill() {
	// How long commands get to exit after SIGINT before SIGKILL is sent.
	const gracePeriod = 5 * time.Second

	c.mu.Lock()
	defer c.mu.Unlock()

	// If we already tried to kill, then don't try to kill again. And don't try to kill
	// in the future.
	if c.killed {
		return
	}
	c.killed = true

	log.Infof("Killing %d cmds", len(c.watching))
	if len(c.watching) == 0 {
		// Nothing to signal, so there is nothing to wait for either.
		return
	}

	// allDoneCh is closed once every watched command has been removed.
	allDoneCh := make(chan struct{})
	go func() {
		c.wg.Wait()
		close(allDoneCh)
	}()

	// First, send SIGINT to each.
	c.signalAllLocked("SIGINT", syscall.SIGINT)

	// Release the lock while waiting so that remove can do its job (and let
	// allDoneCh be closed). The deferred Unlock pairs with the Lock below.
	c.mu.Unlock()
	select {
	case <-allDoneCh:
		log.Infof("All completed")
	case <-time.After(gracePeriod):
		log.Infof("Still waiting; killing all")
	}
	c.mu.Lock()

	if len(c.watching) == 0 {
		return
	}

	// They've been warned; now send SIGKILL.
	c.signalAllLocked("SIGKILL", syscall.SIGKILL)
}

// signalAllLocked sends sig to the process group of every watched command that
// has a process, logging each attempt under the given signal name and any
// delivery failure. The caller must hold c.mu.
func (c *Cmds) signalAllLocked(name string, sig syscall.Signal) {
	for _, cmd := range c.watching {
		p := cmd.Process
		if p == nil {
			// Never started, so there is nothing to signal.
			continue
		}
		log.Infof("%s: %d %v", name, p.Pid, cmd.Path)
		// A negative pid targets the whole process group; Command sets
		// Setpgid so each command leads its own group.
		if err := syscall.Kill(-p.Pid, sig); err != nil {
			log.Infof("%s: %d %v failed: %v", name, p.Pid, cmd.Path, err)
		}
	}
}
// Command builds an *exec.Cmd for path and arg and adds it to the set of
// watched commands. The command is not started. Its stdout and stderr are
// wired to this process's stdout and stderr, and it is configured to run in
// its own process group.
func (c *Cmds) Command(path string, arg ...string) *exec.Cmd {
	c.mu.Lock()
	log.Infof("RunCmd: %s %s\n", path, arg)
	defer c.mu.Unlock()

	// TODO(dbentley): consider migrating to use Execer and OSExecer
	watched := exec.Command(path, arg...)
	watched.Stdout = os.Stdout
	watched.Stderr = os.Stderr
	// Kill signals the negated pid, i.e. the process group, so give the
	// command a group of its own.
	watched.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	// Each watched command holds one wg count until remove releases it.
	c.watching = append(c.watching, watched)
	c.wg.Add(1)
	return watched
}
// StartCmd starts a Cmd that was created by Command and expects it to run forever.
// If cmd stops, c will call c.Kill() (to allow prompt debugging).
//
// It returns an error without starting anything if Kill has already been
// called, and otherwise returns whatever cmd.Start returns. On a successful
// start, a background goroutine waits for the command, logs its completion
// (including the error from Wait, if any), stops watching it, and calls Kill.
func (c *Cmds) StartCmd(cmd *exec.Cmd) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.killed {
		return fmt.Errorf("killed; cannot start new cmds")
	}

	log.Info("Starting", cmd.Args)
	if err := cmd.Start(); err != nil {
		return err
	}

	go func() {
		pid := cmd.Process.Pid
		log.Infof("Cmd %v started as %v", cmd.Args, pid)
		waitErr := cmd.Wait()
		log.Infof("Cmd %v (%v) finished", pid, cmd.Path)
		if waitErr != nil {
			// A long-lived command exiting takes everything else down with
			// it, so record why it exited.
			log.Infof("Cmd %v (%v) exit error: %v", pid, cmd.Path, waitErr)
		}
		c.remove(cmd)
		c.Kill()
	}()
	return nil
}
// RunCmd runs a Cmd that was created by Command, blocking until it completes.
// It returns the error from cmd.Run and stops watching cmd before returning.
func (c *Cmds) RunCmd(cmd *exec.Cmd) error {
	log.Info("Running", cmd.Args)

	// Stop watching cmd on the way out, whether or not the run succeeded.
	defer c.remove(cmd)

	runErr := cmd.Run()
	log.Info("Run Done: ", runErr)
	return runErr
}
// Start is shorthand for creating a watched command with Command and handing
// it straight to StartCmd; it returns StartCmd's error.
func (c *Cmds) Start(path string, arg ...string) error {
	return c.StartCmd(c.Command(path, arg...))
}
// Run is shorthand for creating a watched command with Command and handing
// it straight to RunCmd; it returns RunCmd's error.
func (c *Cmds) Run(path string, arg ...string) error {
	return c.RunCmd(c.Command(path, arg...))
}
// remove stops watching cmd (because it's done) and releases the WaitGroup
// count that Command took for it.
//
// If cmd is not currently watched (it was already removed, or it was not
// created by Command), remove does nothing. In particular it does not call
// wg.Done, which would otherwise drive the counter negative and panic.
func (c *Cmds) remove(cmd *exec.Cmd) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for i, other := range c.watching {
		if other != cmd {
			continue
		}
		last := len(c.watching) - 1
		copy(c.watching[i:], c.watching[i+1:])
		// Clear the vacated slot so the backing array does not keep the
		// finished command alive.
		c.watching[last] = nil
		c.watching = c.watching[:last]
		c.wg.Done()
		return
	}
}