in core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoBinaryBlockRecordReader.java [126:171]
/**
 * Advances to the next record, looping past records whose bytes fail to decode.
 *
 * <p>On success, {@code key_} is set to the file position recorded before this record
 * was consumed and {@code value_} to the decoded message. Records that throw
 * {@link DecodeException} are counted (error tracker + record-errors counter) and
 * skipped; records that decode to {@code null} without throwing are counted as
 * skipped. Counters are updated via {@code HadoopCompat}.
 *
 * @return true if a record was decoded into {@code key_}/{@code value_};
 *         false once the underlying reader returns no more bytes
 * @throws IOException if reading from the underlying LZO block reader fails
 * @throws InterruptedException declared for the RecordReader contract; not thrown
 *         directly in this body
 */
public boolean nextKeyValue() throws IOException, InterruptedException {
  // If we are past the end of the file split, tell the reader not to read any more new blocks.
  // Then continue reading until the last of the reader's already-parsed values are used up.
  while (true) { // loop to skip over bad records
    if (pos_ > end_) {
      reader_.markNoMoreNewBlocks();
      // Why not pos_ >= end_, stop when we just reach the end?
      // we don't know if we have read all the bytes uncompressed in the current lzo block,
      // only way to make sure that we have read all of the split is to read till the
      // first record that has at least one byte in the next split.
      // As a consequence of this, next split reader skips at least one byte
      // (i.e. skips either partial or full record at the beginning).
    }
    byte[] byteArray = reader_.readNextProtoBytes();
    // A null byte array signals end of input for this reader.
    if(byteArray == null) {
      return false;
    }
    errorTracker.incRecords();
    M decoded = null;
    try {
      decoded = deserializer_.fromBytes(byteArray);
    } catch (DecodeException e) {
      // Bad record: record the error and continue with the next record
      // instead of failing the whole task.
      errorTracker.incErrors(e);
      HadoopCompat.incrementCounter(recordErrorsCounter, 1);
      continue;
    }
    // One-shot refresh of pos_ so the key below reflects where this record started.
    // NOTE(review): updatePosition is set outside this method (presumably on
    // initialize/seek) — confirm the trigger against the rest of the class.
    if (updatePosition) {
      pos_ = getLzoFilePos();
      updatePosition = false;
    }
    if (decoded != null) {
      // Key is the position snapshotted before this record; then advance pos_
      // past the record just consumed for the next iteration's split check.
      key_.set(pos_);
      value_.set(decoded);
      pos_ = getLzoFilePos();
      HadoopCompat.incrementCounter(recordsReadCounter, 1);
      return true;
    } else {
      // Deserializer returned null without throwing: count as a skipped record.
      HadoopCompat.incrementCounter(recordsSkippedCounter, 1);
    }
  }
}