in src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java [94:156]
/**
 * Runs the distributed LZO indexer over the paths named on the command line.
 *
 * <p>Each argument is expanded with {@code walkPath} (filtered by
 * {@code nonTemporaryFilter}) into a list of concrete input paths. A map-only job then
 * runs with {@code LzoSplitInputFormat} as input and {@code LzoIndexOutputFormat} as
 * output. The method blocks until the job finishes. It treats each collected path as
 * one map task: it compares the job's {@code READ_SUCCESS} counter with the number of
 * paths.
 *
 * @param args input paths (files or directories), or a single {@code --help}
 * @return {@code -1} if no arguments or only {@code --help} were given (usage is
 *     printed); {@code 0} if there was nothing to index or every mapper succeeded;
 *     {@code 1} if the job failed or some mappers did not succeed
 * @throws Exception if path expansion, job submission or job monitoring fails
 */
public int run(String[] args) throws Exception {
  boolean helpRequested = args.length == 1 && "--help".equals(args[0]);
  if (args.length == 0 || helpRequested) {
    printUsage();
    ToolRunner.printGenericCommandUsage(System.err);
    return -1;
  }

  // Expand every command-line argument into the concrete paths to index.
  List<Path> pathsToIndex = new ArrayList<Path>();
  for (int i = 0; i < args.length; i++) {
    walkPath(new Path(args[i]), nonTemporaryFilter, pathsToIndex);
  }
  if (pathsToIndex.isEmpty()) {
    // Presumably walkPath skips files that already have an index; the message below
    // suggests so, but that is decided inside walkPath/nonTemporaryFilter.
    LOG.info("No input paths found - perhaps all " +
        ".lzo files have already been indexed.");
    return 0;
  }

  Job indexJob = new Job(getConf());
  setJobName(indexJob, args);
  indexJob.setJarByClass(DistributedLzoIndexer.class);

  // Map-only job: the identity Mapper feeds LzoIndexOutputFormat directly.
  indexJob.setInputFormatClass(LzoSplitInputFormat.class);
  indexJob.setMapperClass(Mapper.class);
  indexJob.setNumReduceTasks(0);
  indexJob.setOutputFormatClass(LzoIndexOutputFormat.class);
  indexJob.setOutputKeyClass(Path.class);
  indexJob.setOutputValueClass(LongWritable.class);

  // LzoIndexOutputFormat does not currently work with speculative execution,
  // so duplicate map attempts are disabled. Patches welcome.
  indexJob.getConfiguration().setBoolean(
      "mapred.map.tasks.speculative.execution", false);

  for (Path inputPath : pathsToIndex) {
    FileInputFormat.addInputPath(indexJob, inputPath);
  }

  indexJob.submit();
  LOG.info("Started DistributedIndexer " + indexJob.getJobID() + " with " +
      pathsToIndex.size() + " splits for " + Arrays.toString(args));
  LOG.info("Queue Used: " + indexJob.getConfiguration().get("mapred.job.queue.name"));

  // The job was already submitted above, so this only monitors it until completion.
  if (!indexJob.waitForCompletion(true)) {
    LOG.error("DistributedIndexer job " + indexJob.getJobID() + " failed.");
    return 1;
  }

  int expectedMappers = pathsToIndex.size();
  long succeededMappers = CompatibilityUtil.getCounterValue(
      indexJob.getCounters().findCounter(LzoSplitRecordReader.Counters.READ_SUCCESS));
  if (succeededMappers == expectedMappers) {
    return 0;
  }

  // The job itself completed, but not every mapper reported success.
  LOG.error("DistributedIndexer " + indexJob.getJobID() + " failed. "
      + (expectedMappers - succeededMappers)
      + " out of " + expectedMappers + " mappers failed.");
  return 1;
}