scheduler/server/task_scheduler.go (146 lines of code) (raw):

package server

import (
	"math"

	log "github.com/sirupsen/logrus"

	"github.com/twitter/scoot/cloud/cluster"
	"github.com/twitter/scoot/common/stats"
)

// taskAssignment pairs a task with the free worker node it should be started on.
type taskAssignment struct {
	nodeSt *nodeState
	task   *taskState
}

// Clients will check for this string to differentiate between scoot and user initiated actions.
const RebalanceRequestedErrStr = "RebalanceRequested"

// Returns a list of taskAssigments of task to free node.
// Also returns a modified copy of clusterState.nodeGroups for the caller to apply (so this remains a pure fn).
// Note: pure fn because it's confusing to have getTaskAssignments() modify clusterState based on the proposed
// scheduling and also require that the caller apply final modifications to clusterState as a second step)
//
// Does best effort scheduling which tries to assign tasks to nodes already primed for similar tasks.
// Not all tasks are guaranteed to be scheduled.
func (s *statefulScheduler) getTaskAssignments() []taskAssignment {
	defer s.stat.Latency(stats.SchedTaskAssignmentsLatency_ms).Time().Stop()

	// Exit if there are no unscheduled tasks.
	// A job has waiting tasks when its task count exceeds completed + running.
	waitingTasksFound := false
	for _, j := range s.inProgressJobs {
		if len(j.Tasks)-j.TasksCompleted-j.TasksRunning > 0 {
			waitingTasksFound = true
			break
		}
	}
	if !waitingTasksFound {
		return nil
	}

	// Ask the configured scheduling algorithm which tasks to start and which to stop (for rebalancing).
	tasks, stopTasks := s.config.SchedAlg.GetTasksToBeAssigned(s.inProgressJobs, s.stat, s.clusterState, s.requestorMap)
	// Exit if no tasks qualify to be scheduled.
	if len(tasks) == 0 {
		if len(stopTasks) != 0 {
			// Stopping without starting is unexpected; log and drop the stop list rather than rebalance for nothing.
			log.Errorf("task assignment returned tasks to stop but none to start. Ignoring the (%d len) stopTasks list", len(stopTasks))
		}
		return nil
	}
	log.WithFields(log.Fields{"numStartingTasks": len(tasks), "numStoppingTasks": len(stopTasks)}).Info("scheduling returned")

	// stop the tasks in stopTasks (we are rebalancing the workers)
	for _, task := range stopTasks {
		jobState := s.getJob(task.JobId)
		logFields := log.Fields{
			"jobID":     task.JobId,
			"requestor": jobState.Job.Def.Requestor,
			"jobType":   jobState.Job.Def.JobType,
			"tag":       jobState.Job.Def.Tag,
		}
		// Abort with RebalanceRequestedErrStr so clients can tell this apart from a user-initiated abort.
		msgs := s.abortTask(jobState, task, logFields, RebalanceRequestedErrStr)
		if len(msgs) > 0 {
			// Persist the abort messages to the saga log; a failure here is logged but not fatal.
			if err := jobState.Saga.BulkMessage(msgs); err != nil {
				logFields["err"] = err
				log.WithFields(logFields).Error("abortTask saga.BulkMessage failure")
			}
		}
	}

	// Loop over all cluster snapshotIds looking for a usable node. Prefer, in order:
	// - Hot node for the given snapshotId (one whose last task shared the same snapshotId).
	// - New untouched node (or node whose last task used an empty snapshotId)
	// - A random free node from the idle pools of nodes associated with other snapshotIds.
	assignments := s.assign(tasks)
	log.WithFields(
		log.Fields{
			"numAssignments": len(assignments),
			"numTasks":       len(tasks),
			// NOTE(review): tag/jobID reflect only the first task; tasks may span multiple jobs.
			"tag":   tasks[0].Def.Tag,
			"jobID": tasks[0].Def.JobID,
		}).Infof("Assigned %d tasks", len(assignments))

	return assignments
}

// nodeStatesByNodeID indexes nodeStates by their cluster node ID.
type nodeStatesByNodeID map[cluster.NodeId]*nodeState

// Helper fn, appends to 'assignments' and updates nodeGroups.
// Should successfully assign all given tasks if caller invokes this with self-consistent params.
// Note: there may be a race condition between recognizing idle nodes as available for assignment and
// nodes becoming offlined or suspended. The code does the best it can, but it may assign a task to
// a node that clusterState considers offlined/suspended before or as the task is actually being started
func (s *statefulScheduler) assign(tasks []*taskState) (assignments []taskAssignment) {
	idleNodesByGroupIDs := map[string]nodeStatesByNodeID{}
	// make a local map of groupID to each group's (non-suspended/non-offlined) idle nodes. We use a local
	// map instead of the clusterState's nodeGroups map because as the processing (findIdleNodeInGroup) finds an idle
	// node to assign to a task, it removes it from that group's idle nodes. We don't want to have that removal
	// impact clusterState's idle nodes for the group because clusterState.taskScheduled() also removes the idle
	// node from the group.
	// Note: each group is a map of nodeID to the nodeState being maintained in clusterState. If a node goes offline
	// as the tasks are being assigned, findIdleNodeInGroup() will not assign the node to a task.
	// (Yes this is wonky, but I don't have a great understanding of clusterState, and since the original
	// implementation used its own local copy of clusterState's nodeGroups here, I'm following that pattern.)
	// TODO move assigning tasks to nodes to cluster state to avoid copying clusterState.NodeGroups
	for groupID, group := range s.clusterState.nodeGroups {
		if len(group.idle) > 0 {
			idleNodesByGroupIDs[groupID] = nodeStatesByNodeID{}
			for _, ns := range group.idle {
				// Filter out nodes that are already suspended or offlined at snapshot time.
				if !ns.suspended() && !s.clusterState.isOfflined(ns) {
					idleNodesByGroupIDs[groupID][ns.node.Id()] = ns
				}
			}
		}
	}

	for _, task := range tasks {
		var nodeSt *nodeState

		// is there a node group (with idle node) for this snapshot?
		if nodeGroup, ok := idleNodesByGroupIDs[task.Def.SnapshotID]; ok {
			nodeSt = s.findIdleNodeInGroup(nodeGroup)
		}
		if nodeSt == nil {
			// could not find any free nodes in node group for the task's snapshot id. Look for a free node in the other
			// node groups, starting with the "" node group
			if nodeGroup, ok := idleNodesByGroupIDs[""]; ok {
				nodeSt = s.findIdleNodeInGroup(nodeGroup)
			}
			if nodeSt == nil {
				// no free node was found in "" nor the task's node group, grab an idle node from another group
				// (map iteration order is random, so this is effectively a random other group).
				for groupID, nodeGroup := range idleNodesByGroupIDs {
					if groupID == "" {
						continue
					}
					nodeSt = s.findIdleNodeInGroup(nodeGroup)
					if nodeSt != nil {
						break
					}
				}
			}
		}
		if nodeSt == nil {
			// Could not find any more free nodes. This may happen if nodes are suspended after the scheduling algorithm
			// has built the list of tasks to schedule but before the tasks are actually assigned to a node.
			// Skip the rest of the assignments, the skipped tasks should be picked up in the next scheduling run
			log.WithFields(
				log.Fields{
					"jobID":  task.JobId,
					"taskID": task.TaskId,
					"tag":    task.Def.Tag,
				}).Warn("Unable to assign, no free node for task")
			break
		}

		assignments = append(assignments, taskAssignment{nodeSt: nodeSt, task: task})
		// Mark Task as Started in the cluster
		s.clusterState.taskScheduled(nodeSt.node.Id(), task.JobId, task.Def.TaskID, task.Def.SnapshotID)
		log.WithFields(
			log.Fields{
				"jobID":          task.JobId,
				"taskID":         task.TaskId,
				"node":           nodeSt.node,
				"numAssignments": len(assignments),
				"tag":            task.Def.Tag,
			}).Info("Scheduling task")
		s.stat.Counter(stats.SchedScheduledTasksCounter).Inc(1)
	}

	return assignments
}

// findIdleNodeInGroup finds a node in the group's idle nodes that is not suspended or offlined (this method will, pick
// up nodes that have been suspended/offlined while the processing was assigning other tasks to nodes).
// It also removes the node from the groups idle nodes list to prevent it from being assigned again.
// Returns nil if the group has no usable idle node.
func (s *statefulScheduler) findIdleNodeInGroup(nodeGroup nodeStatesByNodeID) *nodeState {
	for id, ns := range nodeGroup {
		// Re-check liveness: a node may have been suspended/offlined since the local idle map was built.
		if ns.suspended() || s.clusterState.isOfflined(ns) {
			continue
		}
		// Remove from the local idle set so it cannot be handed out twice in this scheduling pass.
		delete(nodeGroup, id)
		return ns
	}
	return nil
}

// Helpers.
func min(num int, nums ...int) int { m := num for _, n := range nums { if n < m { m = n } } return m } func max(num int, nums ...int) int { m := num for _, n := range nums { if n > m { m = n } } return m } func ceil(num float32) int { return int(math.Ceil(float64(num))) }