def scalding()

in src/main/scala/com/spotify/bdrc/pipeline/BloomFilterSetDifference.scala [32:39]


  /**
   * Approximate set difference: emits the elements of `lhs` that a Bloom filter built from
   * `rhs` does not report as present.
   *
   * All of `rhs` is aggregated into a single Bloom filter, which is crossed with `lhs` so that
   * every element can be tested against it. Bloom filters can produce false positives but not
   * false negatives, so no element of `rhs` is emitted, while a small fraction of `lhs`
   * elements that are absent from `rhs` may be dropped as well.
   *
   * NOTE(review): the filter is sized from the hard-coded values 1000 and 0.01 (presumably the
   * expected number of entries and the target false-positive probability; confirm against the
   * Algebird API). Inputs much larger than that will raise the false-positive rate.
   *
   * NOTE(review): if `rhs` is empty the aggregate presumably yields no value and the cross
   * would then emit nothing; confirm whether `leftCross` is needed for that case.
   *
   * @param lhs elements to filter
   * @param rhs elements to remove from `lhs`
   * @return the elements of `lhs` not reported as members of `rhs`
   */
  def scalding(lhs: TypedPipe[String], rhs: TypedPipe[String]): TypedPipe[String] = {
    val width = BloomFilter.optimalWidth(1000, 0.01).get
    val numHashes = BloomFilter.optimalNumHashes(1000, width)
    lhs
      .cross(rhs.aggregate(BloomFilterAggregator(numHashes, width)))
      // Set difference keeps what the filter does NOT report as present; keeping the matches
      // would yield an (approximate) intersection instead.
      .filter { case (s, bf) => !bf.contains(s).isTrue }
      .keys
  }