def processElement()

in scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetReadFn.scala [160:241]


  /**
   * Splittable-DoFn processor: reads Parquet row groups from `file` within the
   * claimed restriction and emits every materialized record to `out`.
   *
   * At [[SplitGranularity.File]] the whole file is claimed with a single
   * `tryClaim`; at [[SplitGranularity.RowGroup]] each row-group index in the
   * restriction is claimed individually, allowing finer-grained splitting.
   * The reader is always closed, even if reading fails.
   */
  def processElement(
    @Element file: ReadableFile,
    tracker: RestrictionTracker[OffsetRange, Long],
    out: DoFn.OutputReceiver[R]
  ): Unit = {
    logger.debug(
      "reading file from offset {} to {}",
      tracker.currentRestriction.getFrom,
      if (splitGranularity == SplitGranularity.File) "end" else tracker.currentRestriction().getTo
    )
    val reader = parquetFileReader(file)
    try {
      val recordFilter = options.getRecordFilter
      val footerMetadata = reader.getFooter.getFileMetaData
      val schema = footerMetadata.getSchema
      val keyValueMetadata = footerMetadata.getKeyValueMetaData
      val support = readSupportFactory.readSupport

      // Let the ReadSupport negotiate the requested (possibly projected) schema
      // from the file schema plus its key/value metadata.
      val context = support.init(
        new InitContext(
          conf.get(),
          keyValueMetadata.asScala.map { case (key, value) =>
            key -> (ImmutableSet.of(value): JSet[String])
          }.asJava,
          schema
        )
      )
      reader.setRequestedSchema(context.getRequestedSchema)

      val ioFactory = new ColumnIOFactory(footerMetadata.getCreatedBy)
      val converter = support.prepareForRead(conf.get(), keyValueMetadata, schema, context)
      val columnIO = ioFactory.getColumnIO(context.getRequestedSchema, schema, true)

      splitGranularity match {
        case SplitGranularity.File =>
          // One claim covers the entire file. The claim MUST happen before any
          // row group is read so work isn't duplicated across workers.
          val claimed = tracker.tryClaim(tracker.currentRestriction().getFrom)
          var rowGroup = filterGranularity.readNextRowGroup(reader)
          while (claimed && rowGroup != null) {
            val recordReader = columnIO.getRecordReader(
              rowGroup,
              converter,
              if (options.useRecordFilter) recordFilter else FilterCompat.NOOP
            )
            readRowGroup(
              0,
              rowGroup.getRowCount,
              file,
              recordReader,
              out,
              projectionFn
            )
            rowGroup = filterGranularity.readNextRowGroup(reader)
          }
        case SplitGranularity.RowGroup =>
          // Fast-forward to the first row group of this restriction, then claim
          // and read one row group per iteration until a claim is rejected.
          var rowGroupIndex = tracker.currentRestriction.getFrom
          var skipped = 0L
          while (skipped < rowGroupIndex) {
            reader.skipNextRowGroup()
            skipped += 1
          }
          while (tracker.tryClaim(rowGroupIndex)) {
            val rowGroup = filterGranularity.readNextRowGroup(reader)

            val recordReader = columnIO.getRecordReader(
              rowGroup,
              converter,
              if (options.useRecordFilter) recordFilter else FilterCompat.NOOP
            )
            readRowGroup(
              rowGroupIndex,
              rowGroup.getRowCount,
              file,
              recordReader,
              out,
              projectionFn
            )

            rowGroupIndex += 1
          }
      }
    } finally {
      reader.close()
    }
  }