def scalding()

in src/main/scala/com/spotify/bdrc/pipeline/TfIdf.scala [33:65]


  /**
   * Computes TF-IDF scores using Scalding's typed API.
   *
   * Each input element pairs a document id with a pipe of that document's text. Every string is
   * split on non-word characters (`\W+`), empty tokens are dropped and the rest are lower-cased.
   * For every distinct (term, document) pair the score is
   * `tf(t, d) / |d| * ln(N / df(t))`, where `tf` is the term's count in the document, `|d|` is
   * the document's total token count, `df` is the number of documents containing the term and
   * `N` is `input.size`.
   *
   * @param input (document id, document text) pairs. NOTE(review): `N` is `input.size`, so the
   *              same document id appearing twice would be counted twice in `N` — presumably
   *              callers pass each document once.
   * @return one [[Score]] per distinct (term, document) pair; an empty pipe when `input` is empty
   */
  def scalding(input: Seq[(String, TypedPipe[String])]): TypedPipe[Score] = {
    // N: the number of documents, taken as the number of input collections.
    val numDocs = input.size

    val docToTerms: TypedPipe[(String, String)] = input
      .map { case (doc, pipe) =>
        pipe
          .flatMap(_.split("\\W+").filter(_.nonEmpty))
          // NOTE(review): String.toLowerCase uses the JVM default locale.
          .map(t => (doc, t.toLowerCase))
      }
      // Union the input collections. `reduce` would throw on an empty Seq; fall back to an
      // empty pipe so no input simply produces no scores.
      .reduceOption(_ ++ _)
      .getOrElse(TypedPipe.empty) // (d, t)

    // Raw term frequency: count occurrences of each (d, t) pair, then key by document.
    val docToTermAndFreq = docToTerms
      .groupBy(identity)
      .size
      .toTypedPipe
      .map { case ((d, t), tf) => (d, (t, tf)) } // (d, (t, tf))

    // Document frequency: dedupe (d, t) first so each document counts a term at most once.
    val termToDfN = docToTerms.distinct.values
      .groupBy(identity)
      .size // (t, df)
      .mapValues(_.toDouble / numDocs) // (t, df/N)

    // Document length |d| is the total number of tokens emitted for that document.
    docToTerms.keys
      .groupBy(identity)
      .size // (d, |d|)
      .join(docToTermAndFreq)
      .toTypedPipe
      .map { case (d, (dLen, (t, tf))) => (t, (d, tf.toDouble / dLen)) } // (t, (d, tf/|d|))
      .join(termToDfN)
      .toTypedPipe
      // log(1 / (df/N)) == log(N/df), the inverse document frequency.
      .map { case (t, ((d, tfd), dfN)) => Score(t, d, tfd * math.log(1 / dfN)) }
  }