in src/main/scala/com/spotify/bdrc/pipeline/TfIdf.scala [33:65]
/**
 * Computes TF-IDF scores with Scalding.
 *
 * Every element of each document's pipe is split on runs of non-word characters (`\W+`).
 * Empty tokens are dropped and the remaining terms are lower-cased. For every distinct
 * (term, document) pair the score is `tf / |d| * log(N / df)`, where:
 *  - `tf` is the number of occurrences of the term in the document,
 *  - `|d|` is the total number of terms in the document,
 *  - `N` is the number of entries in `input`,
 *  - `df` is the number of documents that contain the term.
 *
 * @param input pairs of (document id, pipe of that document's text)
 * @return one [[Score]] per distinct (term, document) pair; an empty pipe when `input` is empty
 */
def scalding(input: Seq[(String, TypedPipe[String])]): TypedPipe[Score] = {
  val numDocs = input.size
  // Union the per-document collections into a single pipe of (d, t).
  // `reduceOption` + empty fallback avoids `reduce` throwing on an empty `input`.
  val docToTerms: TypedPipe[(String, String)] = input
    .map { case (doc, pipe) =>
      pipe
        .flatMap(_.split("\\W+").filter(_.nonEmpty))
        .map(t => (doc, t.toLowerCase))
    }
    .reduceOption(_ ++ _)
    .getOrElse(TypedPipe.empty)
  // (d, (t, tf)): raw count of each term within each document.
  val docToTermAndFreq = docToTerms
    .groupBy(identity)
    .size
    .toTypedPipe
    .map { case ((d, t), tf) => (d, (t, tf)) }
  // (t, df/N): `distinct` first so a term is counted once per document that contains it.
  val termToDfN = docToTerms.distinct.values
    .groupBy(identity)
    .size // (t, df)
    .mapValues(_.toDouble / numDocs) // (t, df/N)
  docToTerms.keys
    .groupBy(identity)
    .size // (d, |d|)
    .join(docToTermAndFreq)
    .toTypedPipe
    .map { case (d, (dLen, (t, tf))) => (t, (d, tf.toDouble / dLen)) } // (t, (d, tf/|d|))
    .join(termToDfN)
    .toTypedPipe
    // log(1 / (df/N)) == log(N/df), the inverse document frequency.
    .map { case (t, ((d, tfd), dfN)) => Score(t, d, tfd * math.log(1 / dfN)) }
}