in src/main/scala/com/spotify/bdrc/pipeline/TfIdf.scala [140:175]
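// Compute TF-IDF entirely with Spark actions: each countByValue below is an action
// that aggregates on the driver node, so every intermediate map must fit in driver memory.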
def sparkActions(input: Seq[(String, RDD[String])]): Seq[Score] = {
  val numDocs = input.size
  val docToTerms = input
    .map { case (doc, pipe) =>
      pipe
        .flatMap(_.split("\\W+").filter(_.nonEmpty))
        .map(t => (doc, t.toLowerCase))
    }
    .reduce(_ ++ _) // (d, t)
    .cache() // docToTerms is reused 3 times
  val docToTermAndCFreq = docToTerms
    .countByValue() // ((d, t), tf), performed on driver node
    .toSeq // keep one entry per (d, t); re-keying the Map by doc would collapse each doc to a single term
    .map { case ((d, t), tf) => (d, (t, tf)) } // (d, (t, tf))
  val termToDfN = docToTerms
    .distinct()
    .values
    .countByValue() // (t, df), performed on driver node
    .mapValues(_.toDouble / numDocs) // (t, df/N)
  val docToLen = docToTerms.keys
    .countByValue() // (d, |d|), performed on driver node
  docToTermAndCFreq.map { case (d, (t, tf)) =>
    val tfd = tf.toDouble / docToLen(d) // tf/|d|
    val dfN = termToDfN(t)
    Score(t, d, tfd * math.log(1 / dfN)) // tf/|d| * log(N/df)
  }
}
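
A minimal usage sketch, not part of the original file: it assumes sparkActions sits on a TfIdf object (as the file path suggests), that Score is the result case class defined elsewhere in TfIdf.scala, and that the local[2] master and toy corpus are purely illustrative.

import org.apache.spark.{SparkConf, SparkContext}

object TfIdfExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("tf-idf-example").setMaster("local[2]"))
    // One (document name, RDD of text lines) pair per document.
    val corpus = Seq(
      "doc1" -> sc.parallelize(Seq("a quick brown fox")),
      "doc2" -> sc.parallelize(Seq("a lazy dog")),
      "doc3" -> sc.parallelize(Seq("the quick dog jumps"))
    )
    TfIdf.sparkActions(corpus).foreach(println) // one Score per (document, term) pair
    sc.stop()
  }
}

Because the three aggregations are actions returning Scala Maps, this variant trades cluster-side joins for driver-side lookups and only suits corpora whose distinct (doc, term) pairs fit comfortably in driver memory.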