in rcfile/src/main/java/com/twitter/elephantbird/mapreduce/input/RCFileProtobufInputFormat.java [93:145]
public void initialize(InputSplit split, TaskAttemptContext ctx)
    throws IOException, InterruptedException {
  /* Set up the columns that need to be read from the RCFile.
   * If any of the required fields is not among the stored columns,
   * read the "unknowns" column (the last column).
   */
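  // protobuf reflection set-up for the message type being read: a builder for
  // deserializing rows, and the fields declared in the message's schema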
  msgBuilder = Protobufs.getMessageBuilder(typeRef.getRawClass());
  msgInstance = msgBuilder.getDefaultInstanceForType();
  protoWritable = ProtobufWritable.newInstance(typeRef.getRawClass());
  Descriptor msgDesc = msgBuilder.getDescriptorForType();
  final List<FieldDescriptor> msgFields = msgDesc.getFields();
  // start with a conf that reads all the columns; the projection is narrowed
  // to just the required columns below, once the stored schema is known
  Configuration conf = new Configuration(HadoopCompat.getConfiguration(ctx));
  ColumnProjectionUtils.setFullyReadColumns(conf);
  FileSplit fsplit = (FileSplit) split;
  Path file = fsplit.getPath();
  LOG.info(String.format("reading %s from %s:%d:%d",
      typeRef.getRawClass().getName(),
      file.toString(),
      fsplit.getStart(),
      fsplit.getStart() + fsplit.getLength()));
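  // metadata stored with the file: the protobuf field number kept in each
  // column of the RCFile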
  ColumnarMetadata storedInfo = RCFileUtil.readMetadata(conf, file);
  // field numbers of the fields declared in the message
  List<Integer> msgFieldIds = Lists.transform(msgFields,
      new Function<FieldDescriptor, Integer>() {
        @Override
        public Integer apply(FieldDescriptor fd) {
          return fd.getNumber();
        }
      });
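  // choose the stored columns to read: those whose field ids match required
  // fields, plus the unknowns column if any required field is not stored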
  columnsBeingRead = RCFileUtil.findColumnsToRead(conf, msgFieldIds, storedInfo);
  for (int idx : columnsBeingRead) {
    int fid = storedInfo.getFieldId(idx);
    if (fid >= 0) {
      // a stored column with a known field id; map it back to its descriptor
      knownRequiredFields.add(msgFields.get(msgFieldIds.indexOf(fid)));
    } else {
      // a negative field id marks the "unknowns" column
      readUnknownsColumn = true;
    }
  }
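  // narrow the projection so that only the selected columns are deserialized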
  ColumnProjectionUtils.setReadColumnIDs(conf, columnsBeingRead);

  // finally!
  super.initialize(split, ctx);
}