def sparkActions()

in src/main/scala/com/spotify/bdrc/pipeline/TfIdf.scala [140:175]


  /**
   * Computes TF-IDF with Spark actions (`countByValue`), which collect the
   * intermediate counts onto the driver node.
   *
   * Bug fix: the original built `Map[doc, (term, tf)]` from the per-(doc, term)
   * counts; duplicate map keys overwrite, so only ONE arbitrary term per
   * document survived and only one score per document was emitted. The term
   * frequencies are now grouped per document so every (term, document) pair
   * receives a score.
   *
   * @param input one (document name, lines) pair per document
   * @return one [[Score]] per distinct (term, document) pair
   */
  def sparkActions(input: Seq[(String, RDD[String])]): Seq[Score] = {
    val numDocs = input.size

    // (d, t): one pair per token occurrence, lower-cased, split on non-word chars
    val docToTerms = input
      .map { case (doc, pipe) =>
        pipe
          .flatMap(_.split("\\W+").filter(_.nonEmpty))
          .map(t => (doc, t.toLowerCase))
      }
      .reduce(_ ++ _) // (d, t)
      .cache() // docToTerms is reused 3 times

    // d -> all (t, tf) pairs of that document; performed on driver node.
    // groupBy retains every term, unlike a Map keyed by d alone.
    val docToTermFreqs: Map[String, Seq[(String, Long)]] = docToTerms
      .countByValue() // ((d, t), tf)
      .toSeq
      .map { case ((d, t), tf) => (d, (t, tf)) }
      .groupBy(_._1)
      .map { case (d, grouped) => (d, grouped.map(_._2)) }

    // t -> df/N; performed on driver node
    val termToDfN = docToTerms
      .distinct()
      .values
      .countByValue() // (t, df)
      .map { case (t, df) => (t, df.toDouble / numDocs) }

    docToTerms.keys
      .countByValue() // (d, |d|); performed on driver node
      .toSeq
      .flatMap { case (d, dLen) =>
        docToTermFreqs(d).map { case (t, tf) =>
          val tfd = tf.toDouble / dLen // tf/|d|
          Score(t, d, tfd * math.log(1 / termToDfN(t)))
        }
      }
  }