scheduler/server/cluster_state.go
package server
import (
"fmt"
"time"
"github.com/davecgh/go-spew/spew"
log "github.com/sirupsen/logrus"
cc "github.com/twitter/scoot/cloud/cluster"
"github.com/twitter/scoot/common"
"github.com/twitter/scoot/common/stats"
)
const noJob = ""
const noTask = ""
const defaultMaxLostDuration = time.Minute
const defaultMaxFlakyDuration = 15 * time.Minute
var nilTime = time.Time{}
// ReadyFn is used by clusterState to determine whether a newly added node is ready to be used,
// and whether a node that just finished running a task has become unhealthy and should not be used.
type ReadyFn func(cc.Node) (ready bool, backoffDuration time.Duration)
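// An illustrative (hypothetical) ReadyFn might probe the worker and back off on failure.
// Sketch only; pingWorker is not part of this package:
//
//	func pingReady(n cc.Node) (bool, time.Duration) {
//		if err := pingWorker(n); err != nil {
//			return false, 5 * time.Second // not ready; check again after 5s
//		}
//		return true, 0
//	}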
// clusterState maintains a cluster of nodes and information about what task is running on each node.
// nodeGroups is for node affinity where we want to remember which node last ran with what snapshot.
// NOTE: a node can be both running in scheduler and suspended here (distributed system eventual consistency...)
type clusterState struct {
nodesUpdatesCh chan []cc.NodeUpdate
nodes map[cc.NodeId]*nodeState // All healthy nodes.
suspendedNodes map[cc.NodeId]*nodeState // All new, lost, or flaky nodes, disjoint from 'nodes'.
offlinedNodes map[cc.NodeId]*nodeState // All User initiated offline nodes. Disjoint from 'nodes' & 'suspendedNodes'
nodeGroups map[string]*nodeGroup // key is a snapshotId.
maxLostDuration time.Duration // after which we remove a node from the cluster entirely
maxFlakyDuration time.Duration // after which we mark it not flaky and put it back in rotation.
readyFn ReadyFn // If provided, new or unhealthy nodes will be suspended until this returns true.
numRunning int // Number of running nodes. running + free + suspended ~= allNodes (may lag)
stats stats.StatsReceiver // for collecting stats about node availability
nopUpdateCnt int
}
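// isOfflined reports whether the given node is currently in the user-offlined set.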
func (c *clusterState) isOfflined(ns *nodeState) bool {
if _, ok := c.offlinedNodes[ns.node.Id()]; ok {
return true
}
return false
}
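// nodeGroup tracks the idle and busy nodes associated with a particular snapshotId,
// used for node affinity when assigning tasks.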
type nodeGroup struct {
idle map[cc.NodeId]*nodeState
busy map[cc.NodeId]*nodeState
}
func newNodeGroup() *nodeGroup {
return &nodeGroup{idle: map[cc.NodeId]*nodeState{}, busy: map[cc.NodeId]*nodeState{}}
}
// nodeState is the state of a single node in the cluster.
type nodeState struct {
node cc.Node
runningJob string
runningTask string
snapshotId string
timeLost time.Time // Time when node was marked lost, if set (lost and flaky are mutually exclusive).
timeFlaky time.Time // Time when node was marked flaky, if set (lost and flaky are mutually exclusive).
readyCh chan interface{} // We create a goroutine for each new node; it closes this channel once the node is ready.
removedCh chan interface{} // We send nil when a node has been removed and we want the above goroutine to exit.
}
func (n *nodeState) String() string {
return fmt.Sprintf("{node:%s, jobId:%s, taskId:%s, snapshotId:%s, timeLost:%v, timeFlaky:%v, ready:%t}",
spew.Sdump(n.node), n.runningJob, n.runningTask, n.snapshotId, n.timeLost, n.timeFlaky, (n.readyCh == nil))
}
// suspended reports whether this node is out of rotation: it is new and still awaiting its readiness check,
// it was reported lost by a NodeUpdate (kept around for a bit in case it revives), or it hit
// connection-related errors and was marked flaky (sidelined for a little while).
func (ns *nodeState) suspended() bool {
return ns.readyCh != nil || ns.timeLost != nilTime || ns.timeFlaky != nilTime
}
// This node is ready if the readyCh has been closed, either upon creation or in the startReadyLoop() goroutine.
// Note, nodes can be ready and healthy but still suspended due to update() lagging behind async startReadyLoop().
func (ns *nodeState) ready() bool {
ready := (ns.readyCh == nil)
if !ready {
select {
case <-ns.readyCh:
ready = true
default:
}
}
return ready
}
// Starts a goroutine loop checking node readiness, waiting 'backoff' time between checks, and exiting if node is fully removed.
func (ns *nodeState) startReadyLoop(rfn ReadyFn) {
ns.readyCh = make(chan interface{})
go func() {
done := false
for !done {
if ready, backoff := rfn(ns.node); ready {
close(ns.readyCh)
done = true
} else if backoff == 0 {
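// A zero backoff means give up: readyCh is never closed, so this node stays suspended until it is removed.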
done = true
} else {
select {
case <-ns.removedCh:
done = true
case <-time.After(backoff):
}
}
}
}()
}
// newNodeState initializes a nodeState for the specified node.
func newNodeState(node cc.Node) *nodeState {
return &nodeState{
node: node,
runningJob: noJob,
runningTask: noTask,
snapshotId: "",
timeLost: nilTime,
timeFlaky: nilTime,
readyCh: nil,
removedCh: make(chan interface{}),
}
}
// newClusterState creates a clusterState that consumes node updates from the buffered updates channel.
// A buffered channel is used so that cluster state processing does not block the component producing
// the updates (typically the cluster).
// New nodes are considered suspended until the optional ReadyFn reports that they are ready.
// ReadyFn is also used to detect nodes that become unhealthy after completing a task.
func newClusterState(nodesUpdatesCh chan []cc.NodeUpdate, rfn ReadyFn, stats stats.StatsReceiver) *clusterState {
cs := &clusterState{
nodesUpdatesCh: nodesUpdatesCh,
nodes: make(map[cc.NodeId]*nodeState),
suspendedNodes: map[cc.NodeId]*nodeState{},
offlinedNodes: make(map[cc.NodeId]*nodeState),
nodeGroups: map[string]*nodeGroup{"": newNodeGroup()},
maxLostDuration: defaultMaxLostDuration,
maxFlakyDuration: defaultMaxFlakyDuration,
readyFn: rfn,
stats: stats,
}
cs.updateCluster()
return cs
}
// numFree returns the number of healthy nodes that are not running tasks (suspended nodes are excluded).
// Note: we assume numFree() is called from methods that already hold the (sync) lock.
func (c *clusterState) numFree() int {
// This can go negative due to lost nodes, set lower bound at zero.
return max(0, len(c.nodes)-c.numRunning)
}
// taskScheduled updates clusterState to reflect that a task has been scheduled on a particular node.
// snapshotId should be the value from the task definition associated with the given taskId.
func (c *clusterState) taskScheduled(nodeId cc.NodeId, jobId, taskId, snapshotId string) {
ns := c.nodes[nodeId]
delete(c.nodeGroups[ns.snapshotId].idle, nodeId)
empty := len(c.nodeGroups[ns.snapshotId].idle) == 0 && len(c.nodeGroups[ns.snapshotId].busy) == 0
if ns.snapshotId != "" && empty {
delete(c.nodeGroups, ns.snapshotId)
}
if _, ok := c.nodeGroups[snapshotId]; !ok {
c.nodeGroups[snapshotId] = newNodeGroup()
}
c.nodeGroups[snapshotId].busy[nodeId] = ns
ns.runningJob = jobId
ns.runningTask = taskId
ns.snapshotId = snapshotId
c.numRunning++
}
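// Illustrative (hypothetical) scheduler flow, assuming the caller holds the cluster lock and
// that "node1", "job1", "task1", and "snap-abc" are placeholder ids:
//
//	cs.taskScheduled("node1", "job1", "task1", "snap-abc") // node1 becomes busy in the snap-abc group
//	// ... task runs ...
//	cs.taskCompleted("node1", false /*flaky*/) // node1 returns to the snap-abc idle group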
// taskCompleted updates clusterState to reflect that a task has finished running on a particular node,
// whether successfully or unsuccessfully.
// If the node isn't found, it was already suspended and deleted; just decrement numRunning.
func (c *clusterState) taskCompleted(nodeId cc.NodeId, flaky bool) {
var ns *nodeState
var ok bool
if ns, ok = c.nodes[nodeId]; !ok {
// This node was removed from the cluster already, check if it was moved to suspendedNodes.
ns, ok = c.suspendedNodes[nodeId]
}
if ok {
// Check whether the node is still ready after the last run; unhealthy nodes will not be ready.
// Mark such nodes as lost, since we do not yet have a way to recover unhealthy nodes.
if c.readyFn != nil {
if ready, _ := c.readyFn(ns.node); !ready && !ns.suspended() {
delete(c.nodes, nodeId)
c.suspendedNodes[nodeId] = ns
ns.timeLost = time.Now()
}
}
if flaky && !ns.suspended() {
delete(c.nodes, nodeId)
c.suspendedNodes[nodeId] = ns
ns.timeFlaky = time.Now()
}
ns.runningJob = noJob
ns.runningTask = noTask
delete(c.nodeGroups[ns.snapshotId].busy, nodeId)
c.nodeGroups[ns.snapshotId].idle[nodeId] = ns
} else {
log.Infof("TaskCompleted specified an unknown node: %v (flaky=%t) (likely reaped already)", nodeId, flaky)
}
c.numRunning--
}
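// getNodeState returns the nodeState for the given id and whether it is present in the healthy
// node pool (suspended and offlined nodes are not returned).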
func (c *clusterState) getNodeState(nodeId cc.NodeId) (*nodeState, bool) {
ns, ok := c.nodes[nodeId]
return ns, ok
}
// updateCluster updates cluster state to reflect added and removed nodes.
// It drains at most DefaultClusterChanSize batches of updates from the channel on each call.
func (c *clusterState) updateCluster() {
defer c.stats.Latency(stats.SchedUpdateClusterLatency_ms).Time().Stop()
allUpdates := []cc.NodeUpdate{}
LOOP:
for i := 0; i < common.DefaultClusterChanSize; i++ {
select {
case updates := <-c.nodesUpdatesCh:
allUpdates = append(allUpdates, updates...)
default:
break LOOP
}
}
c.update(allUpdates)
}
// Processes nodes being added and removed from the cluster & updates the distributor state accordingly.
// Note, we don't expect there to be many updates after startup if the cluster is relatively stable.
// TODO(jschiller) this assumes that new nodes never have the same id as previous ones but we shouldn't rely on that.
func (c *clusterState) update(updates []cc.NodeUpdate) {
// Apply updates
adds := 0
removals := 0
for _, update := range updates {
var newNode *nodeState
switch update.UpdateType {
case cc.NodeAdded:
adds += 1
if update.UserInitiated {
log.Infof("NodeAdded: Reinstating offlined node %s", update.Id)
if ns, ok := c.offlinedNodes[update.Id]; ok {
c.nodes[update.Id] = ns
delete(c.offlinedNodes, update.Id)
} else {
log.Errorf("NodeAdded: Unable to reinstate node %s, not present in offlinedNodes", update.Id)
}
} else if ns, ok := c.suspendedNodes[update.Id]; ok {
if !ns.ready() {
// Adding a node that's already suspended as non-ready, leave it in that state until ready.
log.Infof("NodeAdded: Suspended node re-added but still awaiting readiness check %v (%s)", update.Id, ns)
} else if ns.timeLost != nilTime {
// This node was suspended as lost earlier, we can recover it now.
ns.timeLost = nilTime
c.nodes[update.Id] = ns
delete(c.suspendedNodes, update.Id)
log.Infof("NodeAdded: Recovered suspended node %v (%s), %s", update.Id, ns, c.status())
} else {
log.Infof("NodeAdded: Ignoring NodeAdded event for suspended flaky node. %v (%s)", update.Id, ns)
}
} else if ns, ok := c.nodes[update.Id]; !ok {
// This is a new unrecognized node, add it to the cluster, possibly in a suspended state.
if _, ok := c.offlinedNodes[update.Id]; ok {
// the node was manually offlined; leave it offlined
log.Infof("NodeAdded: Ignoring NodeAdded event for offlined node: %v (%#v), %s", update.Id, update.Node, c.status())
} else if c.readyFn == nil {
// We're not checking readiness, skip suspended state and add this as a healthy node.
newNode = newNodeState(update.Node)
c.nodes[update.Id] = newNode
newNode.readyCh = nil
log.Infof("NodeAdded: Added new node: %v (%#v), %s", update.Id, update.Node, c.status())
} else {
// Add this to the suspended nodes and start a goroutine to check for readiness.
newNode = newNodeState(update.Node)
c.suspendedNodes[update.Id] = newNode
log.Infof("NodeAdded: Added new suspended node: %v (%#v), %s", update.Id, update.Node, c.status())
newNode.startReadyLoop(c.readyFn)
}
c.nodeGroups[""].idle[update.Id] = newNode
} else {
// This node is already present, log this spurious add.
log.Infof("NodeAdded: Node already added!! %v (%s)", update.Id, ns)
}
case cc.NodeRemoved:
removals += 1
if update.UserInitiated {
log.Infof("NodeRemoved: Offlining node %s", update.Id)
if ns, ok := c.nodes[update.Id]; ok {
c.offlinedNodes[update.Id] = ns
delete(c.nodes, update.Id)
} else if ns, ok := c.suspendedNodes[update.Id]; ok {
c.offlinedNodes[update.Id] = ns
delete(c.suspendedNodes, update.Id)
} else {
log.Errorf("NodeRemoved: Unable to offline node %s, not present in nodes or suspendedNodes", update.Id)
}
} else if ns, ok := c.suspendedNodes[update.Id]; ok {
// Node already suspended, make sure it's now marked as lost and not flaky (keep readiness status intact).
log.Infof("NodeRemoved: Already suspended node marked as removed: %v (was %s)", update.Id, ns)
ns.timeLost = time.Now()
ns.timeFlaky = nilTime
} else if ns, ok := c.nodes[update.Id]; ok {
// This was a healthy node, mark it as lost now.
ns.timeLost = time.Now()
c.suspendedNodes[update.Id] = ns
delete(c.nodes, update.Id)
log.Infof("NodeRemoved: Removing node by marking as lost: %v (%s), %s", update.Id, ns, c.status())
} else if ns, ok := c.offlinedNodes[update.Id]; ok {
// the node was manually offlined; remove it from the offlined list
delete(c.offlinedNodes, update.Id)
log.Infof("NodeRemoved: Removing offlined node: %v (%s), %s", update.Id, ns, c.status())
} else {
// We don't know about this node, log spurious remove.
log.Infof("NodeRemoved: Cannot remove unknown node: %v", update.Id)
}
}
}
// Clean up lost nodes that haven't recovered in time, add flaky nodes back into rotation after some time,
// and check if newly added non-ready nodes are ready to be put into rotation.
now := time.Now()
for _, ns := range c.suspendedNodes {
if ns.ready() {
ns.readyCh = nil
}
if !ns.suspended() {
// This node is now ready; remove it from suspended nodes and add it to the healthy node pool,
// preserving its current snapshotId association.
c.nodes[ns.node.Id()] = ns
delete(c.suspendedNodes, ns.node.Id())
log.Infof("SuspendedNode: Node now ready, adding to rotation: %v (%s), %s", ns.node.Id(), ns, c.status())
} else if ns.timeLost != nilTime && time.Since(ns.timeLost) > c.maxLostDuration {
// This node has been missing too long, delete all references to it.
delete(c.suspendedNodes, ns.node.Id())
delete(c.nodeGroups[ns.snapshotId].idle, ns.node.Id())
delete(c.nodeGroups[ns.snapshotId].busy, ns.node.Id())
log.Infof("SuspendedNode: Deleting lost node: %v (%s), %s", ns.node.Id(), ns, c.status())
// Try to notify this node's goroutine about removal so it can stop checking readiness if necessary.
select {
case ns.removedCh <- nil:
default:
}
} else if ns.timeFlaky != nilTime && now.Sub(ns.timeFlaky) > c.maxFlakyDuration {
// This flaky node has been suspended long enough; try adding it back to the healthy node pool.
// If a readyFn is present, run it via startReadyLoop first to reapply any readiness side-effects.
//
// timeFlaky should have been the only time* value set at this point, so reset it.
// snapshotId must be reset since it's used in taskScheduled() and its old group may be gone from nodeGroups.
ns.timeFlaky = nilTime
ns.snapshotId = ""
if c.readyFn != nil {
log.Infof("SuspendedNode: Reinstating flaky node momentarily: %v (%s), %s", ns.node.Id(), ns, c.status())
ns.startReadyLoop(c.readyFn)
} else {
log.Infof("SuspendedNode: Reinstating flaky node now: %v (%s), %s", ns.node.Id(), ns, c.status())
delete(c.suspendedNodes, ns.node.Id())
c.nodes[ns.node.Id()] = ns
}
}
}
// Record node adds/removals and how many consecutive cluster updates saw no change.
if adds > 0 || removals > 0 {
log.Infof("Number of nodes added: %d\nNumber of nodes removed: %d, num iterations without change: %d. %s", adds, removals, c.nopUpdateCnt, c.status())
c.nopUpdateCnt = 0
} else {
c.nopUpdateCnt++
}
c.stats.Gauge(stats.ClusterAvailableNodes).Update(int64(len(c.nodes)))
c.stats.Gauge(stats.ClusterFreeNodes).Update(int64(c.numFree()))
c.stats.Gauge(stats.ClusterRunningNodes).Update(int64(c.numRunning))
c.stats.Gauge(stats.ClusterLostNodes).Update(int64(len(c.suspendedNodes)))
}
func (c *clusterState) status() string {
return fmt.Sprintf("now have %d healthy (%d free, %d running), and %d suspended",
len(c.nodes), c.numFree(), c.numRunning, len(c.suspendedNodes))
}
// The following functions implement (async) user-initiated onlining and offlining of nodes.
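// HasOnlineNode reports whether the node is currently in the healthy (online) pool.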
func (c *clusterState) HasOnlineNode(nodeId cc.NodeId) bool {
_, ok := c.nodes[nodeId]
return ok
}
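// IsOfflined reports whether the node has been offlined by a user and is awaiting reinstatement.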
func (c *clusterState) IsOfflined(nodeId cc.NodeId) bool {
_, ok := c.offlinedNodes[nodeId]
return ok
}
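// OnlineNode asynchronously requests that a previously offlined node be reinstated by queueing a user-initiated add.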
func (c *clusterState) OnlineNode(nodeId cc.NodeId) {
log.Infof("Onlining node %s", nodeId)
nodeUpdate := cc.NewUserInitiatedAdd(cc.NewIdNode(string(nodeId)))
c.nodesUpdatesCh <- []cc.NodeUpdate{nodeUpdate}
}
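// OfflineNode asynchronously requests that the node be taken out of rotation by queueing a user-initiated remove.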
func (c *clusterState) OfflineNode(nodeId cc.NodeId) {
log.Infof("Offlining node %s", nodeId)
nodeUpdate := cc.NewUserInitiatedRemove(nodeId)
c.nodesUpdatesCh <- []cc.NodeUpdate{nodeUpdate}
}
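// Illustrative wiring sketch (hypothetical; the real setup lives in the scheduler's construction code,
// and statsReceiver stands in for whatever stats.StatsReceiver the caller already has):
//
//	updatesCh := make(chan []cc.NodeUpdate, common.DefaultClusterChanSize)
//	cs := newClusterState(updatesCh, nil /*no readyFn*/, statsReceiver)
//	// The cluster implementation sends updates on updatesCh; the scheduler loop then
//	// periodically calls cs.updateCluster() to absorb them.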