def sample[T: ClassTag: Coder, U: ClassTag: Coder]()

in ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/BigSampler.scala [349:435]


  /**
   * Samples `s` according to the requested determinism, distribution and precision.
   *
   * The sampling mode is selected from three inputs:
   *  - determinism, derived from `fields` via `Determinism.fromSeq`;
   *  - `distribution` (`None`, `StratifiedDistribution` or `UniformDistribution`);
   *  - `precision` (`Approximate` or `Exact`).
   *
   * Supported combinations:
   *  - non-deterministic, no distribution, approximate: `SCollection.sample` without replacement;
   *  - non-deterministic, some distribution, approximate: delegates to `sampleDist`;
   *  - deterministic, no distribution, approximate: each element is kept or dropped by
   *    `BigSampler.diceElement` using the hash of its `fields`;
   *  - deterministic, stratified, approximate: as above, then the per-key differences computed
   *    by `buildStratifiedDiffs` are logged;
   *  - deterministic, uniform, approximate: per-key probabilities from `uniformParams` are
   *    hash-joined onto the elements and used as the dice probability, then the per-key
   *    differences computed by `buildUniformDiffs` are logged;
   *  - non-deterministic / deterministic, some distribution, exact: `exactSampleDist` over a
   *    random roll (`assignRandomRoll`) or a hash-derived roll respectively.
   *
   * @tparam T element type
   * @tparam U distribution key type produced by `keyFn`
   * @param s input collection to sample
   * @param fraction target sampling fraction
   * @param fields names of the fields folded (through `hashFn`) into each element's hash;
   *               also decides determinism via `Determinism.fromSeq`
   * @param seed optional seed passed to `BigSampler.hashFun`
   * @param hashAlgorithm hash algorithm passed to `BigSampler.hashFun`
   * @param distribution optional per-key distribution to sample towards
   * @param distributionFields not read by this method; presumably already encoded in `keyFn`
   *                           by the caller (NOTE(review): confirm)
   * @param precision whether per-key sample sizes are approximate or exact
   * @param hashFn folds one named field of an element into a `Hasher`
   * @param keyFn extracts the distribution key of an element
   * @param maxKeySize forwarded to `exactSampleDist` (exact modes only)
   * @param byteEncoding encoding passed to `ByteHasher.wrap`
   * @return the sampled elements
   * @throws UnsupportedOperationException for any other combination, e.g. exact precision
   *                                       without a distribution
   */
  def sample[T: ClassTag: Coder, U: ClassTag: Coder](
    s: SCollection[T],
    fraction: Double,
    fields: Seq[String],
    seed: Option[Int],
    hashAlgorithm: HashAlgorithm,
    distribution: Option[SampleDistribution],
    distributionFields: Seq[String],
    precision: Precision,
    hashFn: (T, String, Hasher) => Hasher,
    keyFn: T => U,
    maxKeySize: Int = 1e6.toInt,
    byteEncoding: ByteEncoding = RawEncoding
  ): SCollection[T] = {
    // Hashes the values of `fields` in one element. A new hasher is built for every element.
    // Every deterministic branch below uses this single definition, so they all hash elements
    // identically.
    val hashOf = (v: T) => {
      val hasher =
        ByteHasher.wrap(BigSampler.hashFun(hashAlgorithm, seed), byteEncoding, utf8Charset)
      fields.foldLeft(hasher)((h, f) => hashFn(v, f, h)).hash
    }

    // Only the distribution-aware approximate branches log, so create the logger lazily.
    lazy val logger = LoggerFactory.getLogger(this.getClass)
    val det = Determinism.fromSeq(fields)

    (det, distribution, precision) match {
      case (NonDeterministic, None, Approximate) =>
        s.sample(withReplacement = false, fraction)

      case (NonDeterministic, Some(d), Approximate) =>
        s.sampleDist(d, keyFn, fraction)

      case (Deterministic, None, Approximate) =>
        s.flatMap(e => BigSampler.diceElement(e, hashOf(e), fraction))

      case (Deterministic, Some(StratifiedDistribution), Approximate) =>
        val sampled = s
          .flatMap(v => BigSampler.diceElement(v, hashOf(v), fraction))
          .keyBy(keyFn(_))

        val sampledDiffs = buildStratifiedDiffs(s, sampled, keyFn, fraction)
        logDistributionDiffs(sampledDiffs, logger)
        sampled.values

      case (Deterministic, Some(UniformDistribution), Approximate) =>
        val (popPerKey, probPerKey) = uniformParams(s, keyFn, fraction)
        // Each element is diced with the probability assigned to its key, not with `fraction`.
        val sampled = s
          .keyBy(keyFn(_))
          .hashJoin(probPerKey)
          .flatMap { case (k, (v, prob)) =>
            BigSampler.diceElement(v, hashOf(v), prob).map(e => (k, e))
          }

        val sampledDiffs = buildUniformDiffs(s, sampled, keyFn, fraction, popPerKey)
        logDistributionDiffs(sampledDiffs, logger)
        sampled.values

      case (NonDeterministic, Some(d), Exact) =>
        assignRandomRoll(s, keyFn)
          .exactSampleDist(d, keyFn, fraction, maxKeySize)

      case (Deterministic, Some(d), Exact) =>
        // Deterministic roll: the element's hash mapped through `boundLong`, keyed by `keyFn`.
        val hashRolled: SCollection[(U, (T, Double))] =
          s.map(v => (keyFn(v), (v, boundLong(hashOf(v).asLong))))
        // This branch passes an explicit delta; the random-roll branch uses the default.
        hashRolled.exactSampleDist(d, keyFn, fraction, maxKeySize, delta = 1e-6)

      case _ =>
        throw new UnsupportedOperationException("This sampling mode is not currently supported")
    }
  }