func()

in scheduler/server/task_scheduler.go [92:171]


// assign pairs tasks, in order, with idle nodes and records each pairing in
// the cluster state.
//
// For every task the node search is, in order: the idle nodes of the group
// whose ID equals the task's snapshot ID, then the "" group, then any other
// group (visited in Go map iteration order, so no fixed order among those).
// Candidates are the idle nodes that were neither suspended nor offlined when
// assign built its local view of the groups.
//
// The first task for which no node can be found stops the loop: that task and
// every later one are left unassigned (a warning is logged) so a later
// scheduling run can pick them up. Each successful pairing is appended to the
// returned slice, reported via clusterState.taskScheduled, logged, and counted
// in stats.SchedScheduledTasksCounter.
func (s *statefulScheduler) assign(tasks []*taskState) (assignments []taskAssignment) {
	// Local view of each group's usable idle nodes, keyed by group ID. A local
	// map is used instead of clusterState.nodeGroups because findIdleNodeInGroup
	// removes the node it picks from the group it is handed, while
	// clusterState.taskScheduled separately removes the node from the cluster's
	// own idle set; working on a copy keeps those two removals from colliding.
	// The entries are the nodeState values maintained by clusterState, not copies
	// of them. NOTE(review): per the original author, findIdleNodeInGroup is
	// expected to skip a node that goes offline while assignment is in progress.
	// TODO move assigning tasks to nodes to cluster state to avoid copying clusterState.NodeGroups
	idleByGroup := map[string]nodeStatesByNodeID{}
	for groupID, group := range s.clusterState.nodeGroups {
		if len(group.idle) == 0 {
			continue
		}
		usable := nodeStatesByNodeID{}
		for _, ns := range group.idle {
			if ns.suspended() || s.clusterState.isOfflined(ns) {
				continue
			}
			usable[ns.node.Id()] = ns
		}
		idleByGroup[groupID] = usable
	}

	// pickNode returns an idle node for a task with the given snapshot ID, or nil
	// when no group has a node left to hand out.
	pickNode := func(snapshotID string) *nodeState {
		// Preferred: a node from the group matching the task's snapshot.
		if group, ok := idleByGroup[snapshotID]; ok {
			if ns := s.findIdleNodeInGroup(group); ns != nil {
				return ns
			}
		}
		// Next: the "" group.
		if group, ok := idleByGroup[""]; ok {
			if ns := s.findIdleNodeInGroup(group); ns != nil {
				return ns
			}
		}
		// Last resort: the first other group that still yields an idle node.
		for groupID, group := range idleByGroup {
			if groupID == "" {
				continue
			}
			if ns := s.findIdleNodeInGroup(group); ns != nil {
				return ns
			}
		}
		return nil
	}

	for _, task := range tasks {
		nodeSt := pickNode(task.Def.SnapshotID)
		if nodeSt == nil {
			// Out of free nodes. This can happen when nodes are suspended after the
			// scheduling algorithm built the task list but before assignment. The
			// remaining tasks are skipped and should be picked up by the next
			// scheduling run.
			log.WithFields(log.Fields{
				"jobID":  task.JobId,
				"taskID": task.TaskId,
				"tag":    task.Def.Tag,
			}).Warn("Unable to assign, no free node for task")
			break
		}

		assignments = append(assignments, taskAssignment{nodeSt: nodeSt, task: task})

		// Mark the task as started in the cluster.
		s.clusterState.taskScheduled(nodeSt.node.Id(), task.JobId, task.Def.TaskID, task.Def.SnapshotID)

		log.WithFields(log.Fields{
			"jobID":          task.JobId,
			"taskID":         task.TaskId,
			"node":           nodeSt.node,
			"numAssignments": len(assignments),
			"tag":            task.Def.Tag,
		}).Info("Scheduling task")
		s.stat.Counter(stats.SchedScheduledTasksCounter).Inc(1)
	}
	return assignments
}