in src/main/scala/com/spotify/bdrc/pipeline/TfIdf.scala [67:99]
/**
 * Compute a TF-IDF score for every distinct (term, document) pair found in `input`.
 *
 * The score is `(tf / |d|) * log(N / df)` where `tf` is the number of occurrences of the term
 * in the document, `|d|` is the total number of tokens in the document, `N` is `input.size`
 * and `df` is the number of documents that contain the term.
 *
 * Tokens are produced by splitting each string on runs of non-word characters (`\W+`),
 * discarding empty tokens and lower-casing the rest.
 *
 * @param input (document name, contents of that document) pairs; must be non-empty
 * @return one `Score(term, document, tfIdf)` per distinct (term, document) pair
 * @throws IllegalArgumentException if `input` is empty
 */
def scio(input: Seq[(String, SCollection[String])]): SCollection[Score] = {
  // Fail fast with a clear message: an empty Seq cannot be reduced into a single collection
  // below and would otherwise surface as an opaque `empty.reduce` error.
  require(input.nonEmpty, "input must contain at least one document")

  val numDocs = input.size

  val docToTerms = input
    .map { case (doc, pipe) =>
      pipe
        .flatMap(_.split("\\W+").filter(_.nonEmpty))
        .map(t => (doc, t.toLowerCase))
    }
    // union input collections
    .reduce(_ ++ _) // (d, t)

  val docToTermAndCFreq = docToTerms
    // count occurrences of each (d, t) pair; the result stays an SCollection
    .map((_, 1L))
    .reduceByKey(_ + _)
    .map { case ((d, t), tf) => (d, (t, tf)) } // (d, (t, tf))

  // `distinct` keeps each (d, t) pair once, so counting the remaining terms gives the
  // number of documents each term appears in
  val termToDfN = docToTerms.distinct.values
    .map((_, 1L))
    .reduceByKey(_ + _) // (t, df)
    .mapValues(_.toDouble / numDocs) // (t, df/N)

  docToTerms.keys
    // count tokens per document
    .map((_, 1L))
    .reduceByKey(_ + _) // (d, |d|)
    .join(docToTermAndCFreq)
    .map { case (d, (dLen, (t, tf))) => (t, (d, tf.toDouble / dLen)) } // (t, (d, tf/|d|))
    .join(termToDfN)
    // 1 / dfN == N / df, so this is the usual tf * idf product
    .map { case (t, ((d, tfd), dfN)) => Score(t, d, tfd * math.log(1 / dfN)) }
}