in core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoInputFormat.java [148:211]
/**
 * Computes the job's input splits by taking the default byte-range splits from
 * {@code super.getSplits(job)} and realigning each one to LZO block boundaries using
 * the file's LZO index.
 *
 * <p>For each default split:
 * <ul>
 *   <li>If no index file status is cached for the file, an empty {@code LzoIndex} is
 *       used and the default split is kept unchanged.</li>
 *   <li>Otherwise the index is read once per run of consecutive splits of the same file.
 *       The split's start and end are passed through
 *       {@code LzoIndex.alignSliceStartToIndex} and
 *       {@code LzoIndex.alignSliceEndToIndex} (the latter is given the LZO file's
 *       length), and a new {@code FileSplit} covering the aligned range is emitted.</li>
 *   <li>If either aligned bound is {@code LzoIndex.NOT_FOUND}, no split is emitted for
 *       that default split.</li>
 * </ul>
 *
 * @param job the job whose input is being split
 * @return the realigned splits, in the same order as the default splits
 * @throws IOException if the superclass fails to compute splits, if no cached split
 *     status exists for a split's file, if the LZO index cannot be read, or if the
 *     index lookup yields {@code null}
 */
private List<InputSplit> getSplitsInternal(JobContext job)
    throws IOException {
  List<InputSplit> defaultSplits = super.getSplits(job);
  // Find new starts and ends of the file splits that align with the lzo blocks.
  List<InputSplit> result = new ArrayList<InputSplit>();
  Path prevFile = null;
  LzoIndex prevIndex = null;
  for (InputSplit genericSplit : defaultSplits) {
    FileSplit fileSplit = (FileSplit) genericSplit;
    Path file = fileSplit.getPath();

    // Both the index-file status and the LZO file length below come from this cached
    // entry. It is expected to exist for every split's file (presumably populated in
    // listStatus, which is not visible here). Fail with a descriptive IOException
    // instead of an NPE if it is missing.
    final LzoSplitStatus lzoSplitStatus = splitStatusMap.get(file);
    if (lzoSplitStatus == null) {
      throw new IOException("No cached LZO split status for " + file);
    }

    // Load the index, reusing it for consecutive splits of the same file.
    LzoIndex index;
    if (file.equals(prevFile)) {
      index = prevIndex;
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Cached file status " + lzoSplitStatus);
      }
      final FileStatus indexFileStatus = lzoSplitStatus.lzoIndexFileStatus;
      // No index file recorded: fall back to an empty index, which keeps the default split.
      index = indexFileStatus == null
          ? new LzoIndex()
          : LzoIndex.readIndex(
              file.getFileSystem(HadoopCompat.getConfiguration(job)), file);
      prevFile = file;
      prevIndex = index;
    }
    if (index == null) {
      // Defensive: guards against LzoIndex.readIndex returning null.
      throw new IOException("Index not found for " + file);
    }
    if (index.isEmpty()) {
      // Empty index, so leave the default split.
      // NOTE(review): this split's start is expected to be 0 (unindexed file presumably
      // not split further) — confirm against isSplitable.
      result.add(fileSplit);
      continue;
    }

    long start = fileSplit.getStart();
    long end = start + fileSplit.getLength();
    long lzoStart = index.alignSliceStartToIndex(start, end);
    long lzoEnd = index.alignSliceEndToIndex(end,
        lzoSplitStatus.lzoFileStatus.getLen());
    if (lzoStart != LzoIndex.NOT_FOUND && lzoEnd != LzoIndex.NOT_FOUND) {
      result.add(new FileSplit(file, lzoStart, lzoEnd - lzoStart, fileSplit.getLocations()));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Added LZO split for " + file + "[start=" + lzoStart + ", length=" + (lzoEnd - lzoStart) + "]");
      }
    }
    // Otherwise no split is emitted for this range. Presumably no LZO block starts
    // inside it, so its bytes are read by the preceding split whose end was extended to
    // the next block boundary. TODO confirm against LzoIndex's alignment semantics.
  }
  return result;
}