in pkg/scheduler/util/predicate_helper.go [23:100]
// PredicateNodes returns the subset of nodes on which task passes the predicate
// fn, together with a FitErrors recording every node that failed (either from
// the per-task-group error cache or from a fresh fn call). At most
// CalculateNumOfFeasibleNodesToFind(len(nodes)) nodes are returned; scanning
// starts at lastProcessedNodeIndex so that all nodes get an equal chance of
// being examined across scheduling cycles.
func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn) ([]*api.NodeInfo, *api.FitErrors) {
	// errorLock guards fe, nodeErrorCache and ph.taskPredicateErrorCache,
	// which are all mutated concurrently by the worker goroutines below.
	var errorLock sync.RWMutex
	fe := api.NewFitErrors()

	allNodes := len(nodes)
	if allNodes == 0 {
		return make([]*api.NodeInfo, 0), fe
	}
	numNodesToFind := CalculateNumOfFeasibleNodesToFind(int32(allNodes))

	// Allocate enough space up front to avoid growing the slice; each worker
	// writes to a distinct index claimed via the atomic counter below.
	predicateNodes := make([]*api.NodeInfo, numNodesToFind)

	numFoundNodes := int32(0)
	processedNodes := int32(0)

	taskGroupid := taskGroupID(task)
	nodeErrorCache, taskFailedBefore := ph.taskPredicateErrorCache[taskGroupid]
	if nodeErrorCache == nil {
		nodeErrorCache = map[string]error{}
	}

	// Create a context with cancellation so workers stop once enough feasible
	// nodes have been found.
	ctx, cancel := context.WithCancel(context.Background())
	// Fix: cancel must run on every return path, not only when the early-exit
	// branch inside checkNode fires; otherwise the context (and its resources)
	// leak whenever fewer than numNodesToFind nodes pass the predicate.
	// Calling cancel more than once is safe per the context package contract.
	defer cancel()

	checkNode := func(index int) {
		// Check the nodes starting from where we left off in the previous
		// scheduling cycle, to make sure all nodes have the same chance of
		// being examined across pods.
		node := nodes[(lastProcessedNodeIndex+index)%allNodes]
		atomic.AddInt32(&processedNodes, 1)
		klog.V(4).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
			task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)

		// Check if the task had a "predicate" failure before, and then check
		// whether it failed on this particular node, to skip re-running fn.
		if taskFailedBefore {
			errorLock.RLock()
			errC, ok := nodeErrorCache[node.Name]
			errorLock.RUnlock()

			if ok {
				errorLock.Lock()
				fe.SetNodeError(node.Name, errC)
				errorLock.Unlock()
				return
			}
		}

		// TODO (k82cn): Enable eCache for performance improvement.
		if err := fn(task, node); err != nil {
			klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
				task.Namespace, task.Name, node.Name, err)
			errorLock.Lock()
			nodeErrorCache[node.Name] = err
			ph.taskPredicateErrorCache[taskGroupid] = nodeErrorCache
			fe.SetNodeError(node.Name, err)
			errorLock.Unlock()
			return
		}

		// Claim a slot; if we overshot numNodesToFind, cancel the remaining
		// workers and roll the counter back instead of recording the node.
		length := atomic.AddInt32(&numFoundNodes, 1)
		if length > numNodesToFind {
			cancel()
			atomic.AddInt32(&numFoundNodes, -1)
		} else {
			predicateNodes[length-1] = node
		}
	}

	//workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), checkNode)
	workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)

	//processedNodes := int(numFoundNodes) + len(filteredNodesStatuses) + len(failedPredicateMap)
	// Advance the round-robin start point for the next scheduling cycle.
	lastProcessedNodeIndex = (lastProcessedNodeIndex + int(processedNodes)) % allNodes
	predicateNodes = predicateNodes[:numFoundNodes]
	return predicateNodes, fe
}