def spark()

in src/main/scala/com/spotify/bdrc/pipeline/PageRank.scala [75:96]


  /**
   * Computes PageRank over an edge list with Spark RDDs, running a fixed 10 iterations.
   *
   * Each iteration splits a source URL's current rank evenly among its destination URLs.
   * It then sums the shares each destination receives and applies the damping formula
   * `(1 - dampingFactor) + dampingFactor * sum`. `dampingFactor` is defined outside this method.
   *
   * NOTE: ranks are rebuilt only from received contributions, so a URL that no source links
   * to drops out of the result after the first iteration. A destination URL that never
   * appears as a source has no adjacency entry, so the inner join discards its rank and it
   * passes nothing on.
   *
   * @param input edges as `(src URL, dst URL)` pairs
   * @return `(URL, rank)` pairs after the final iteration
   */
  def spark(input: RDD[(String, String)]): RDD[(String, Double)] = {
    // Adjacency list (src URL -> all dst URLs). Every iteration joins against it, so keep it cached.
    val adjacency = input.groupByKey().cache()

    // Seed every source URL with a rank of 1.0.
    val seedRanks: RDD[(String, Double)] = adjacency.mapValues(_ => 1.0)

    (1 to 10).foldLeft(seedRanks) { (current, _) =>
      adjacency
        .join(current)
        .values
        // Spread the source's rank evenly across its outgoing links.
        .flatMap { case (targets, rank) =>
          val outDegree = targets.size
          targets.map(target => (target, rank / outDegree))
        }
        .reduceByKey(_ + _)
        .mapValues(received => (1 - dampingFactor) + dampingFactor * received)
    }
  }