in src/main/scala/com/spotify/bdrc/pipeline/PageRank.scala [75:96]
/**
 * PageRank over a link graph using Spark RDDs.
 *
 * Every source URL starts with rank 1.0. Each of the 10 iterations does three things:
 *  - splits every page's current rank evenly among its outgoing links;
 *  - sums the contributions arriving at each URL;
 *  - applies the damping factor:
 *    `rank = (1 - dampingFactor) + dampingFactor * sum(contributions)`.
 *
 * `dampingFactor` is defined outside this method.
 *
 * The keys of each iteration's result are the URLs that received at least one contribution.
 * Two consequences follow:
 *  - A URL with no inbound links from ranked pages is absent from the output.
 *  - A URL that never appears as a source in `input` has no entry in `links`. It is dropped
 *    by the join and never passes its rank on.
 *
 * @param input (src URL, dst URL) edges
 * @return (URL, rank) pairs after 10 iterations
 */
def spark(input: RDD[(String, String)]): RDD[(String, Double)] = {
  // (src URL, iterable of dst URL); cached because it is re-joined in every iteration
  val links = input
    .groupByKey()
    .cache()
  // (src URL, 1.0)
  val initialRanks = links.mapValues(_ => 1.0)

  // Thread the ranks through a fixed number of iterations instead of mutating a var.
  (1 to 10).foldLeft(initialRanks) { (ranks, _) =>
    val contribs = links
      .join(ranks)
      .values
      // re-distribute rank of src URL among collection of dst URLs
      .flatMap { case (urls, rank) =>
        val size = urls.size
        urls.map((_, rank / size))
      }
    contribs
      .reduceByKey(_ + _)
      .mapValues((1 - dampingFactor) + dampingFactor * _)
  }
}