关于MapReduce中自定义Combine类(一)
生活随笔
收集整理的这篇文章主要介绍了
关于MapReduce中自定义Combine类(一)
小编觉得挺不错的,现在分享给大家,帮大家做个参考.
MRJobConfig public static fina COMBINE_CLASS_ATTR 属性COMBINE_CLASS_ATTR= "mapreduce.job.combine.class" ————子接口(F4) JobContent 方法getCombinerClass ————子实现类 JobContextImpl 实现getCombinerClass方法: public Class<? extends Reducer<?,?,?,?>> getCombinerClass() throws ClassNotFoundException { return (Class<? extends Reducer<?,?,?,?>>) conf.getClass(COMBINE_CLASS_ATTR, null); } 因为JobContextImpl是MRJobConfig子类 所以得到了父类MRJobConfig的COMBINE_CLASS_ATTR属性 ————子类Job public void setCombinerClass(Class<? extends Reducer> cls ) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class); } 因为JobContextImpl是MRJobConfig子类, 而Job是JobContextImpl的子类 所以也有COMBINE_CLASS_ATTR属性 通过setCombinerClass设置了父类MRJobConfig的属性 MRJobConfig ————子接口JobContent 方法getCombinerClass ————子实现类 JobContextImpl ————子类 Job ————子实现类 TaskAttemptContext 继承了方法getCombinerClass Task $CombinerRunner(Task的内部类) 该内部类有方法create: public static <K,V> CombinerRunner<K,V> create(JobConf job, TaskAttemptID taskId, Counters.Counter inputCounter, TaskReporter reporter, org.apache.hadoop.mapreduce.OutputCommitter committer ) throws ClassNotFoundException { Class<? extends Reducer<K,V,K,V>> cls = (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass(); if (cls != null) { return new OldCombinerRunner(cls, job, inputCounter, reporter); } // make a task context so we can get the classes org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId, reporter); Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls = (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>) taskContext.getCombinerClass(); if (newcls != null) { return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext, inputCounter, reporter, committer); } return null; } 其中这一段应该是旧的API Class<? extends Reducer<K,V,K,V>> cls = (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass(); if (cls != null) { return new OldCombinerRunner(cls, job, inputCounter, reporter); } 而这个是新的API org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId, reporter); Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls = (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>) taskContext.getCombinerClass(); if (newcls != null) { return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext, inputCounter, reporter, committer); } return null; (不知道为什么要写全名,去掉那些包名、向上/下转型和各种泛型的话,看起来就会清晰很多?) 而TaskAttemptContext是JobContent的子实现类,所以继承了getCombinerClass方法 而且,这里用的是多态,其调用的是子实现类TaskAttemptContextImpl的getCombinerClass方法 (TaskAttemptContextImpl继承了JobContextImpl,而JobContextImpl实现了该方法) 所以最终get到了属性COMBINE_CLASS_ATTR,即得到了我们通过job.setCombinerClass的xxxC 而这个xxxC是给了newcls,而newcls是给了NewCombinerRunner的构造函数的reducerClassc参数 NewCombinerRunner(Class reducerClass, JobConf job, org.apache.hadoop.mapreduce.TaskAttemptID taskId, org.apache.hadoop.mapreduce.TaskAttemptContext context, Counters.Counter inputCounter, TaskReporter reporter, org.apache.hadoop.mapreduce.OutputCommitter committer) { super(inputCounter, job, reporter); this.reducerClass = reducerClass; this.taskId = taskId; keyClass = (Class<K>) context.getMapOutputKeyClass(); valueClass = (Class<V>) context.getMapOutputValueClass(); comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator(); this.committer = committer; } Task MapTask $MapOutputBuffer private CombinerRunner<K,V> combinerRunner; $SpillThread类($表示内部类) combinerRunner = CombinerRunner.create(job, getTaskID(), combineInputCounter, reporter, null); //此时,我们得到了设置好的合并类 if (combinerRunner == null) { // spill directly DataInputBuffer key = new DataInputBuffer(); while (spindex < mend && kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) { final int kvoff = offsetFor(spindex % maxRec); int keystart = kvmeta.get(kvoff + KEYSTART); int valstart = kvmeta.get(kvoff + VALSTART); key.reset(kvbuffer, keystart, valstart - keystart); getVBytesForOffset(kvoff, value); writer.append(key, value); ++spindex; } } else { int spstart = spindex; while (spindex < mend && kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) { ++spindex; } // Note: we would like to avoid the combiner if we've fewer // than some threshold of records for a partition if (spstart != spindex) { combineCollector.setWriter(writer); RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex); combinerRunner.combine(kvIter, combineCollector); } } 再查看combine函数 在Task的内部类NewCombinerRunner下 public void combine(RawKeyValueIterator iterator, OutputCollector<K,V> collector) throws IOException, InterruptedException,ClassNotFoundException { // make a reducer org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer = (org.apache.hadoop.mapreduce.Reducer<K,V,K,V>) ReflectionUtils.newInstance(reducerClass, job); org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, taskId, iterator, null, inputCounter, new OutputConverter(collector), committer, reporter, comparator, keyClass, valueClass); reducer.run(reducerContext); } 上面的reducerClass就是我们传入的xxxC 最终是通过反射创建了一个xxxC对象,并将其强制向上转型为Reducer实例对象, 然后调用了向上转型后对象的run方法(当前的xxxC没有run方法,调用的是父类Reduce的run) 在类Reducer中,run方法如下 /** * Advanced application writers can use the * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to * control how the reduce task works. */ public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKey()) { reduce(context.getCurrentKey(), context.getValues(), context); // If a back up store is used, reset it Iterator<VALUEIN> iter = context.getValues().iterator(); if(iter instanceof ReduceContext.ValueIterator) { ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore(); } } } finally { cleanup(context); } } 有由于多态,此时调用的reduce是子类xxxC中的reduce方法 (多态态性质:子类复写了该方法,则实际上执行的是子类中的该方法) 所以说,我们自定义combine用的类的时候,应该继承Reducer类,并且复写reduce方法 且其输入形式:(以wordcount为例) reduce(Text key, Iterable<IntWritable> values, Context context) 其中key是单词个数,而values是个数列表,也就是value1、value2........ 注意,此时已经是列表,即<键,list<值1、值2、值3.....>> (之所以得到这个结论,是因为我当时使用的combine类是WCReduce, 即Reduce和combine所用的类是一样的,通过对代码的分析,传入值的结构如果是<lkey,value>的话,是不可能做到combine的啊——即所谓的对相同值合并,求计数的累积和,这根本就是两个步骤,对key相同的键值对在map端就进行了一次合并了,合并成了<key,value list>,然后才轮到combine接受直接换个形式的输入,并处理——我们的处理是求和,然后再输出到context,进入reduce端的shuffle过程。 然后我在reduce中遍历了用syso输出 结果发现是0,而这实际上是因为经过一次遍历,我的指针指向的位置就不对了啊, ) 嗯,自己反复使用以下的代码,不断的组合、注释,去测试吧~就会得出这样的结论了 /reduce publicstaticclassWCReduce extends Reducer<Text,IntWritable,Text,IntWritable>{ private final IntWritableValueOut=newIntWritable(); @Override protectedvoid reduce(Text key,Iterable<IntWritable> values, Context context) throws IOException,InterruptedException{ for(IntWritable value : values){ System.out.println(value.get()+"--"); } // int total = 0 ; // for (IntWritable value : values) { // total += value.get(); // } // ValueOut.set(total); // context.write(key, ValueOut); } } job.setCombinerClass(WCReduce.class);
附件列表
转载于:https://www.cnblogs.com/xuanlvshu/p/5744445.html
总结
以上是生活随笔为你收集整理的关于MapReduce中自定义Combine类(一)的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: 一个简单的小程序demo
- 下一篇: 【1800题】一、函数、极限、连续