func()

in scheduler/server/cluster_state.go [244:397]


// update applies a batch of cluster node add/remove events and then reconciles
// suspended nodes: newly ready nodes rejoin the healthy pool, nodes lost longer
// than maxLostDuration are purged entirely, and flaky nodes suspended longer
// than maxFlakyDuration are reinstated (via the readiness loop when readyFn is
// set). Finally, cluster size gauges are published to stats.
func (c *clusterState) update(updates []cc.NodeUpdate) {
	// Apply updates
	adds := 0
	removals := 0
	for _, update := range updates {
		var newNode *nodeState
		switch update.UpdateType {
		case cc.NodeAdded:
			adds++
			if update.UserInitiated {
				// A user explicitly re-added this node: move it from offlined back to healthy.
				log.Infof("NodeAdded: Reinstating offlined node %s", update.Id)
				if ns, ok := c.offlinedNodes[update.Id]; ok {
					c.nodes[update.Id] = ns
					delete(c.offlinedNodes, update.Id)
				} else {
					log.Errorf("NodeAdded: Unable to reinstate node %s, not present in offlinedNodes", update.Id)
				}
			} else if ns, ok := c.suspendedNodes[update.Id]; ok {
				if !ns.ready() {
					// Adding a node that's already suspended as non-ready, leave it in that state until ready.
					log.Infof("NodeAdded: Suspended node re-added but still awaiting readiness check %v (%s)", update.Id, ns)
				} else if ns.timeLost != nilTime {
					// This node was suspended as lost earlier, we can recover it now.
					ns.timeLost = nilTime
					c.nodes[update.Id] = ns
					delete(c.suspendedNodes, update.Id)
					log.Infof("NodeAdded: Recovered suspended node %v (%s), %s", update.Id, ns, c.status())
				} else {
					// Suspended as flaky; let the flaky timer below decide when it rejoins.
					log.Infof("NodeAdded: Ignoring NodeAdded event for suspended flaky node. %v (%s)", update.Id, ns)
				}
			} else if ns, ok := c.nodes[update.Id]; !ok {
				// This is a new unrecognized node, add it to the cluster, possibly in a suspended state.
				if _, ok := c.offlinedNodes[update.Id]; ok {
					// the node was manually offlined, leave it offlined
					log.Infof("NodeAdded: Ignoring NodeAdded event for offlined node: %v (%#v), %s", update.Id, update.Node, c.status())
				} else if c.readyFn == nil {
					// We're not checking readiness, skip suspended state and add this as a healthy node.
					newNode = newNodeState(update.Node)
					c.nodes[update.Id] = newNode
					newNode.readyCh = nil
					log.Infof("NodeAdded: Added new node: %v (%#v), %s", update.Id, update.Node, c.status())
				} else {
					// Add this to the suspended nodes and start a goroutine to check for readiness.
					newNode = newNodeState(update.Node)
					c.suspendedNodes[update.Id] = newNode
					log.Infof("NodeAdded: Added new suspended node: %v (%#v), %s", update.Id, update.Node, c.status())
					newNode.startReadyLoop(c.readyFn)
				}
				// Only register a node that was actually created; in the offlined case
				// above newNode is nil and previously a nil entry was inserted here.
				if newNode != nil {
					c.nodeGroups[""].idle[update.Id] = newNode
				}
			} else {
				// This node is already present, log this spurious add.
				log.Infof("NodeAdded: Node already added!! %v (%s)", update.Id, ns)
			}

		case cc.NodeRemoved:
			removals++
			if update.UserInitiated {
				// A user explicitly removed this node: park it in offlinedNodes.
				log.Infof("NodeRemoved: Offlining node %s", update.Id)
				if ns, ok := c.nodes[update.Id]; ok {
					c.offlinedNodes[update.Id] = ns
					delete(c.nodes, update.Id)
				} else if ns, ok := c.suspendedNodes[update.Id]; ok {
					c.offlinedNodes[update.Id] = ns
					delete(c.suspendedNodes, update.Id)
				} else {
					log.Errorf("NodeRemoved: Unable to offline node %s, not present in nodes or suspendedNodes", update.Id)
				}
			} else if ns, ok := c.suspendedNodes[update.Id]; ok {
				// Node already suspended, make sure it's now marked as lost and not flaky (keep readiness status intact).
				log.Infof("NodeRemoved: Already suspended node marked as removed: %v (was %s)", update.Id, ns)
				ns.timeLost = time.Now()
				ns.timeFlaky = nilTime
			} else if ns, ok := c.nodes[update.Id]; ok {
				// This was a healthy node, mark it as lost now.
				ns.timeLost = time.Now()
				c.suspendedNodes[update.Id] = ns
				delete(c.nodes, update.Id)
				log.Infof("NodeRemoved: Removing node by marking as lost: %v (%s), %s", update.Id, ns, c.status())
			} else if ns, ok := c.offlinedNodes[update.Id]; ok {
				// the node was manually offlined, remove it from offlined list
				delete(c.offlinedNodes, update.Id)
				log.Infof("NodeRemoved: Removing offlined node: %v (%s), %s", update.Id, ns, c.status())
			} else {
				// We don't know about this node, log spurious remove.
				log.Infof("NodeRemoved: Cannot remove unknown node: %v", update.Id)
			}
		}
	}

	// Clean up lost nodes that haven't recovered in time, add flaky nodes back into rotation after some time,
	// and check if newly added non-ready nodes are ready to be put into rotation.
	// (Deleting map entries while ranging over the map is well-defined in Go.)
	now := time.Now()
	for _, ns := range c.suspendedNodes {
		if ns.ready() {
			ns.readyCh = nil
		}
		if !ns.suspended() {
			// This node is initialized, remove it from suspended nodes and add it to the healthy node pool.
			// preserve its current snapshotId association
			c.nodes[ns.node.Id()] = ns
			delete(c.suspendedNodes, ns.node.Id())
			log.Infof("SuspendedNode: Node now ready, adding to rotation: %v (%s), %s", ns.node.Id(), ns, c.status())
		} else if ns.timeLost != nilTime && now.Sub(ns.timeLost) > c.maxLostDuration {
			// This node has been missing too long, delete all references to it.
			delete(c.suspendedNodes, ns.node.Id())
			delete(c.nodeGroups[ns.snapshotId].idle, ns.node.Id())
			delete(c.nodeGroups[ns.snapshotId].busy, ns.node.Id())
			log.Infof("SuspendedNode: Deleting lost node: %v (%s), %s", ns.node.Id(), ns, c.status())
			// Try to notify this node's goroutine about removal so it can stop checking readiness if necessary.
			// Non-blocking send: if nobody is listening we must not stall the update loop.
			select {
			case ns.removedCh <- nil:
			default:
			}
		} else if ns.timeFlaky != nilTime && now.Sub(ns.timeFlaky) > c.maxFlakyDuration {
			// This flaky node has been suspended long enough, try adding it back to the healthy node pool.
			// We process this using the startReadyLoop/readyFn if present to reapply any side-effects,
			// but leave it with its current snapshotId association
			//
			// TimeFlaky should've been the only time* value set at this point, reset it.
			// SnapshotId must be reset since it's used in taskScheduled() and may be gone from nodeGroups.
			ns.timeFlaky = nilTime
			ns.snapshotId = ""
			if c.readyFn != nil {
				log.Infof("SuspendedNode: Reinstating flaky node momentarily: %v (%s), %s", ns.node.Id(), ns, c.status())
				ns.startReadyLoop(c.readyFn)
			} else {
				log.Infof("SuspendedNode: Reinstating flaky node now: %v (%s), %s", ns.node.Id(), ns, c.status())
				delete(c.suspendedNodes, ns.node.Id())
				c.nodes[ns.node.Id()] = ns
			}
		}
	}

	// Record when we see nodes being added/removed, and how many consecutive
	// update calls saw no change. (A second, duplicated copy of this block used
	// to run before the cleanup loop above, double-counting nopUpdateCnt.)
	if adds > 0 || removals > 0 {
		log.Infof("Number of nodes added: %d\nNumber of nodes removed: %d, num iterations without change: %d. %s", adds, removals, c.nopUpdateCnt, c.status())
		c.nopUpdateCnt = 0
	} else {
		c.nopUpdateCnt++
	}

	c.stats.Gauge(stats.ClusterAvailableNodes).Update(int64(len(c.nodes)))
	c.stats.Gauge(stats.ClusterFreeNodes).Update(int64(c.numFree()))
	c.stats.Gauge(stats.ClusterRunningNodes).Update(int64(c.numRunning))
	c.stats.Gauge(stats.ClusterLostNodes).Update(int64(len(c.suspendedNodes)))
}