public int run()

in src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java [94:156]


  /**
   * Configures, submits and waits for a map-only job over the inputs gathered
   * from the command-line paths.
   *
   * @param args paths to walk for inputs; an empty array, or a single
   *             {@code --help}, prints usage instead of running a job
   * @return -1 after printing usage; 0 when no inputs were collected or when
   *         the READ_SUCCESS counter equals the number of collected inputs;
   *         1 when the job fails or the counter does not match
   * @throws Exception if collecting the inputs or submitting/running the job fails
   */
  public int run(String[] args) throws Exception {
    boolean helpRequested = args.length == 1 && "--help".equals(args[0]);
    if (args.length == 0 || helpRequested) {
      printUsage();
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    // Gather everything to index from each path given on the command line.
    List<Path> inputPaths = new ArrayList<Path>();
    for (String rawPath : args) {
      Path root = new Path(rawPath);
      walkPath(root, nonTemporaryFilter, inputPaths);
    }

    int totalInputs = inputPaths.size();
    if (totalInputs == 0) {
      LOG.info("No input paths found - perhaps all .lzo files have already been indexed.");
      return 0;
    }

    Job job = new Job(getConf());
    setJobName(job, args);

    job.setOutputKeyClass(Path.class);
    job.setOutputValueClass(LongWritable.class);

    // LzoIndexOutputFormat does not currently work with speculative execution,
    // so it is switched off for the map tasks. Patches welcome.
    job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);

    // Map-only job: no reducers, identity mapper.
    job.setJarByClass(DistributedLzoIndexer.class);
    job.setInputFormatClass(LzoSplitInputFormat.class);
    job.setOutputFormatClass(LzoIndexOutputFormat.class);
    job.setNumReduceTasks(0);
    job.setMapperClass(Mapper.class);

    for (Path input : inputPaths) {
      FileInputFormat.addInputPath(job, input);
    }

    // Submit explicitly so the job id can be logged before blocking on completion.
    job.submit();

    LOG.info("Started DistributedIndexer " + job.getJobID() + " with "
        + totalInputs + " splits for " + Arrays.toString(args));
    LOG.info("Queue Used: " + job.getConfiguration().get("mapred.job.queue.name"));

    if (!job.waitForCompletion(true)) {
      LOG.error("DistributedIndexer job " + job.getJobID() + " failed.");
      return 1; // failure
    }

    // The job completed; it only counts as a success if every input was read.
    long successfulMappers = CompatibilityUtil.getCounterValue(
        job.getCounters().findCounter(LzoSplitRecordReader.Counters.READ_SUCCESS));
    if (successfulMappers == totalInputs) {
      return 0;
    }

    // some of the mappers failed
    long failedMappers = totalInputs - successfulMappers;
    LOG.error("DistributedIndexer " + job.getJobID() + " failed. "
        + failedMappers + " out of " + totalInputs + " mappers failed.");
    return 1; // failure
  }