in src/main/java/com/hadoop/compression/lzo/LzoIndex.java [207:281]
/**
 * Builds the block-offset index for an LZO-compressed file.
 *
 * <p>The index is written to a temporary file ({@code lzoFile} plus
 * {@code LZO_TMP_INDEX_SUFFIX}) and renamed to its final name
 * ({@code lzoFile} plus {@code LZO_INDEX_SUFFIX}) only after every block has
 * been scanned and both streams have been closed without error. On any
 * failure the temporary file is deleted, so a partial index is not left
 * behind under either name.
 *
 * <p>Each index entry is a single {@code long}: the offset in
 * {@code lzoFile} at which a block's two 4-byte size fields begin.
 *
 * @param fs      file system holding {@code lzoFile}; its configuration is
 *                used to look up the compression codec
 * @param lzoFile path of the compressed file to index
 * @throws IOException if no codec is registered for {@code lzoFile}, if the
 *                     file cannot be read or is truncated/corrupt, if the
 *                     index cannot be written, or if the temporary index
 *                     cannot be renamed to its final name
 */
public static void createIndex(FileSystem fs, Path lzoFile)
    throws IOException {
  Configuration conf = fs.getConf();
  CompressionCodecFactory factory = new CompressionCodecFactory(conf);
  CompressionCodec codec = factory.getCodec(lzoFile);
  if (null == codec) {
    throw new IOException("Could not find codec for file " + lzoFile +
        " - you may need to add the LZO codec to your io.compression.codecs " +
        "configuration in core-site.xml");
  }
  ((Configurable) codec).setConf(conf);

  FSDataInputStream is = null;
  FSDataOutputStream os = null;
  Path outputFile = lzoFile.suffix(LZO_INDEX_SUFFIX);
  Path tmpOutputFile = lzoFile.suffix(LZO_TMP_INDEX_SUFFIX);

  // Only flipped to true once the finished index has been renamed into
  // place; every other exit path removes the temporary file.
  boolean indexingSucceeded = false;
  try {
    try {
      is = fs.open(lzoFile);
      os = fs.create(tmpOutputFile);
      LzopDecompressor decompressor = (LzopDecompressor) codec.createDecompressor();
      // Solely for reading the header
      codec.createInputStream(is, decompressor);

      int numCompressedChecksums = decompressor.getCompressedChecksumsCount();
      int numDecompressedChecksums = decompressor.getDecompressedChecksumsCount();

      while (true) {
        // read and ignore, we just want to get to the next int
        int uncompressedBlockSize = is.readInt();
        if (uncompressedBlockSize == 0) {
          // A zero-length block terminates the file.
          break;
        } else if (uncompressedBlockSize < 0) {
          throw new EOFException();
        }

        int compressedBlockSize = is.readInt();
        if (compressedBlockSize <= 0) {
          throw new IOException("Could not read compressed block size");
        }

        // See LzopInputStream.getCompressedData
        boolean isUncompressedBlock = (uncompressedBlockSize == compressedBlockSize);
        int numChecksumsToSkip = isUncompressedBlock ?
            numDecompressedChecksums : numDecompressedChecksums + numCompressedChecksums;
        long pos = is.getPos();
        // write the pos of the block start (8 bytes back: the two size ints just read)
        os.writeLong(pos - 8);
        // seek to the start of the next block, skip any checksums (4 bytes each)
        is.seek(pos + compressedBlockSize + (4 * numChecksumsToSkip));
      }
    } finally {
      // Close both streams; the nested finally guarantees the output stream
      // is closed even when closing the input stream throws.
      try {
        if (is != null) {
          is.close();
        }
      } finally {
        if (os != null) {
          os.close();
        }
      }
    }

    // Reaching this point means the scan finished and the index was fully
    // flushed by a successful close. Publish it under its final name, and do
    // not report success if the file system refuses the rename.
    if (!fs.rename(tmpOutputFile, outputFile)) {
      throw new IOException("Could not rename temporary index " + tmpOutputFile +
          " to " + outputFile);
    }
    indexingSucceeded = true;
  } finally {
    if (!indexingSucceeded) {
      // Indexing, closing or renaming failed: clean up after ourselves.
      fs.delete(tmpOutputFile, false);
    }
  }
}