in ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/ParquetSampler.scala [45:84]
/**
 * Samples up to `n` records from the Parquet file(s) matched by `path`.
 *
 * Behavior by case:
 *  - `path` has no glob wildcard: the first matched file is sampled directly.
 *  - `path` is a glob and `head` is true: matched files are visited in path order and records
 *    are read from the start of each file until `n` have been collected (never more than `n`).
 *  - `path` is a glob and `head` is false: per-shard counts are computed by `scaleWeights` from
 *    each shard's size in bytes, and the shards are sampled concurrently with `Future`s. The
 *    call blocks until every shard is done.
 *
 * The class-level `seed` is forwarded to every per-file sampler. This makes seeded sampling of
 * globbed paths reproducible, just as for a single file. NOTE(review): every shard receives the
 * same seed value.
 *
 * Side effect: registers `conf` (or a freshly created default `PipelineOptions`) as the default
 * options for Beam's `FileSystems` before matching `path`.
 *
 * @param n    number of records to sample; must be > 0
 * @param head if true, take records from the start of the file(s) instead of sampling randomly
 * @return the sampled records
 * @throws IllegalArgumentException if `n <= 0`
 */
override def sample(n: Long, head: Boolean): Seq[GenericRecord] = {
  require(n > 0, "n must be > 0")
  logger.info("Taking a sample of {} from Parquet {}", n, path)
  FileSystems.setDefaultPipelineOptions(conf.getOrElse(PipelineOptionsFactory.create()))
  val matches = FileSystems.`match`(path).metadata().asScala
  if (!FileSystems.hasGlobWildcard(path)) {
    val resource = matches.head.resourceId()
    new ParquetFileSampler(resource, seed).sample(n, head)
  } else if (head) {
    val resources = matches
      .map(_.resourceId())
      .sortBy(_.toString)
    // Read from the start of each file in path order. Each file is asked only for the records
    // still missing, so the combined result never exceeds `n`. Inside the loop `remaining` is
    // always > 0 because of the `result.size < n` guard.
    val result = ListBuffer.empty[GenericRecord]
    val iter = resources.iterator
    while (result.size < n && iter.hasNext) {
      val remaining = n - result.size
      result.appendAll(new ParquetFileSampler(iter.next(), seed).sample(remaining, head))
    }
    result.toList
  } else {
    val tups = matches
      .map(md => (md.resourceId(), md.sizeBytes()))
      .sortBy(_._1.toString)
      .toArray
    // Randomly sample from shards; per-shard counts come from the shard sizes in bytes.
    val sizes = tups.map(_._2)
    val resources = tups.map(_._1)
    val samples = scaleWeights(sizes, n)
    val futures = resources
      .zip(samples)
      // A shard allocated no records contributes nothing, so don't open it or call the
      // per-file sampler with a count of 0.
      .filter { case (_, count) => count > 0 }
      .map { case (resource, count) =>
        // Forward `seed` (as the single-file branch does) so seeded sampling is reproducible.
        Future(new ParquetFileSampler(resource, seed).sample(count, head))
      }
      .toSeq
    Await.result(Future.sequence(futures), Duration.Inf).flatten
  }
}