in src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java [47:78]
/**
 * Recursively walks {@code path} and appends to {@code accumulator} every LZO file that
 * still needs an index.
 *
 * <p>Directories are listed through {@code pathFilter} and each child is visited in turn.
 * A regular file is considered only if its path string ends with {@code LZO_EXTENSION}. It
 * is added when its companion index (the path plus {@code LzoIndex.LZO_INDEX_SUFFIX}) is
 * missing or has zero length, and skipped when a non-empty index already exists.
 *
 * <p>The walk is best-effort: an {@link IOException} raised while examining {@code path} is
 * logged as a warning and that path is abandoned; because each recursive call handles its
 * own failures, sibling paths are still visited.
 *
 * @param path file or directory to examine
 * @param pathFilter filter applied when listing the children of a directory
 * @param accumulator output list that receives the LZO files to be indexed
 */
private void walkPath(Path path, PathFilter pathFilter, List<Path> accumulator) {
  try {
    FileSystem fs = path.getFileSystem(getConf());
    FileStatus fileStatus = fs.getFileStatus(path);

    if (fileStatus.isDir()) {
      FileStatus[] children = fs.listStatus(path, pathFilter);
      if (children == null) {
        // NOTE(review): older Hadoop releases may return null here (e.g. the directory
        // disappeared after the getFileStatus call above) instead of throwing. Guard so a
        // NullPointerException cannot escape the IOException handler and abort the walk.
        LOG.warn("Unable to list directory, skipping: " + path);
        return;
      }
      for (FileStatus childStatus : children) {
        walkPath(childStatus.getPath(), pathFilter, accumulator);
      }
      return;
    }

    if (!path.toString().endsWith(LZO_EXTENSION)) {
      // Not an LZO file: nothing to index.
      return;
    }

    Path lzoIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX);

    // Query the index status once instead of exists() followed by getFileStatus(): this
    // saves a round-trip and removes the window in which the index could be deleted between
    // the two calls, which previously surfaced as a spurious "Error walking path" and left
    // the LZO file un-indexed.
    FileStatus indexStatus;
    try {
      indexStatus = fs.getFileStatus(lzoIndexPath);
    } catch (java.io.FileNotFoundException noIndex) {
      indexStatus = null;
    }

    if (indexStatus != null && indexStatus.getLen() > 0) {
      // If the index exists and is of nonzero size, we're already done.
      LOG.info("[SKIP] LZO index file already exists for " + path);
      return;
    }

    if (indexStatus != null) {
      // We re-index a file with a zero-length index, because every file has at least one block.
      LOG.info("Adding LZO file " + path + " to indexing list (index file exists but is zero length)");
    } else {
      // If no index exists, we need to index the file.
      LOG.info("Adding LZO file " + path + " to indexing list (no index currently exists)");
    }
    accumulator.add(path);
  } catch (IOException ioe) {
    LOG.warn("Error walking path: " + path, ioe);
  }
}