欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程语言 > java >内容正文

java

MapReduce Java API-多输入路径方式

发布时间:2025/3/19 java 38 豆豆
生活随笔 收集整理的这篇文章主要介绍了 MapReduce Java API-多输入路径方式 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

场景

MapReduce Java API实例-统计单词出现频率:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/119410169

在上面实现统计单次出现的频率的基础上。

数据集只是单路径,如果有多个数据集文件,即有多个txt文件,要怎么实现。

多文件输入采用MultipleInputs.addInputPath方法即可完成。

注:

博客:
https://blog.csdn.net/badao_liumang_qizhi
关注公众号
霸道的程序猿
获取编程相关电子书、教程推送与免费下载。

实现

map和reduce的代码基本和上面的一致

1、map类

package com.badao.multinput;import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException; import java.util.StringTokenizer;public class MultInputMapper extends Mapper<LongWritable,Text,Text,IntWritable> {//1、编写map函数,通过继承Mapper类实现里面的map函数//   Mapper类当中的第一个函数是Object,也可以写成Long//   第一个参数对应的值是行偏移量//2、第二个参数类型通常是Text类型,Text是Hadoop实现的String 类型的可写类型//   第二个参数对应的值是每行字符串//3、第三个参数表示的是输出key的数据类型//4、第四个参数表示的是输出value的数据类型,IntWriable 是Hadoop实现的int类型的可写数据类型public final static IntWritable one = new IntWritable(1);public Text word = new Text();//key 是行偏移量//value是每行字符串@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer stringTokenizer = new StringTokenizer(value.toString());while (stringTokenizer.hasMoreTokens()){//stringTokenizer.nextToken()是字符串类型,使用set函数完成字符串到Text数据类型的转换word.set(stringTokenizer.nextToken());//通过write函数写入到本地文件context.write(word,one);}} }

2、reduce类

package com.badao.multinput;import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;//第一个参数类型是输入值key的数据类型,map中间输出key的数据类型 //第二个参数类型是输入值value的数据类型,map中间输出value的数据类型 //第三个参数类型是输出值key的数据类型,他的数据类型要跟job.setOutputKeyClass(Text.class) 保持一致 //第四个参数类型是输出值value的数据类型,它的数据类型要跟job.setOutputValueClass(IntWriable.class) 保持一致public class MultInputReducer extends Reducer<Text, IntWritable,Text,IntWritable> {public IntWritable result = new IntWritable();//key就是单词  values是单词出现频率列表@Overridepublic void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for(IntWritable val:values){//get就是取出IntWriable的值sum += val.get();}result.set(sum);context.write(key,result);} }

3、job类

job这里不同,单路径时

FileInputFormat.addInputPath(job,new Path("D:\\words.txt"));

多路径时

        Path path1 = new Path("D:\\words.txt");Path path2 = new Path("D:\\words2.txt");MultipleInputs.addInputPath(job,path1, TextInputFormat.class,MultInputMapper.class);MultipleInputs.addInputPath(job,path2, TextInputFormat.class,MultInputMapper.class);

完整代码

package com.badao.multinput;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;import java.io.IOException;public class MultInputJob {public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {wordCountLocal();}public static void wordCountLocal()throws IOException, ClassNotFoundException, InterruptedException{Configuration conf = new Configuration();//实例化一个作业,word count是作业的名字Job job = Job.getInstance(conf, "multinputwordcount");//指定通过哪个类找到对应的jar包job.setJarByClass(MultInputJob.class);//为job设置Mapper类job.setMapperClass(MultInputMapper.class);job.setCombinerClass(IntSumReducer.class);//为job设置reduce类job.setReducerClass(MultInputReducer.class);//为job的输出数据设置key类job.setOutputKeyClass(Text.class);//为job输出设置value类job.setOutputValueClass(IntWritable.class);//多个输入路径Path path1 = new Path("D:\\words.txt");Path path2 = new Path("D:\\words2.txt");MultipleInputs.addInputPath(job,path1, TextInputFormat.class,MultInputMapper.class);MultipleInputs.addInputPath(job,path2, TextInputFormat.class,MultInputMapper.class);//为job设置输出路径FileOutputFormat.setOutputPath(job,new Path("D:\\mulinputOut"));job.waitForCompletion(true);}}

运行job类查看效果

 

总结

以上是生活随笔为你收集整理的MapReduce Java API-多输入路径方式的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。