in src/main/java/com/hadoop/mapreduce/LzoSplitRecordReader.java [47:78]
public void initialize(InputSplit genericSplit, TaskAttemptContext taskAttemptContext) throws IOException {
context = taskAttemptContext;
FileSplit fileSplit = (FileSplit)genericSplit;
lzoFile = fileSplit.getPath();
// LzoSplitInputFormat does not split files, so the split length is the whole file.
totalFileSize = fileSplit.getLength();
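// Counters are only exposed through TaskInputOutputContext, so only look up the
// success counter when the framework actually handed us one.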
if (taskAttemptContext instanceof TaskInputOutputContext<?, ?, ?, ?>) {
readSuccessCounter = CompatibilityUtil.getCounter(
(TaskInputOutputContext<?, ?, ?, ?>) taskAttemptContext, Counters.READ_SUCCESS);
}
// Jump through some hoops to create the lzo codec.
Configuration conf = CompatibilityUtil.getConfiguration(context);
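// The codec factory maps the file's suffix to a codec (.lzo -> the LZO codec).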
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(lzoFile);
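// getCodec() returns null for suffixes it does not recognize, so the casts below
// assume the input really is an .lzo file handled by the LZO codec.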
((Configurable)codec).setConf(conf);
LzopDecompressor lzopDecompressor = (LzopDecompressor)codec.createDecompressor();
FileSystem fs = lzoFile.getFileSystem(conf);
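// Keep the raw (still compressed) stream open; once the header has been consumed,
// the rest of the reading happens directly off this stream.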
rawInputStream = fs.open(lzoFile);
// Creating the LzopInputStream here just reads the lzo header for us, nothing more.
// We do the rest of our reading directly off the raw stream.
codec.createInputStream(rawInputStream, lzopDecompressor);
// This must be called AFTER createInputStream is called, because createInputStream
// is what reads the header, which has the checksum information. Otherwise the
// checksum count getters below erroneously return zero, and all block offsets will be wrong.
numCompressedChecksums = lzopDecompressor.getCompressedChecksumsCount();
numDecompressedChecksums = lzopDecompressor.getDecompressedChecksumsCount();
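// Each block carries these checksums (4 bytes apiece) between its header and its
// data, so the counts feed directly into the block offset arithmetic later on.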
}