in scheduler/server/cluster_state.go [244:397]
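// update applies a batch of NodeAdded/NodeRemoved updates to the cluster state,
// moving nodes between the healthy (nodes), suspended, and offlined maps. It then
// sweeps suspendedNodes to promote nodes that became ready, drop nodes that have
// been lost longer than maxLostDuration, reinstate flaky nodes after
// maxFlakyDuration, and finally publishes cluster size gauges.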
func (c *clusterState) update(updates []cc.NodeUpdate) {
	// Apply updates
	adds := 0
	removals := 0
	for _, update := range updates {
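		// newNode is only set below when a previously unknown node is created;
		// it stays nil for every other NodeAdded branch.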
		var newNode *nodeState
		switch update.UpdateType {
		case cc.NodeAdded:
			adds += 1
			if update.UserInitiated {
				log.Infof("NodeAdded: Reinstating offlined node %s", update.Id)
				if ns, ok := c.offlinedNodes[update.Id]; ok {
					c.nodes[update.Id] = ns
					delete(c.offlinedNodes, update.Id)
				} else {
					log.Errorf("NodeAdded: Unable to reinstate node %s, not present in offlinedNodes", update.Id)
				}
			} else if ns, ok := c.suspendedNodes[update.Id]; ok {
				if !ns.ready() {
					// Adding a node that's already suspended as non-ready, leave it in that state until ready.
					log.Infof("NodeAdded: Suspended node re-added but still awaiting readiness check %v (%s)", update.Id, ns)
				} else if ns.timeLost != nilTime {
					// This node was suspended as lost earlier, we can recover it now.
					ns.timeLost = nilTime
					c.nodes[update.Id] = ns
					delete(c.suspendedNodes, update.Id)
					log.Infof("NodeAdded: Recovered suspended node %v (%s), %s", update.Id, ns, c.status())
				} else {
					log.Infof("NodeAdded: Ignoring NodeAdded event for suspended flaky node. %v (%s)", update.Id, ns)
				}
			} else if ns, ok := c.nodes[update.Id]; !ok {
				// This is a new unrecognized node, add it to the cluster, possibly in a suspended state.
				if _, ok := c.offlinedNodes[update.Id]; ok {
					// the node was manually offlined, leave it offlined
					log.Infof("NodeAdded: Ignoring NodeAdded event for offlined node: %v (%#v), %s", update.Id, update.Node, c.status())
				} else if c.readyFn == nil {
					// We're not checking readiness, skip suspended state and add this as a healthy node.
					newNode = newNodeState(update.Node)
					c.nodes[update.Id] = newNode
					newNode.readyCh = nil
					log.Infof("NodeAdded: Added new node: %v (%#v), %s", update.Id, update.Node, c.status())
				} else {
					// Add this to the suspended nodes and start a goroutine to check for readiness.
					newNode = newNodeState(update.Node)
					c.suspendedNodes[update.Id] = newNode
					log.Infof("NodeAdded: Added new suspended node: %v (%#v), %s", update.Id, update.Node, c.status())
					newNode.startReadyLoop(c.readyFn)
				}
				c.nodeGroups[""].idle[update.Id] = newNode
			} else {
				// This node is already present, log this spurious add.
				log.Infof("NodeAdded: Node already added!! %v (%s)", update.Id, ns)
			}
		case cc.NodeRemoved:
			removals += 1
			if update.UserInitiated {
				log.Infof("NodeRemoved: Offlining node %s", update.Id)
				if ns, ok := c.nodes[update.Id]; ok {
					c.offlinedNodes[update.Id] = ns
					delete(c.nodes, update.Id)
				} else if ns, ok := c.suspendedNodes[update.Id]; ok {
					c.offlinedNodes[update.Id] = ns
					delete(c.suspendedNodes, update.Id)
				} else {
					log.Errorf("NodeRemoved: Unable to offline node %s, not present in nodes or suspendedNodes", update.Id)
				}
			} else if ns, ok := c.suspendedNodes[update.Id]; ok {
				// Node already suspended, make sure it's now marked as lost and not flaky (keep readiness status intact).
				log.Infof("NodeRemoved: Already suspended node marked as removed: %v (was %s)", update.Id, ns)
				ns.timeLost = time.Now()
				ns.timeFlaky = nilTime
			} else if ns, ok := c.nodes[update.Id]; ok {
				// This was a healthy node, mark it as lost now.
				ns.timeLost = time.Now()
				c.suspendedNodes[update.Id] = ns
				delete(c.nodes, update.Id)
				log.Infof("NodeRemoved: Removing node by marking as lost: %v (%s), %s", update.Id, ns, c.status())
			} else if ns, ok := c.offlinedNodes[update.Id]; ok {
				// the node was manually offlined, remove it from the offlined list
				delete(c.offlinedNodes, update.Id)
				log.Infof("NodeRemoved: Removing offlined node: %v (%s), %s", update.Id, ns, c.status())
			} else {
				// We don't know about this node, log spurious remove.
				log.Infof("NodeRemoved: Cannot remove unknown node: %v", update.Id)
			}
		}
	}
	// Record when we see nodes being added or removed,
	// and how many times we've checked without seeing any changes.
	if adds > 0 || removals > 0 {
		log.Infof("Number of nodes added: %d\nNumber of nodes removed: %d, (%d cluster updates with no change)", adds, removals, c.nopUpdateCnt)
		c.nopUpdateCnt = 0
	} else {
		c.nopUpdateCnt++
	}
	// Clean up lost nodes that haven't recovered in time, add flaky nodes back into rotation after some time,
	// and check if newly added non-ready nodes are ready to be put into rotation.
	now := time.Now()
	for _, ns := range c.suspendedNodes {
		if ns.ready() {
			ns.readyCh = nil
		}
		if !ns.suspended() {
			// This node is initialized, remove it from suspended nodes and add it to the healthy node pool.
			// Preserve its current snapshotId association.
			c.nodes[ns.node.Id()] = ns
			delete(c.suspendedNodes, ns.node.Id())
			log.Infof("SuspendedNode: Node now ready, adding to rotation: %v (%s), %s", ns.node.Id(), ns, c.status())
		} else if ns.timeLost != nilTime && time.Since(ns.timeLost) > c.maxLostDuration {
			// This node has been missing too long, delete all references to it.
			delete(c.suspendedNodes, ns.node.Id())
			delete(c.nodeGroups[ns.snapshotId].idle, ns.node.Id())
			delete(c.nodeGroups[ns.snapshotId].busy, ns.node.Id())
			log.Infof("SuspendedNode: Deleting lost node: %v (%s), %s", ns.node.Id(), ns, c.status())
			// Try to notify this node's goroutine about removal so it can stop checking readiness if necessary.
			select {
			case ns.removedCh <- nil:
			default:
			}
		} else if ns.timeFlaky != nilTime && now.Sub(ns.timeFlaky) > c.maxFlakyDuration {
			// This flaky node has been suspended long enough, try adding it back to the healthy node pool.
			// We process this using the startReadyLoop/readyFn if present to reapply any side-effects,
			// but leave it with its current snapshotId association.
			//
			// TimeFlaky should've been the only time* value set at this point, reset it.
			// SnapshotId must be reset since it's used in taskScheduled() and may be gone from nodeGroups.
			ns.timeFlaky = nilTime
			ns.snapshotId = ""
			if c.readyFn != nil {
				log.Infof("SuspendedNode: Reinstating flaky node momentarily: %v (%s), %s", ns.node.Id(), ns, c.status())
				ns.startReadyLoop(c.readyFn)
			} else {
				log.Infof("SuspendedNode: Reinstating flaky node now: %v (%s), %s", ns.node.Id(), ns, c.status())
				delete(c.suspendedNodes, ns.node.Id())
				c.nodes[ns.node.Id()] = ns
			}
		}
	}
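	// Publish cluster composition gauges.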
	c.stats.Gauge(stats.ClusterAvailableNodes).Update(int64(len(c.nodes)))
	c.stats.Gauge(stats.ClusterFreeNodes).Update(int64(c.numFree()))
	c.stats.Gauge(stats.ClusterRunningNodes).Update(int64(c.numRunning))
	c.stats.Gauge(stats.ClusterLostNodes).Update(int64(len(c.suspendedNodes)))
}