On Custom Combiner Classes in MapReduce (Part 1)

Published: 2023/12/20
MRJobConfig

The interface MRJobConfig declares the constant COMBINE_CLASS_ATTR:

    public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class";

Its sub-interface JobContext (F4 opens the type hierarchy in the IDE) declares the method getCombinerClass, and the implementing class JobContextImpl implements it:

    public Class<? extends Reducer<?,?,?,?>> getCombinerClass()
        throws ClassNotFoundException {
      return (Class<? extends Reducer<?,?,?,?>>)
          conf.getClass(COMBINE_CLASS_ATTR, null);
    }

JobContextImpl implements JobContext, which extends MRJobConfig, so it inherits the constant COMBINE_CLASS_ATTR.

Job, a subclass of JobContextImpl, provides the setter:

    public void setCombinerClass(Class<? extends Reducer> cls
                                 ) throws IllegalStateException {
      ensureState(JobState.DEFINE);
      conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
    }

Because Job extends JobContextImpl, it also has COMBINE_CLASS_ATTR, and setCombinerClass stores our class in the configuration under that key.

The hierarchy so far:

    MRJobConfig (interface)
        JobContext (sub-interface, declares getCombinerClass)
            JobContextImpl (implementing class)
                Job (subclass)
            TaskAttemptContext (sub-interface, inherits getCombinerClass)

Task

CombinerRunner is an inner class of Task (Task$CombinerRunner). It has a create method:

    public static <K,V>
    CombinerRunner<K,V> create(JobConf job,
                               TaskAttemptID taskId,
                               Counters.Counter inputCounter,
                               TaskReporter reporter,
                               org.apache.hadoop.mapreduce.OutputCommitter committer
                              ) throws ClassNotFoundException {
      Class<? extends Reducer<K,V,K,V>> cls =
        (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
      if (cls != null) {
        return new OldCombinerRunner(cls, job, inputCounter, reporter);
      }
      // make a task context so we can get the classes
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId,
            reporter);
      Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
        (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
           taskContext.getCombinerClass();
      if (newcls != null) {
        return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
                                          inputCounter, reporter, committer);
      }
      return null;
    }

The first part, the job.getCombinerClass() lookup that returns an OldCombinerRunner, should be the old API. The rest, from the construction of TaskAttemptContextImpl to the NewCombinerRunner, is the new API.

(The fully qualified names are presumably needed because Task lives in the old-API package org.apache.hadoop.mapred, where simple names such as Reducer and TaskAttemptContext already refer to the old-API types. Without the package names, the casts, and the generics, the method would read much more clearly.)

TaskAttemptContext is a sub-interface of JobContext, so it inherits getCombinerClass. The call is polymorphic: what actually runs is getCombinerClass of the implementing class TaskAttemptContextImpl (TaskAttemptContextImpl extends JobContextImpl, and JobContextImpl implements the method). So in the end it reads the property COMBINE_CLASS_ATTR, which gives back the class xxxC that we passed to job.setCombinerClass.

This xxxC is assigned to newcls, and newcls is passed as the reducerClass parameter of the NewCombinerRunner constructor:

    NewCombinerRunner(Class reducerClass,
                      JobConf job,
                      org.apache.hadoop.mapreduce.TaskAttemptID taskId,
                      org.apache.hadoop.mapreduce.TaskAttemptContext context,
                      Counters.Counter inputCounter,
                      TaskReporter reporter,
                      org.apache.hadoop.mapreduce.OutputCommitter committer) {
      super(inputCounter, job, reporter);
      this.reducerClass = reducerClass;
      this.taskId = taskId;
      keyClass = (Class<K>) context.getMapOutputKeyClass();
      valueClass = (Class<V>) context.getMapOutputValueClass();
      comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
      this.committer = committer;
    }

Task, MapTask, MapTask$MapOutputBuffer

MapOutputBuffer keeps the runner in a field:

    private CombinerRunner<K,V> combinerRunner;

The runner is created as follows, and the spill code driven by the inner class SpillThread ($ denotes an inner class) then uses it:

    combinerRunner = CombinerRunner.create(job, getTaskID(),
                                           combineInputCounter,
                                           reporter, null);

At this point we have the combiner class that we configured. The spill code then branches on it:

    if (combinerRunner == null) {
      // spill directly
      DataInputBuffer key = new DataInputBuffer();
      while (spindex < mend &&
          kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
        final int kvoff = offsetFor(spindex % maxRec);
        int keystart = kvmeta.get(kvoff + KEYSTART);
        int valstart = kvmeta.get(kvoff + VALSTART);
        key.reset(kvbuffer, keystart, valstart - keystart);
        getVBytesForOffset(kvoff, value);
        writer.append(key, value);
        ++spindex;
      }
    } else {
      int spstart = spindex;
      while (spindex < mend &&
          kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
        ++spindex;
      }
      // Note: we would like to avoid the combiner if we've fewer
      // than some threshold of records for a partition
      if (spstart != spindex) {
        combineCollector.setWriter(writer);
        RawKeyValueIterator kvIter =
          new MRResultIterator(spstart, spindex);
        combinerRunner.combine(kvIter, combineCollector);
      }
    }

Now look at the combine method, in Task's inner class NewCombinerRunner:

    public void combine(RawKeyValueIterator iterator,
                        OutputCollector<K,V> collector
                       ) throws IOException, InterruptedException,
                                ClassNotFoundException {
      // make a reducer
      org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer =
        (org.apache.hadoop.mapreduce.Reducer<K,V,K,V>)
          ReflectionUtils.newInstance(reducerClass, job);
      org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
        createReduceContext(reducer, job, taskId,
                            iterator, null, inputCounter,
                            new OutputConverter(collector),
                            committer,
                            reporter, comparator, keyClass,
                            valueClass);
      reducer.run(reducerContext);
    }

The reducerClass above is the xxxC we passed in. An xxxC object is created through reflection and upcast to Reducer, and then run is called on it. xxxC itself does not define run, so the call goes to run in the parent class Reducer.

In the class Reducer, run looks like this:

    /**
     * Advanced application writers can use the
     * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
     * control how the reduce task works.
     */
    public void run(Context context) throws IOException, InterruptedException {
      setup(context);
      try {
        while (context.nextKey()) {
          reduce(context.getCurrentKey(), context.getValues(), context);
          // If a back up store is used, reset it
          Iterator<VALUEIN> iter = context.getValues().iterator();
          if(iter instanceof ReduceContext.ValueIterator) {
            ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
          }
        }
      } finally {
        cleanup(context);
      }
    }

Because of polymorphism, the reduce called here is the reduce method of the subclass xxxC (when a subclass overrides a method, the subclass version is the one that actually runs).

So a custom combiner class should extend Reducer and override reduce. Taking wordcount as the example, its input has the form

    reduce(Text key, Iterable<IntWritable> values, Context context)

where key is the word and values is the list of counts, that is value1, value2, and so on. Note that the input is already grouped at this point: <key, list<value1, value2, value3, ...>>.

(I reached this conclusion because the combiner class I used was WCReduce, so the reduce step and the combine step used the same class. From reading the code, if the input had the form <key, value>, combining would be impossible. "Merging identical keys and accumulating their counts" is really two steps. First, pairs with the same key are grouped on the map side into <key, value list>. Only then does the combiner receive this regrouped input and process it, which in our case means summing, and write the result to context, from where it enters the shuffle toward the reduce side.

I then iterated over values in reduce and printed them with System.out.println, and the total came out as 0. That is because values can be traversed only once: after the first pass the iterator was already at the end, so the second loop saw nothing.)

Experiment with the code below, combining and commenting out its parts in different ways, and you will reach the same conclusion.
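    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // reduce
    public static class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable valueOut = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // Debug pass: print every value grouped under this key.
            for (IntWritable value : values) {
                System.out.println(value.get() + "--");
            }

            // Summing pass. values can be traversed only once, so enable
            // either the loop above or this one, not both.
    //        int total = 0;
    //        for (IntWritable value : values) {
    //            total += value.get();
    //        }
    //        valueOut.set(total);
    //        context.write(key, valueOut);
        }
    }

    // In the driver: reuse the reducer class as the combiner.
    job.setCombinerClass(WCReduce.class);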
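The "total is 0" surprise can be reproduced without a cluster. The following is a minimal plain-Java sketch, not Hadoop code: SinglePassValues is a hypothetical stand-in that assumes the framework's values object hands back the same underlying iterator on every call to iterator(), which is how the behaviour described above can be modelled.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SinglePassValuesDemo {

    /**
     * Hypothetical stand-in for the values passed to reduce(): every call
     * to iterator() returns the same cursor, so there is no rewind.
     */
    public static final class SinglePassValues implements Iterable<Integer> {
        private final Iterator<Integer> cursor;

        public SinglePassValues(List<Integer> data) {
            this.cursor = data.iterator();
        }

        @Override
        public Iterator<Integer> iterator() {
            return cursor; // a second for-each resumes where the first one stopped
        }
    }

    /** Sums whatever is still left to read from values. */
    public static int sum(Iterable<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        Iterable<Integer> values = new SinglePassValues(Arrays.asList(1, 1, 1));
        int first = sum(values);  // consumes all three values
        int second = sum(values); // nothing left to read
        System.out.println(first + " " + second); // prints "3 0"
    }
}
```

If both the printing and the summing are needed in the same reduce call, doing them inside a single loop avoids the second traversal.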

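The call chain traced above (store a class under a configuration key, read it back, instantiate it by reflection, and let the base class's run dispatch to the overridden reduce) can be condensed into a small plain-Java model. Everything here is an illustrative assumption rather than Hadoop API: MiniReducer, SumCombiner, and createRunner are hypothetical names, the configuration is just a Map, and the output is a list of "key=value" strings. Only the key string "mapreduce.job.combine.class" is taken from the source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CombinerDispatchSketch {

    public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class";

    /** Hypothetical miniature of Reducer: run() calls reduce() once per key. */
    public static class MiniReducer {
        /** Default behaviour: pass every (key, value) pair through unchanged. */
        protected void reduce(String key, Iterable<Integer> values, List<String> out) {
            for (int v : values) {
                out.add(key + "=" + v);
            }
        }

        public void run(Map<String, List<Integer>> grouped, List<String> out) {
            for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
                reduce(e.getKey(), e.getValue(), out); // dispatches to the override
            }
        }
    }

    /** Hypothetical combiner: overrides reduce() to emit one sum per key. */
    public static class SumCombiner extends MiniReducer {
        @Override
        protected void reduce(String key, Iterable<Integer> values, List<String> out) {
            int total = 0;
            for (int v : values) {
                total += v;
            }
            out.add(key + "=" + total);
        }
    }

    /** Looks up the configured class and instantiates it, or returns null if unset. */
    public static MiniReducer createRunner(Map<String, Class<?>> conf) {
        Class<?> cls = conf.get(COMBINE_CLASS_ATTR);
        if (cls == null) {
            return null;
        }
        try {
            return cls.asSubclass(MiniReducer.class).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + cls, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Class<?>> conf = new HashMap<>();
        conf.put(COMBINE_CLASS_ATTR, SumCombiner.class); // the setCombinerClass step

        Map<String, List<Integer>> grouped = new TreeMap<>();
        grouped.put("a", Arrays.asList(1, 1));
        grouped.put("b", Arrays.asList(1));

        List<String> out = new ArrayList<>();
        createRunner(conf).run(grouped, out); // run() lives in the base class
        System.out.println(out); // prints "[a=2, b=1]"
    }
}
```

The variable holding the runner has the base type, yet the sums come from SumCombiner.reduce, which is the same dispatch that makes the custom reduce run inside Reducer.run.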


Reposted from: https://www.cnblogs.com/xuanlvshu/p/5744445.html
