in core/src/main/java/com/twitter/elephantbird/mapreduce/input/combine/CompositeRecordReader.java [100:138]
public boolean nextKeyValue() throws IOException, InterruptedException {
// This is essentially tail recursion, but java style
while (true) {
// No record reader, and there are no more record readers. No more KV's
if (currentRecordReader == null && recordReaders.isEmpty()) {
return false;
} else if (currentRecordReader != null) {
// We have a record reader, and it is either done, or we have more values.
if (currentRecordReader.nextKeyValue()) {
return true;
}
currentRecordReader.close();
// Rely on the rest of the loop to get a next currentRecordReader, if there is one available.
currentRecordReader = null;
}
// At this point, there is no currentRecordReader and no more recordReaders. No more KVs.
if (recordReaders.isEmpty()) {
return false;
}
currentRecordReader = recordReaders.remove().createRecordReader();
currentRecordReaderIndex++;
// The DeprecatedInputFormatWrapper has a check in it which ensures that the key and value
// objects of the underlying MapredInputFormatCompatible are the same. Thus, we rely on the
// very first RecordReader that we instantiate to create the underlying objects which we
// will use for the rest of the process.
if (!haveInitializedFirstRecordReader) {
key = currentRecordReader.getCurrentKey();
value = currentRecordReader.getCurrentValue();
haveInitializedFirstRecordReader = true;
} else {
// This call is purely for interop with DeprecatedInputFormatWrapper. It ensures that a pair of
// key value objects which were set by a calling function are passed to the new delegate so that it
// will be consistent the entire time.
setKeyValue(key, value);
}
// We will loop again and see if there is a nextKeyValue
}
}