
MRJobConfig
 public static final String COMBINE_CLASS_ATTR
 The constant COMBINE_CLASS_ATTR = "mapreduce.job.combine.class"
 ———— Subinterface JobContext
 declares the method getCombinerClass
 ———— Implementing class JobContextImpl
 implements the getCombinerClass method:
 public Class<? extends Reducer<?,?,?,?>> getCombinerClass()
     throws ClassNotFoundException {
   return (Class<? extends Reducer<?,?,?,?>>)
     conf.getClass(COMBINE_CLASS_ATTR, null);
 }
 Because JobContextImpl implements JobContext, which extends MRJobConfig,
 it sees the COMBINE_CLASS_ATTR constant defined in the parent interface MRJobConfig.
 ———— Subclass Job
 public void setCombinerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
   ensureState(JobState.DEFINE);
   conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
 }
 Because Job extends JobContextImpl, and JobContextImpl sits under MRJobConfig,
 Job also has access to COMBINE_CLASS_ATTR,
 and setCombinerClass writes the combiner class into the configuration under that key defined in MRJobConfig.
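 In other words, setCombinerClass and getCombinerClass are just a setClass/getClass pair over the same configuration key. A minimal driver-side sketch of the round trip (CombinerConfigDemo and MyCombiner are hypothetical names, not part of Hadoop):

 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Reducer;

 public class CombinerConfigDemo {

   // Hypothetical combiner, only here so there is something to register.
   public static class MyCombiner
       extends Reducer<Text, IntWritable, Text, IntWritable> {
     @Override
     protected void reduce(Text key, Iterable<IntWritable> values, Context context)
         throws IOException, InterruptedException {
       int total = 0;
       for (IntWritable v : values) {
         total += v.get();
       }
       context.write(key, new IntWritable(total));
     }
   }

   public static void main(String[] args) throws Exception {
     Job job = Job.getInstance(new Configuration(), "combiner-demo");
     job.setCombinerClass(MyCombiner.class);   // writes COMBINE_CLASS_ATTR into the conf

     // The raw key that setCombinerClass just wrote:
     System.out.println(job.getConfiguration().get("mapreduce.job.combine.class"));

     // getCombinerClass() (inherited from JobContextImpl) resolves the same key back to a Class:
     System.out.println(job.getCombinerClass().getName());
   }
 }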
   MRJobConfig
 ———— Subinterface JobContext
 declares the method getCombinerClass
 ———— Implementing class JobContextImpl
 ———— Subclass Job
 ———— Subinterface TaskAttemptContext (implemented by TaskAttemptContextImpl)
 which inherits the getCombinerClass method
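 For orientation, the relevant declarations in the Hadoop 2.x source look roughly like this (simplified; generics and unrelated interfaces omitted):

 public interface MRJobConfig { ... }
 public interface JobContext extends MRJobConfig { ... }
 public class JobContextImpl implements JobContext { ... }
 public class Job extends JobContextImpl implements JobContext { ... }
 public interface TaskAttemptContext extends JobContext, Progressable { ... }
 public class TaskAttemptContextImpl extends JobContextImpl
     implements TaskAttemptContext { ... }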
  Task
 $CombinerRunner (inner class of Task; $ denotes an inner class)
 This inner class has a static factory method create:
 public static <K,V> CombinerRunner<K,V> create(JobConf job,
                                 TaskAttemptID taskId,
                                 Counters.Counter inputCounter,
                                 TaskReporter reporter,
                                 org.apache.hadoop.mapreduce.OutputCommitter committer
                                 ) throws ClassNotFoundException {
   Class<? extends Reducer<K,V,K,V>> cls =
     (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
   if (cls != null) {
     return new OldCombinerRunner(cls, job, inputCounter, reporter);
   }
   // make a task context so we can get the classes
   org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
     new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId,
         reporter);
   Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
     (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
        taskContext.getCombinerClass();
   if (newcls != null) {
     return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
                                       inputCounter, reporter, committer);
   }
   return null;
 }
 This part handles the old (org.apache.hadoop.mapred) API:
 Class<? extends Reducer<K,V,K,V>> cls =
   (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
 if (cls != null) {
   return new OldCombinerRunner(cls, job, inputCounter, reporter);
 }
 And this part handles the new (org.apache.hadoop.mapreduce) API:
 org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
   new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId,
       reporter);
 Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
   (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
      taskContext.getCombinerClass();
 if (newcls != null) {
   return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
                                     inputCounter, reporter, committer);
 }
 return null;
 (Not sure why the fully qualified names are spelled out; dropping the package prefixes, the up/down casts, and the generics would make this much easier to read.)
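 For contrast, here is a rough sketch of what an old-API combiner looks like (OldApiCombiner is a hypothetical name). A class like this, registered through JobConf.setCombinerClass, is what makes job.getCombinerClass() return non-null and sends create() down the OldCombinerRunner branch; a new-API Reducer registered on Job is picked up by taskContext.getCombinerClass() and takes the NewCombinerRunner branch instead.

 import java.io.IOException;
 import java.util.Iterator;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;

 public class OldApiCombiner extends MapReduceBase
     implements org.apache.hadoop.mapred.Reducer<Text, IntWritable, Text, IntWritable> {

   // Old-API reduce: values arrive as an Iterator, output goes to an OutputCollector.
   @Override
   public void reduce(Text key, Iterator<IntWritable> values,
       OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
     int total = 0;
     while (values.hasNext()) {
       total += values.next().get();
     }
     output.collect(key, new IntWritable(total));
   }
 }

 // Registered with the old API roughly as:
 //   JobConf conf = new JobConf();
 //   conf.setCombinerClass(OldApiCombiner.class);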
 TaskAttemptContext is a subinterface of JobContext, so it too declares getCombinerClass.
 The call above is polymorphic: what actually runs is the getCombinerClass of the implementing class TaskAttemptContextImpl
 (TaskAttemptContextImpl extends JobContextImpl, which implements the method).
 So in the end we read the COMBINE_CLASS_ATTR property, i.e. we get back the class xxxC that we registered with job.setCombinerClass.
 That xxxC is assigned to newcls, and newcls is passed as the reducerClass parameter of NewCombinerRunner's constructor:
 NewCombinerRunner(Class reducerClass,
                   JobConf job,
                   org.apache.hadoop.mapreduce.TaskAttemptID taskId,
                   org.apache.hadoop.mapreduce.TaskAttemptContext context,
                   Counters.Counter inputCounter,
                   TaskReporter reporter,
                   org.apache.hadoop.mapreduce.OutputCommitter committer) {
   super(inputCounter, job, reporter);
   this.reducerClass = reducerClass;
   this.taskId = taskId;
   keyClass = (Class<K>) context.getMapOutputKeyClass();
   valueClass = (Class<V>) context.getMapOutputValueClass();
   comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
   this.committer = committer;
 }
 Task
 MapTask
 $MapOutputBuffer
 private CombinerRunner<K,V> combinerRunner;
 $SpillThread (inner class; $ denotes an inner class)
 combinerRunner = CombinerRunner.create(job, getTaskID(),
                                        combineInputCounter,
                                        reporter, null);
 // at this point we have the combiner class that was configured on the job
 if (combinerRunner == null) {
   // spill directly
   DataInputBuffer key = new DataInputBuffer();
   while (spindex < mend &&
          kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
     final int kvoff = offsetFor(spindex % maxRec);
     int keystart = kvmeta.get(kvoff + KEYSTART);
     int valstart = kvmeta.get(kvoff + VALSTART);
     key.reset(kvbuffer, keystart, valstart - keystart);
     getVBytesForOffset(kvoff, value);
     writer.append(key, value);
     ++spindex;
   }
 } else {
   int spstart = spindex;
   while (spindex < mend &&
          kvmeta.get(offsetFor(spindex % maxRec)
                     + PARTITION) == i) {
     ++spindex;
   }
   // Note: we would like to avoid the combiner if we've fewer
   // than some threshold of records for a partition
   if (spstart != spindex) {
     combineCollector.setWriter(writer);
     RawKeyValueIterator kvIter =
       new MRResultIterator(spstart, spindex);
     combinerRunner.combine(kvIter, combineCollector);
   }
 }
 Next, look at the combine method,
 defined in Task's inner class NewCombinerRunner:
 public void combine(RawKeyValueIterator iterator,
                     OutputCollector<K,V> collector)
     throws IOException, InterruptedException, ClassNotFoundException {
   // make a reducer
   org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer =
     (org.apache.hadoop.mapreduce.Reducer<K,V,K,V>)
       ReflectionUtils.newInstance(reducerClass, job);
   org.apache.hadoop.mapreduce.Reducer.Context
     reducerContext = createReduceContext(reducer, job, taskId,
                                          iterator, null, inputCounter,
                                          new OutputConverter(collector),
                                          committer,
                                          reporter, comparator, keyClass,
                                          valueClass);
   reducer.run(reducerContext);
 }
 The reducerClass above is the xxxC we passed in.
 In the end an instance of xxxC is created via reflection and held through the Reducer type (an upcast),
 and run is then called on that reference (our xxxC does not override run, so the parent class Reducer's run executes).
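 The reflection-plus-polymorphism pattern at work here can be seen in isolation with a small self-contained sketch (DispatchDemo, Base and Custom are hypothetical names; only Configuration and ReflectionUtils are real Hadoop classes):

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;

 public class DispatchDemo {

   public static class Base {
     // Template method, playing the role of Reducer.run.
     public void run() { work(); }
     protected void work() { System.out.println("Base.work"); }
   }

   // Plays the role of our xxxC: overrides only the hook method, not run.
   public static class Custom extends Base {
     @Override protected void work() { System.out.println("Custom.work"); }
   }

   public static void main(String[] args) {
     Configuration conf = new Configuration();
     conf.setClass("demo.impl.class", Custom.class, Base.class);   // like setCombinerClass
     Class<? extends Base> cls = conf.getClass("demo.impl.class", null, Base.class);
     Base obj = ReflectionUtils.newInstance(cls, conf);  // instantiate via reflection, hold as Base
     obj.run();  // Base.run calls work(), and Custom's override wins: prints "Custom.work"
   }
 }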
 In the Reducer class, the run method looks like this:
 /**
  * Advanced application writers can use the
  * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
  * control how the reduce task works.
  */
 public void run(Context context) throws IOException, InterruptedException {
   setup(context);
   try {
     while (context.nextKey()) {
       reduce(context.getCurrentKey(), context.getValues(), context);
       // If a back up store is used, reset it
       Iterator<VALUEIN> iter = context.getValues().iterator();
       if (iter instanceof ReduceContext.ValueIterator) {
         ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
       }
     }
   } finally {
     cleanup(context);
   }
 }
 Again because of polymorphism, the reduce invoked here is the reduce method of our subclass xxxC
 (if the subclass overrides a method, the subclass's version is what actually executes).
 So when we write our own combiner class, it should extend Reducer and override reduce.
 Its input has the form (taking wordcount as an example):
        reduce(Text key, Iterable<IntWritable> values, Context context)
        where key is the word and values is the list of its counts, i.e. value1, value2, ...
        Note that at this point the input is already grouped, i.e. <key, list<value1, value2, value3, ...>>.
        (I reached this conclusion because the combine class I was using was WCReduce,
         i.e. the same class for both reduce and combine. Working through the code: if the input were plain
         <key, value> pairs, the combiner could not merge anything at all; "merging pairs with the same key" and
         "summing the counts" are two separate steps. Pairs with equal keys are already grouped on the map side
         into <key, value list>, and only then does the combiner receive input in exactly that shape and process it.
         Our processing sums the values and writes the result to the context, after which the data enters the
         reduce-side shuffle.
         I then traversed values in reduce and printed them with System.out,
         and the result turned out to be 0, which is really because after one traversal the iterator
         no longer points at the right position.)
 Well, experiment with the code below yourself, repeatedly combining and commenting out the pieces, and you will reach the same conclusions.
 // reduce
 public static class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
   private final IntWritable ValueOut = new IntWritable();
   @Override
   protected void reduce(Text key, Iterable<IntWritable> values,
       Context context) throws IOException, InterruptedException {
     for (IntWritable value : values) {
       System.out.println(value.get() + "--");
     }
     // int total = 0;
     // for (IntWritable value : values) {
     //   total += value.get();
     // }
     // ValueOut.set(total);
     // context.write(key, ValueOut);
   }
 }

 job.setCombinerClass(WCReduce.class);
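 For reference, here is a sketch of the same class with the summing logic enabled, along the lines of the commented-out code above (WCCombine is a hypothetical name; it assumes a standard word-count job, and it traverses values only once, since a second pass over the Iterable would see nothing):

 import java.io.IOException;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;

 public class WCCombine extends Reducer<Text, IntWritable, Text, IntWritable> {
   private final IntWritable valueOut = new IntWritable();

   @Override
   protected void reduce(Text key, Iterable<IntWritable> values, Context context)
       throws IOException, InterruptedException {
     // Sum the grouped counts in a single pass over values.
     int total = 0;
     for (IntWritable value : values) {
       total += value.get();
     }
     valueOut.set(total);
     context.write(key, valueOut);   // emit <word, partial sum> for the reduce-side shuffle
   }
 }

 // In the driver it can serve as both combiner and reducer:
 //   job.setCombinerClass(WCCombine.class);
 //   job.setReducerClass(WCCombine.class);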