in scio-parquet/src/main/scala/com/spotify/scio/parquet/read/ParquetReadFn.scala [160:241]
def processElement(
  @Element file: ReadableFile,
  tracker: RestrictionTracker[OffsetRange, Long],
  out: DoFn.OutputReceiver[R]
): Unit = {
  logger.debug(
    "reading file from offset {} to {}",
    tracker.currentRestriction.getFrom,
    if (splitGranularity == SplitGranularity.File) "end" else tracker.currentRestriction().getTo
  )
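  // The restriction is interpreted per split granularity: a single claimable offset when
  // splitting per file, or a range of row-group indices when splitting per row group
  // (presumably assigned in getInitialRestriction; see the match on splitGranularity below).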
  val reader = parquetFileReader(file)
  try {
    val filter = options.getRecordFilter
    val parquetFileMetadata = reader.getFooter.getFileMetaData
    val fileSchema = parquetFileMetadata.getSchema
    val fileMetadata = parquetFileMetadata.getKeyValueMetaData
    val readSupport = readSupportFactory.readSupport
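    // ReadSupport.init resolves the requested (projected) schema against the file schema.
    // Parquet's InitContext expects key/value metadata as a Map[String, Set[String]]
    // (merged across files), hence each value is wrapped in a singleton ImmutableSet.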
    val readContext = readSupport.init(
      new InitContext(
        conf.get(),
        fileMetadata.asScala.map { case (k, v) =>
          k -> (ImmutableSet.of(v): JSet[String])
        }.asJava,
        fileSchema
      )
    )
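    // Restrict the reader to the columns of the resolved read schema (column projection).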
    reader.setRequestedSchema(readContext.getRequestedSchema)

    val columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy)
    val recordConverter =
      readSupport.prepareForRead(conf.get(), fileMetadata, fileSchema, readContext)
    val columnIO = columnIOFactory.getColumnIO(readContext.getRequestedSchema, fileSchema, true)
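    // File granularity claims the whole restriction once and then drains every row group
    // sequentially; row-group granularity claims one row-group index at a time, so the
    // runner can split or checkpoint the remaining row groups onto other workers.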
    splitGranularity match {
      case SplitGranularity.File =>
        val tryClaim = tracker.tryClaim(tracker.currentRestriction().getFrom)
        var pages = filterGranularity.readNextRowGroup(reader)
        // Must check tryClaim before reading so work isn't duplicated across workers
        while (tryClaim && pages != null) {
          val recordReader = columnIO.getRecordReader(
            pages,
            recordConverter,
            if (options.useRecordFilter) filter else FilterCompat.NOOP
          )
          readRowGroup(
            0,
            pages.getRowCount,
            file,
            recordReader,
            out,
            projectionFn
          )
          pages = filterGranularity.readNextRowGroup(reader)
        }
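      // Skip the row groups that fall before this restriction's start, then claim and read
      // one row-group index at a time until the tracker refuses a claim.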
      case SplitGranularity.RowGroup =>
        var currentRowGroupIndex = tracker.currentRestriction.getFrom
        (0L until currentRowGroupIndex).foreach(_ => reader.skipNextRowGroup())

        while (tracker.tryClaim(currentRowGroupIndex)) {
          val pages = filterGranularity.readNextRowGroup(reader)
          val recordReader = columnIO.getRecordReader(
            pages,
            recordConverter,
            if (options.useRecordFilter) filter else FilterCompat.NOOP
          )
          readRowGroup(
            currentRowGroupIndex,
            pages.getRowCount,
            file,
            recordReader,
            out,
            projectionFn
          )
          currentRowGroupIndex += 1
        }
    }
  } finally {
    reader.close()
  }
}