in scheduler/server/task_scheduler.go [92:171]
func (s *statefulScheduler) assign(tasks []*taskState) (assignments []taskAssignment) {
	// Build a local map of groupID to each group's (non-suspended, non-offlined) idle nodes. We use a
	// local map instead of clusterState's nodeGroups map because findIdleNodeInGroup removes each node
	// it assigns from that group's idle set, and we don't want that removal to touch clusterState's
	// idle nodes directly: clusterState.taskScheduled() also removes the idle node from its group.
	// Note: each group maps nodeID to the nodeState maintained in clusterState. If a node goes offline
	// while tasks are being assigned, findIdleNodeInGroup() will not assign that node to a task.
	// (This is admittedly wonky; the original implementation also worked on its own local copy of
	// clusterState's nodeGroups, and we follow that pattern here.)
	// TODO: move task-to-node assignment into cluster state to avoid copying clusterState.nodeGroups.
	idleNodesByGroupIDs := map[string]nodeStatesByNodeID{}
	for groupID, group := range s.clusterState.nodeGroups {
		if len(group.idle) > 0 {
			idleNodesByGroupIDs[groupID] = nodeStatesByNodeID{}
			for _, ns := range group.idle {
				if !ns.suspended() && !s.clusterState.isOfflined(ns) {
					idleNodesByGroupIDs[groupID][ns.node.Id()] = ns
				}
			}
		}
	}
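	// At this point idleNodesByGroupIDs holds only assignable nodes, keyed by group ID. For example
	// (hypothetical node and snapshot IDs), a cluster with two idle nodes in group "snap-a" and one
	// idle node in the "" group might yield:
	//   {"snap-a": {"node1": ns1, "node2": ns2}, "": {"node3": ns3}}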
	for _, task := range tasks {
		var nodeSt *nodeState
		// Is there a node group (with an idle node) for this task's snapshot?
		if nodeGroup, ok := idleNodesByGroupIDs[task.Def.SnapshotID]; ok {
			nodeSt = s.findIdleNodeInGroup(nodeGroup)
		}
		if nodeSt == nil {
			// Could not find a free node in the group for the task's snapshot ID. Look for a free
			// node in the other node groups, starting with the "" node group.
			if nodeGroup, ok := idleNodesByGroupIDs[""]; ok {
				nodeSt = s.findIdleNodeInGroup(nodeGroup)
			}
			if nodeSt == nil {
				// No free node was found in the "" group or the task's own group; grab an idle node
				// from any other group. (Go's map iteration order is unspecified, so which group
				// donates the node is effectively arbitrary.)
				for groupID, nodeGroup := range idleNodesByGroupIDs {
					if groupID == "" {
						continue
					}
					nodeSt = s.findIdleNodeInGroup(nodeGroup)
					if nodeSt != nil {
						break
					}
				}
			}
		}
		if nodeSt == nil {
			// Could not find any more free nodes. This can happen if nodes are suspended after the
			// scheduling algorithm has built the list of tasks to schedule but before the tasks are
			// actually assigned to a node. Skip the rest of the assignments; the skipped tasks
			// should be picked up in the next scheduling run.
			log.WithFields(
				log.Fields{
					"jobID":  task.JobId,
					"taskID": task.TaskId,
					"tag":    task.Def.Tag,
				}).Warn("Unable to assign, no free node for task")
			break
		}
		assignments = append(assignments, taskAssignment{nodeSt: nodeSt, task: task})
		// Mark the task as started in the cluster state.
		s.clusterState.taskScheduled(nodeSt.node.Id(), task.JobId, task.Def.TaskID, task.Def.SnapshotID)
		log.WithFields(
			log.Fields{
				"jobID":          task.JobId,
				"taskID":         task.TaskId,
				"node":           nodeSt.node,
				"numAssignments": len(assignments),
				"tag":            task.Def.Tag,
			}).Info("Scheduling task")
		s.stat.Counter(stats.SchedScheduledTasksCounter).Inc(1)
	}
	return assignments
}
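
// For reference, assign relies on findIdleNodeInGroup, defined elsewhere in this file. A minimal
// sketch of the contract described in the comments above -- return an assignable node from the
// group and remove it so it cannot be handed to two tasks in one pass, or nil if none qualifies
// (a hypothetical simplification, not the actual implementation):
//
//	func (s *statefulScheduler) findIdleNodeInGroup(group nodeStatesByNodeID) *nodeState {
//		for id, ns := range group {
//			if ns.suspended() || s.clusterState.isOfflined(ns) {
//				continue
//			}
//			delete(group, id)
//			return ns
//		}
//		return nil
//	}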