private List getSplitsInternal()

in core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoInputFormat.java [148:211]


  /**
   * Re-aligns the default file splits produced by the superclass so that each
   * resulting split starts and ends at positions taken from the file's LZO index.
   *
   * <p>Files whose index is empty keep their default split unchanged. A split for
   * which the index reports {@code LzoIndex.NOT_FOUND} for either the aligned
   * start or the aligned end is left out of the result.
   *
   * @param job the job context used to compute the default splits and to obtain
   *        the configuration for file system access
   * @return the splits to use, in the order of the default splits they derive from
   * @throws IOException propagated from computing the default splits or reading
   *         an index, or raised when no cached status or index is available for
   *         a file being split
   */
  private List<InputSplit> getSplitsInternal(JobContext job)
      throws IOException {
    List<InputSplit> defaultSplits = super.getSplits(job);

    // Find new starts and ends of the file splits that align with the lzo blocks.
    List<InputSplit> result = new ArrayList<InputSplit>();

    // Consecutive default splits of the same file share one index, so remember
    // the most recently loaded one instead of re-reading it for every split.
    Path prevFile = null;
    LzoIndex prevIndex = null;

    for (InputSplit genericSplit : defaultSplits) {
      FileSplit fileSplit = (FileSplit) genericSplit;
      Path file = fileSplit.getPath();

      final LzoSplitStatus lzoSplitStatus = splitStatusMap.get(file);
      if (lzoSplitStatus == null) {
        // Without this guard a missing cache entry would surface below as a
        // NullPointerException carrying no hint about which file was affected.
        throw new IOException("No cached LZO split status for " + file);
      }

      LzoIndex index; // reuse index for files with multiple blocks.
      if (file.equals(prevFile)) {
        index = prevIndex;
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Cached file status " + lzoSplitStatus);
        }
        // No index file status recorded: fall back to an empty index so the
        // default split is kept as-is further down.
        final FileStatus indexFileStatus = lzoSplitStatus.lzoIndexFileStatus;
        index = indexFileStatus == null
            ? new LzoIndex()
            : LzoIndex.readIndex(
                  file.getFileSystem(HadoopCompat.getConfiguration(job)), file);
        prevFile = file;
        prevIndex = index;
      }

      if (index == null) {
        // Defensive: a (possibly empty, but non-null) index is expected for every split.
        throw new IOException("Index not found for " + file);
      }

      if (index.isEmpty()) {
        // Empty index, so leave the default split.
        // split's start position should be 0.
        result.add(fileSplit);
        continue;
      }

      long start = fileSplit.getStart();
      long end = start + fileSplit.getLength();

      long lzoStart = index.alignSliceStartToIndex(start, end);
      long lzoEnd = index.alignSliceEndToIndex(end,
          lzoSplitStatus.lzoFileStatus.getLen());

      if (lzoStart != LzoIndex.NOT_FOUND && lzoEnd != LzoIndex.NOT_FOUND) {
        long lzoLength = lzoEnd - lzoStart;
        result.add(new FileSplit(file, lzoStart, lzoLength, fileSplit.getLocations()));
        if (LOG.isDebugEnabled()) {
          LOG.debug("Added LZO split for " + file + "[start=" + lzoStart + ", length=" + lzoLength + "]");
        }
      } else if (LOG.isDebugEnabled()) {
        // The split is dropped. NOTE(review): presumably this means no indexed
        // block position falls inside the slice; confirm against LzoIndex.
        // should handle splitting the entire file here so that
        // such errors can be handled better.
        LOG.debug("Skipped default split for " + file + "[start=" + start + ", end=" + end
            + "]: no aligned LZO block position found");
      }
    }

    return result;
  }