in ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/BigSampler.scala [349:435]
/**
 * Samples `s` using the mode selected by (determinism, distribution, precision).
 *
 * Determinism is derived from `fields` via `Determinism.fromSeq`. Deterministic modes decide
 * membership from a hash of the record's `fields` (built with `hashFn`, seeded by `seed`);
 * non-deterministic modes use `sample`/`sampleDist`/`assignRandomRoll` instead.
 *
 * Supported combinations:
 *  - NonDeterministic, no distribution, Approximate: `s.sample(withReplacement = false, fraction)`
 *  - NonDeterministic, some distribution, Approximate: `s.sampleDist(d, keyFn, fraction)`
 *  - Deterministic, no distribution, Approximate: hash each record and keep it via
 *    `BigSampler.diceElement` with `fraction`
 *  - Deterministic, `StratifiedDistribution`, Approximate: as above, then logs the per-key
 *    diffs computed by `buildStratifiedDiffs`
 *  - Deterministic, `UniformDistribution`, Approximate: dices each record with a per-key
 *    probability from `uniformParams`, then logs the diffs from `buildUniformDiffs`
 *  - Exact with some distribution: assigns a roll per record (random, or hash-derived when
 *    deterministic) and delegates to `exactSampleDist`
 *
 * @param s                  input collection
 * @param fraction           sampling fraction forwarded to every sampling primitive
 * @param fields             field names folded, in order, into each record's hash via `hashFn`
 * @param seed               optional seed forwarded to `BigSampler.hashFun`
 * @param hashAlgorithm      algorithm forwarded to `BigSampler.hashFun`
 * @param distribution       optional per-key distribution (keys come from `keyFn`)
 * @param distributionFields not read in this method body (NOTE(review): confirm callers
 *                           do not expect it to affect sampling here)
 * @param precision          `Approximate` or `Exact`
 * @param hashFn             folds one named field of a record into a `Hasher`
 * @param keyFn              extracts the distribution key of a record
 * @param maxKeySize         forwarded to `exactSampleDist` (Exact modes only)
 * @param byteEncoding       forwarded to `ByteHasher.wrap`
 * @return the sampled records
 * @throws UnsupportedOperationException for any combination not listed above, e.g. `Exact`
 *                                       precision without a distribution
 */
def sample[T: ClassTag: Coder, U: ClassTag: Coder](
  s: SCollection[T],
  fraction: Double,
  fields: Seq[String],
  seed: Option[Int],
  hashAlgorithm: HashAlgorithm,
  distribution: Option[SampleDistribution],
  distributionFields: Seq[String],
  precision: Precision,
  hashFn: (T, String, Hasher) => Hasher,
  keyFn: T => U,
  maxKeySize: Int = 1e6.toInt,
  byteEncoding: ByteEncoding = RawEncoding
): SCollection[T] = {
  // The one definition of the per-record hash shared by every deterministic mode, so the
  // modes cannot drift apart. A fresh hasher is created for every record inside the function
  // (not hoisted); presumably this keeps the hash-function object itself out of the
  // serialized closure. NOTE(review): confirm before hoisting it.
  val hashOf = (v: T) => {
    val hasher =
      ByteHasher.wrap(BigSampler.hashFun(hashAlgorithm, seed), byteEncoding, utf8Charset)
    fields.foldLeft(hasher)((h, f) => hashFn(v, f, h)).hash
  }

  // Pairs each record with its key and a hash-derived roll (via `boundLong`) for the exact
  // per-key sampler.
  def assignHashRoll(in: SCollection[T]): SCollection[(U, (T, Double))] =
    in.map(v => (keyFn(v), (v, boundLong(hashOf(v).asLong))))

  // Logger handed to `logDistributionDiffs` in the distribution-aware approximate modes.
  @transient lazy val logSerDe = LoggerFactory.getLogger(this.getClass)
  val det = Determinism.fromSeq(fields)
  (det, distribution, precision) match {
    case (NonDeterministic, None, Approximate) => s.sample(withReplacement = false, fraction)
    case (NonDeterministic, Some(d), Approximate) =>
      s.sampleDist(d, keyFn, fraction)
    case (Deterministic, None, Approximate) =>
      s.flatMap(e => BigSampler.diceElement(e, hashOf(e), fraction))
    case (Deterministic, Some(StratifiedDistribution), Approximate) =>
      val sampled = s
        .flatMap(v => BigSampler.diceElement(v, hashOf(v), fraction))
        .keyBy(keyFn(_))
      val sampledDiffs = buildStratifiedDiffs(s, sampled, keyFn, fraction)
      logDistributionDiffs(sampledDiffs, logSerDe)
      sampled.values
    case (Deterministic, Some(UniformDistribution), Approximate) =>
      // Each record is diced with its key's probability rather than the global fraction.
      val (popPerKey, probPerKey) = uniformParams(s, keyFn, fraction)
      val sampled = s
        .keyBy(keyFn(_))
        .hashJoin(probPerKey)
        .flatMap { case (k, (v, prob)) =>
          BigSampler.diceElement(v, hashOf(v), prob).map(e => (k, e))
        }
      val sampledDiffs =
        buildUniformDiffs(s, sampled, keyFn, fraction, popPerKey)
      logDistributionDiffs(sampledDiffs, logSerDe)
      sampled.values
    case (NonDeterministic, Some(d), Exact) =>
      assignRandomRoll(s, keyFn)
        .exactSampleDist(d, keyFn, fraction, maxKeySize)
    case (Deterministic, Some(d), Exact) =>
      assignHashRoll(s)
        .exactSampleDist(d, keyFn, fraction, maxKeySize, delta = 1e-6)
    case _ =>
      throw new UnsupportedOperationException("This sampling mode is not currently supported")
  }
}