in src/main/java/com/hadoop/mapreduce/LzoSplitRecordReader.java [81:142]
/**
 * Advances to the next LZO block: reads the two size ints of the block header,
 * stores the block's start offset in {@code curValue}, and seeks the raw stream
 * past the block's data and checksums so the following call starts at the next
 * block header.
 *
 * @return {@code true} when a block offset was stored in {@code curValue};
 *         {@code false} when the end-of-file marker (an uncompressed size of zero)
 *         was read, when an {@link EOFException} occurred (treated as a truncated
 *         file), or when any other {@link IOException} occurred while reading
 * @throws IOException declared by the signature; I/O failures raised inside the
 *         try block are caught here and reported by returning {@code false}
 */
public boolean nextKeyValue() throws IOException {
  try {
    final int rawSize = rawInputStream.readInt();
    if (rawSize < 0) {
      throw new EOFException("Could not read uncompressed block size at position " +
          rawInputStream.getPos() + " in file " + lzoFile);
    }
    if (rawSize == 0) {
      // A zero uncompressed size is the end-of-file marker: the whole file was read.
      if (readSuccessCounter != null) {
        CompatibilityUtil.incrementCounter(readSuccessCounter, 1);
      }
      return false;
    }

    final int packedSize = rawInputStream.readInt();
    if (packedSize <= 0) {
      throw new EOFException("Could not read compressed block size at position " +
          rawInputStream.getPos() + " in file " + lzoFile);
    }

    // Mirrors LzopInputStream.getCompressedData: when both sizes match, only the
    // decompressed checksums follow the header; otherwise the compressed checksums
    // have to be skipped as well.
    int checksumCount = numDecompressedChecksums;
    if (rawSize != packedSize) {
      checksumCount += numCompressedChecksums;
    }

    // Two ints (8 bytes) have been consumed since the start of this block.
    final long afterSizes = rawInputStream.getPos();
    curValue.set(afterSizes - 8);

    // Each checksum is 4 bytes; jump over them and the block payload.
    final long nextBlockStart = afterSizes + packedSize + (4 * checksumCount);
    if (nextBlockStart >= totalFileSize) {
      // The next block would begin at or beyond the end of the file: truncated.
      throw new EOFException("truncated");
    }
    rawInputStream.seek(nextBlockStart);

    numBlocksRead++;
    // Emit a progress line once every LOG_EVERY_N_BLOCKS blocks.
    if (numBlocksRead % LOG_EVERY_N_BLOCKS == 0) {
      LOG.info("Reading block " + numBlocksRead + " at pos " + afterSizes + " of " + totalFileSize + ". Read is " +
          (100.0 * getProgress()) + "% done. ");
    }
    return true;
  } catch (EOFException e) {
    // EOF is tolerated: most likely the file is truncated and lacks a proper lzop
    // footer. Keeping the index up to the last LZO block present is the intended
    // outcome, so this still counts as a successful read.
    LOG.info("Received an EOFException. Mostly a truncated file, which is ok : ", e);
    if (readSuccessCounter != null) {
      CompatibilityUtil.incrementCounter(readSuccessCounter, 1);
    }
    return false;
  } catch (IOException e) {
    // Any other I/O failure ends the read without incrementing the success counter.
    LOG.warn("Exception while trying to read from " + lzoFile, e);
    return false;
  }
}