in src/main/java/com/hadoop/mapreduce/LzoTextInputFormat.java [100:143]
/**
 * Computes the input splits for the job, re-aligning the splits of indexed LZO files.
 *
 * <p>The splits are first generated by the superclass. Each one is then handled as follows:
 * <ul>
 *   <li>a split whose file is not an LZO file (per {@code LzoInputFormatCommon.isLzoFile})
 *       is passed through unchanged;</li>
 *   <li>a split of an LZO file whose index is empty is passed through unchanged;</li>
 *   <li>otherwise the split's start and end offsets are aligned via the file's
 *       {@code LzoIndex}; if either aligned offset is {@code LzoIndex.NOT_FOUND} the split
 *       is omitted from the result.</li>
 * </ul>
 *
 * @param job the job context the splits are computed for
 * @return the splits to process, with indexed LZO splits rewritten to the aligned offsets
 * @throws IOException if {@code indexes} holds no entry for an LZO file, or if the superclass
 *         or the file system access fails
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> splits = super.getSplits(job);
  Configuration conf = CompatibilityUtil.getConfiguration(job);

  // Lengths of the LZO files already looked up, keyed by path. A file that was cut into
  // many splits then costs a single getFileStatus() call instead of one call per split.
  // (Fully qualified because this method cannot see which java.util types the file imports.)
  java.util.Map<Path, Long> fileLengths = new java.util.HashMap<Path, Long>();

  // Find new start/ends of each file split that align with the LZO blocks.
  List<InputSplit> result = new ArrayList<InputSplit>();
  for (InputSplit genericSplit : splits) {
    FileSplit fileSplit = (FileSplit) genericSplit;
    Path file = fileSplit.getPath();

    if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
      // Non-LZO file, keep the input split as is.
      result.add(fileSplit);
      continue;
    }

    // LZO file: an index entry is required.
    // NOTE(review): indexes is presumably populated before this point (not visible in this
    // method) -- confirm where entries are added.
    LzoIndex index = indexes.get(file);
    if (index == null) {
      throw new IOException("Index not found for " + file);
    }
    if (index.isEmpty()) {
      // Empty index, keep the input split as is.
      result.add(fileSplit);
      continue;
    }

    long start = fileSplit.getStart();
    long end = start + fileSplit.getLength();
    long lzoStart = index.alignSliceStartToIndex(start, end);

    // The file system is only needed here, so it is resolved lazily and only on a cache miss.
    Long fileLength = fileLengths.get(file);
    if (fileLength == null) {
      fileLength = file.getFileSystem(conf).getFileStatus(file).getLen();
      fileLengths.put(file, fileLength);
    }
    long lzoEnd = index.alignSliceEndToIndex(end, fileLength);

    // A split for which either offset could not be aligned is dropped.
    // NOTE(review): presumably such a split contains no LZO block boundary and its bytes are
    // covered by a neighbouring split -- confirm against LzoIndex.
    if (lzoStart != LzoIndex.NOT_FOUND && lzoEnd != LzoIndex.NOT_FOUND) {
      result.add(new FileSplit(file, lzoStart, lzoEnd - lzoStart, fileSplit.getLocations()));
    }
  }
  return result;
}