def scio()

in src/main/scala/com/spotify/bdrc/pipeline/TfIdf.scala [67:99]


  /**
   * Compute a TF-IDF score for every (term, document) pair found in `input`.
   *
   * Each line is split on runs of non-word characters (`\W+`), empty tokens are dropped and the
   * remaining tokens are lower-cased. For a term `t` occurring in document `d` the emitted value
   * is `(tf / |d|) * ln(1 / (df / N))`, where `tf` is the number of occurrences of `t` in `d`,
   * `|d|` is the total number of tokens in `d`, `df` is the number of distinct documents
   * containing `t`, and `N` is `input.size`.
   *
   * `input` must be non-empty: the per-document collections are merged with `reduce`, which
   * throws on an empty `Seq`.
   *
   * @param input pairs of (document name, collection of that document's lines)
   * @return one `Score(term, document, value)` per distinct (term, document) pair
   */
  def scio(input: Seq[(String, SCollection[String])]): SCollection[Score] = {
    // N: every entry of `input` counts as one document
    val totalDocs = input.size

    // One (doc, term) element per token occurrence, across all documents
    val occurrences = input
      .map { case (docName, lines) =>
        lines
          .flatMap(line => line.split("\\W+").filter(token => token.nonEmpty))
          .map(token => (docName, token.toLowerCase))
      }
      // merge the per-document collections into a single one
      .reduce(_ ++ _)

    // Raw term frequency: count occurrences of each (doc, term), then key by doc
    val termCountsByDoc = occurrences
      .map(occurrence => (occurrence, 1L))
      .reduceByKey(_ + _)
      .map { case ((docName, term), count) => (docName, (term, count)) } // (d, (t, tf))

    // Document frequency as a fraction of N: de-duplicate (doc, term) first so that each
    // document contributes at most once per term
    val docFractionByTerm = occurrences.distinct.values
      .map(term => (term, 1L))
      .reduceByKey(_ + _) // (t, df)
      .mapValues(docFreq => docFreq.toDouble / totalDocs) // (t, df/N)

    // Document length: total number of token occurrences per document
    val docLengths = occurrences.keys
      .map(docName => (docName, 1L))
      .reduceByKey(_ + _) // (d, |d|)

    // Length-normalised term frequency, re-keyed by term for the join with df/N
    val normalizedTfByTerm = docLengths
      .join(termCountsByDoc)
      .map { case (docName, (docLength, (term, count))) =>
        (term, (docName, count.toDouble / docLength)) // (t, (d, tf/|d|))
      }

    normalizedTfByTerm
      .join(docFractionByTerm)
      .map { case (term, ((docName, normalizedTf), docFraction)) =>
        Score(term, docName, normalizedTf * math.log(1 / docFraction))
      }
  }