in core/src/main/java/com/twitter/elephantbird/mapreduce/input/MapReduceInputFormatWrapper.java [153:222]
public void initialize(InputSplit split, final TaskAttemptContext context)
    throws IOException, InterruptedException {
  org.apache.hadoop.mapred.InputSplit oldSplit;
  if (split.getClass() == FileSplit.class) {
    oldSplit = new org.apache.hadoop.mapred.FileSplit(
        ((FileSplit) split).getPath(),
        ((FileSplit) split).getStart(),
        ((FileSplit) split).getLength(),
        split.getLocations());
  } else {
    oldSplit = ((InputSplitWrapper) split).realSplit;
  }
  @SuppressWarnings("unchecked")
  Reporter reporter = new Reporter() { // Reporter interface over ctx

    final TaskInputOutputContext ioCtx = context instanceof TaskInputOutputContext ?
        (TaskInputOutputContext) context : null;

    public void progress() {
      HadoopCompat.progress(context);
    }
    // @Override intentionally commented out: getProgress() is not declared
    // on Reporter in older Hadoop versions, so the annotation would break
    // compilation there.
    public float getProgress() {
      return (ioCtx != null) ? ioCtx.getProgress() : 0;
    }
    public void setStatus(String status) {
      if (ioCtx != null)
        HadoopCompat.setStatus(ioCtx, status);
    }

    public void incrCounter(String group, String counter, long amount) {
      if (ioCtx != null)
        HadoopCompat.incrementCounter(ioCtx.getCounter(group, counter), amount);
    }

    @SuppressWarnings("unchecked")
    public void incrCounter(Enum<?> key, long amount) {
      if (ioCtx != null)
        HadoopCompat.incrementCounter(ioCtx.getCounter(key), amount);
    }

    public org.apache.hadoop.mapred.InputSplit getInputSplit()
        throws UnsupportedOperationException {
      throw new UnsupportedOperationException();
    }

    public Counter getCounter(String group, String name) {
      return ioCtx != null ?
          (Counter) HadoopCompat.getCounter(ioCtx, group, name) : null;
    }

    @SuppressWarnings("unchecked")
    public Counter getCounter(Enum<?> name) {
      return ioCtx != null ? (Counter) ioCtx.getCounter(name) : null;
    }
  };
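  // Open the wrapped old-API reader with the converted split and the
  // Reporter adapter built above.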
  realReader = realInputFormat.getRecordReader(
      oldSplit,
      (JobConf) HadoopCompat.getConfiguration(context),
      reporter);

  keyObj = realReader.createKey();
  valueObj = realReader.createValue();
}
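For context, a minimal sketch of how this wrapper is typically wired into a new-API job. It assumes the class's static setInputFormat helper and Hadoop 2.x's Job.getInstance; the job name is illustrative, not from the source:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Sketch (assumption): register an old-API (mapred) input format so a
// new-API (mapreduce) job can consume it through the wrapper.
Job job = Job.getInstance(new Configuration(), "wrapped-input-example");
MapReduceInputFormatWrapper.setInputFormat(
    org.apache.hadoop.mapred.TextInputFormat.class, job);
// The job's mappers now read records produced by the wrapped mapred reader,
// with progress and counters routed through the Reporter adapter above.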