override def sample()

in ratatool-sampling/src/main/scala/com/spotify/ratatool/samplers/ParquetSampler.scala [45:84]


  /**
   * Samples up to `n` records from the Parquet file(s) at `path`.
   *
   * - If `path` contains no glob wildcard, the single matched file is sampled directly.
   * - If `path` is a glob and `head` is true, matched files are visited in lexicographic
   *   order of their resource id, and records are read from the start until `n` records
   *   are collected or files run out.
   * - If `path` is a glob and `head` is false, `n` is split across files in proportion to
   *   their size in bytes (via `scaleWeights`), and each file is sampled concurrently.
   *
   * `seed` is forwarded to every per-file sampler, so seeded sampling behaves the same way
   * whether `path` names one file or a glob of shards.
   *
   * @param n    number of records to sample; must be > 0
   * @param head if true, take records from the start instead of sampling randomly
   * @return the sampled records
   * @throws IllegalArgumentException if `n <= 0`
   */
  override def sample(n: Long, head: Boolean): Seq[GenericRecord] = {
    require(n > 0, "n must be > 0")
    logger.info("Taking a sample of {} from Parquet {}", n, path)

    FileSystems.setDefaultPipelineOptions(conf.getOrElse(PipelineOptionsFactory.create()))
    val matches = FileSystems.`match`(path).metadata().asScala
    if (!FileSystems.hasGlobWildcard(path)) {
      val resource = matches.head.resourceId()
      new ParquetFileSampler(resource, seed).sample(n, head)
    } else {
      if (head) {
        val resources = matches
          .map(_.resourceId())
          .sortBy(_.toString)
        // Read from the start, shard by shard, asking each shard only for the records still
        // missing. Requesting `n` from every shard would let the result exceed `n`.
        val result = ListBuffer.empty[GenericRecord]
        val iter = resources.iterator
        while (result.size < n && iter.hasNext) {
          val remaining = n - result.size
          result.appendAll(new ParquetFileSampler(iter.next(), seed).sample(remaining, head))
        }
        result.toList
      } else {
        val tups = matches
          .map(md => (md.resourceId(), md.sizeBytes()))
          .sortBy(_._1.toString)
          .toArray
        // Randomly sample from shards, with per-shard counts proportional to shard size.
        val sizes = tups.map(_._2)
        val resources = tups.map(_._1)
        val samples = scaleWeights(sizes, n)
        val futures = resources
          .zip(samples)
          // Proportional rounding can allocate 0 samples to a small shard. Skip those shards
          // rather than asking a file sampler for 0 records, which it may reject.
          .collect { case (r, s) if s > 0 =>
            Future(new ParquetFileSampler(r, seed).sample(s, head))
          }
          .toSeq
        Await.result(Future.sequence(futures), Duration.Inf).flatten
      }
    }
  }