离线轻量级大数据平台Spark之JavaRDD关联join操作
生活随笔
收集整理的这篇文章主要介绍了
离线轻量级大数据平台Spark之JavaRDD关联join操作
小编觉得挺不错的,现在分享给大家,帮大家做个参考.
对两个RDD进行关联操作,如:
1)文件post_data.txt包含:post_id\title\content
2)文件train.txt包含:dev_id\post_id\praise\time
通过post_id关联,提取post_id\content\praise字段,文件字段用不定长多个空格分割;
参考代码如下:
package scs.contest;import java.util.List; import java.util.Properties;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.Optional; import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;public class pageview {public static String path="/home/spark/data";public static void main(String[] args) {//第一步:设置环境字符集,避免中文乱码Properties pps=System.getProperties(); pps.setProperty("file.encoding","UTF-8"); //第二步:txt数据导入并分割成到训练集和预测集SparkConf sparkConf = new SparkConf().setAppName("pageview");JavaSparkContext jsc = new JavaSparkContext(sparkConf);JavaRDD<String> postdataLines =jsc.textFile(path+"/post_data.txt");//id\title\contentJavaRDD<String> trainLines =jsc.textFile(path+"/train.txt");//dev_id\post_id\praise\timeJavaPairRDD<String,String> pdLines=postdataLines.mapToPair(new PairFunction<String,String,String>(){public Tuple2<String, String> call(String t) throws Exception {String[] st=t.split("\\s+");//多个空格情况的分割String content="";for(int i=1;i<st.length;i++){content+=st[i];}return new Tuple2(st[0],content);//title+content都取,title一般为空的情况下}});JavaPairRDD<String,String> ptLines=trainLines.mapToPair(new PairFunction<String,String,String>(){public Tuple2<String, String> call(String t) throws Exception {String[] st=t.split("\\s+");return new Tuple2(st[1], st[2]);}});//合并两个JavaPairRDD,得到训练集和预测集JavaPairRDD<String, Tuple2<String, String>> tLines=pdLines.join(ptLines);//训练街Tuple2<String, Tuple2<String, String>> tpfirst=tLines.first();System.out.println(tpfirst._1+"|"+tpfirst._2()._1+"|"+tpfirst._2()._2);//预测集,Optional<String>=Optional.empty的记录数JavaPairRDD<String, Tuple2<String, Optional<String>>> pLines=pdLines.leftOuterJoin(ptLines); Tuple2<String, Tuple2<String, String>> tpfirst1=tLines.first();System.out.println(tpfirst1._1+"|"+tpfirst1._2()._1+"|"+tpfirst1._2()._2); } }执行结果: 507298|无聊的一天又开始了…|1 547732|人生不可能总一帆风顺,总会有一些挫折的。|Optional.empty
参考官网的JavaPariRDD的join操作。
总结
以上是生活随笔为你收集整理的离线轻量级大数据平台Spark之JavaRDD关联join操作的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: 关于微信小程序的好奇和多事
- 下一篇: Java-POI操作excel清除单元格