in rcfile/src/main/java/com/twitter/elephantbird/mapreduce/input/RCFileThriftInputFormat.java [95:138]
public void initialize(InputSplit split, TaskAttemptContext ctx)
    throws IOException, InterruptedException {
  // set up the columns that need to be read from the RCFile.
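  // Thrift struct descriptor plus a reusable writable for the records this reader returns.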
  tDesc = TStructDescriptor.getInstance(typeRef.getRawClass());
  thriftWritable = ThriftWritable.newInstance((Class<TBase<?, ?>>) typeRef.getRawClass());
  final List<Field> tFields = tDesc.getFields();
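  // Resolve the concrete file and byte range covered by this split.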
  FileSplit fsplit = (FileSplit) split;
  Path file = fsplit.getPath();
  LOG.info(String.format("reading %s from %s:%d:%d"
      , typeRef.getRawClass().getName()
      , file.toString()
      , fsplit.getStart()
      , fsplit.getStart() + fsplit.getLength()));
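  // Read the per-column metadata (the Thrift field id stored for each column) from the RCFile.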
  Configuration conf = HadoopCompat.getConfiguration(ctx);
  ColumnarMetadata storedInfo = RCFileUtil.readMetadata(conf, file);

  // list of Thrift field ids for the requested fields
  List<Integer> tFieldIds = Lists.transform(tFields,
      new Function<Field, Integer>() {
        public Integer apply(Field fd) {
          return Integer.valueOf(fd.getFieldId());
        }
      });
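  // Intersect the requested fields with the columns actually present in the file.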
  columnsBeingRead = RCFileUtil.findColumnsToRead(conf, tFieldIds, storedInfo);
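  // Map each selected column back to its Thrift field; a negative stored field id
  // marks the "unknowns" column holding fields not present in the current schema.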
  for (int idx : columnsBeingRead) {
    int fid = storedInfo.getFieldId(idx);
    if (fid >= 0) {
      knownRequiredFields.add(tFields.get(tFieldIds.indexOf(fid)));
    } else {
      readUnknownsColumn = true;
    }
  }
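  // Restrict the underlying columnar reader to just the selected columns.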
  ColumnProjectionUtils.setReadColumnIDs(conf, columnsBeingRead);

  // finally, let the base RCFile record reader finish its own setup.
  super.initialize(split, ctx);
}