in graphjet-core/src/main/java/com/twitter/graphjet/algorithms/MultiThreadedPageRank.java [132:163]
/**
 * Runs the PageRank computation over {@code nodes}.
 *
 * <p>The method first records every node whose out-degree in {@code graph} is zero, then
 * (re)allocates {@code prVector} and gives each node the same starting value of
 * {@code 1.0 / nodeCount}. The node list is cut into {@code threads} contiguous slices and
 * {@code iterate} is invoked repeatedly until either {@code maxIterations} rounds have been
 * executed or {@code normL1} is no longer greater than {@code tolerance}.
 *
 * <p>NOTE(review): {@code normL1} is only read here; it is presumably updated by
 * {@code iterate} (not visible from this method) - confirm.
 *
 * @return the number of times {@code iterate} was invoked
 */
public int run() {
  // Gather the nodes that have no outgoing edges; they are handed to iterate() separately.
  LongArrayList danglingNodes = new LongArrayList();
  for (LongIterator it = nodes.iterator(); it.hasNext(); ) {
    long node = it.nextLong();
    if (graph.getOutDegree(node) == 0) {
      danglingNodes.add(node);
    }
  }

  double dampingAmount = (1.0 - dampingFactor) / nodeCount;

  // The vector is indexed directly by node id, hence maxNodeId + 1 slots.
  // NOTE(review): the narrowing cast assumes maxNodeId fits in an int - confirm it is
  // validated elsewhere.
  prVector = new AtomicDoubleArray((int) (maxNodeId + 1));
  double initialRank = 1.0 / nodeCount;
  for (LongIterator it = nodes.iterator(); it.hasNext(); ) {
    prVector.set((int) it.nextLong(), initialRank);
  }

  // Slice the node list into one contiguous partition per thread.
  LongArrayList[] partitions = new LongArrayList[threads];
  int sliceLength = nodes.size() / threads;
  int lastSlice = threads - 1;
  for (int p = 0; p < threads; p++) {
    int from = p * sliceLength;
    int to;
    if (p == lastSlice) {
      // The last partition absorbs whatever remains after the integer division.
      to = nodes.size();
    } else {
      to = from + sliceLength;
    }
    partitions[p] = new LongArrayList(nodes.subList(from, to));
  }

  // Keep iterating until the iteration cap is hit or normL1 drops to the tolerance.
  int rounds = 0;
  for (; rounds < this.maxIterations && normL1 > tolerance; rounds++) {
    iterate(dampingAmount, danglingNodes, partitions);
  }
  return rounds;
}