【Spark-core学习之九】 Spark案例
环境
虚拟机:VMware 10
Linux版本:CentOS-6.5-x86_64
客户端:Xshell4
FTP:Xftp4
jdk1.8
scala-2.10.4(依赖jdk1.8)
spark-1.6
一、PV & UV
PV是网站分析的一个术语,用以衡量网站用户访问的网页的数量。对于广告主,PV值可预期它可以带来多少广告收入。一般来说,PV与来访者的数量成正比,但是PV并不直接决定页面的真实来访者数量,如同一个来访者通过不断的刷新页面,也可以制造出非常高的PV。
1、什么是PV值
PV(page view)即页面浏览量或点击量,是衡量一个网站或网页用户访问量。具体的说,PV值就是所有访问者在24小时(0点到24点)内看了某个网站多少个页面或某个网页多少次。PV是指页面刷新的次数,每一次页面刷新,就算做一次PV流量。
度量方法就是从浏览器发出一个对网络服务器的请求(Request),网络服务器接到这个请求后,会将该请求对应的一个网页(Page)发送给浏览器,从而产生了一个PV。那么在这里只要是这个请求发送给了浏览器,无论这个页面是否完全打开(下载完成),那么都是应当计为1个PV。
package com.wjy.test;import java.util.List;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;public class Pv {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local").setAppName("PV"); JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd = sc.textFile("./data/pvuvdata");//根据PV定义 某个页面/网址的访问数量 将每一条记录根据网址解析出一条访问量JavaPairRDD<String, Integer> ipwebrdd = rdd.mapToPair(new PairFunction<String, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {//7.213.213.208 吉林 2018-03-29 1522294977303 1920936170939152672 www.dangdang.com LoginString[] ss = line.split("\t");return new Tuple2<String, Integer>(ss[5],1);}});//累加页面访问量JavaPairRDD<String, Integer> mapToPair = ipwebrdd.reduceByKey(new Function2<Integer, Integer, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {private static final long serialVersionUID = 1L;//换个 用于按照整数key排序 @Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> tuple)throws Exception {return tuple.swap();}}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> tuple)throws Exception {return tuple.swap();}});List<Tuple2<String, Integer>> list = mapToPair.take(5);for(Tuple2<String, Integer> t:list){System.out.println(t);}sc.stop();}}
结果:
(www.baidu.com,18791) (www.dangdang.com,18751) (www.suning.com,18699) (www.mi.com,18678) (www.taobao.com,18613)
2、什么是UV值
UV(unique visitor)即独立访客数,指访问某个站点或点击某个网页的不同IP地址的人数。在同一天内,UV只记录第一次进入网站的具有独立IP的访问者,在同一天内再次访问该网站则不计数。UV提供了一定时间内不同观众数量的统计指标,而没有反应出网站的全面活动。
package com.wjy.test;import java.util.List;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;public class Uv {/*** 根据IP网址来确定唯一用户访问 然后排重 累计* @param args*/public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local").setAppName("UV");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd = sc.textFile("./data/pvuvdata");JavaPairRDD<String, Integer> rdd2 = rdd.mapToPair(new PairFunction<String, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {String[] split = line.split("\t");return new Tuple2<String, Integer>(split[0]+"_"+split[5],1);}}).distinct().mapToPair(new PairFunction<Tuple2<String,Integer>, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<String, Integer> tuple)throws Exception {return new Tuple2<String, Integer>(tuple._1.split("_")[1],1);}});//累加JavaPairRDD<String, Integer> rdd3 = rdd2.reduceByKey(new Function2<Integer, Integer, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {private static final long serialVersionUID = 1L;//反转 数值做KEY 用于排序 @Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> tuple)throws Exception {return tuple.swap();}}).sortByKey(false)//降序排序.mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {private static final long serialVersionUID = 1L;//排序之后 反转回来 @Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> tuple)throws Exception {return tuple.swap();}});//取前5个元素List<Tuple2<String, Integer>> list = rdd3.take(5);for(Tuple2<String, Integer> t:list){System.out.println(t);}sc.stop();}}
结果:
(www.baidu.com,15830) (www.suning.com,15764) (www.mi.com,15740) (www.jd.com,15682) (www.dangdang.com,15641)
二、二次排序
对于两列以上的数据,要求对第一列排序之后,之后的列也要依次排序,思路就是:先对第一列进行排序,对于第一列数值相同,再对第二列进行排序。
举例:
待排序数据:secondSort.txt
3 1 5 2 6 5 8 123 1 4 4 123 5 432 3 54 5 121 8 654 3 98 package com.wjy.test;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;public class SecondSort{public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("SecondSort");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd = sc.textFile("./data/secondSort.txt");//转成K-V格式//PairFunction 入参1-rdd的一行记录 入参2 入参3是call的出参JavaPairRDD<SecondSortKey, String> mapToPair = rdd.mapToPair(new PairFunction<String, SecondSortKey, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<SecondSortKey, String> call(String line)throws Exception {String[] sps = line.split(" ");int first = Integer.valueOf(sps[0]);int second = Integer.valueOf(sps[1]);SecondSortKey ss = new SecondSortKey(first,second);return new Tuple2<SecondSortKey, String>(ss,line);}});//sortByKey 会使用key也就是SecondSortKey的compareTo方法mapToPair.sortByKey(false).foreach(new VoidFunction<Tuple2<SecondSortKey,String>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<SecondSortKey, String> tuple) throws Exception {System.out.println(tuple._2);}});sc.stop();
}}
对于KEY自定义类型 实现comparable接口 实现comparTo方法
package com.wjy.test;import java.io.Serializable;public class SecondSortKey implements Serializable ,Comparable<SecondSortKey>{private static final long serialVersionUID = 1L;private int first;private int second;public SecondSortKey(int first,int second){super();this.first=first;this.second=second;}public int getFirst() {return first;}public void setFirst(int first) {this.first = first;}public int getSecond() {return second;}public void setSecond(int second) {this.second = second;}@Overridepublic int compareTo(SecondSortKey o) {//先比较第一个数值 如果相同再比较第二个值 否则直接返回第一个值的比较结果if (getFirst()-o.getFirst() == 0){return getSecond() - o.getSecond();}else{return getFirst()-o.getFirst();}}}排序结果:
8 654 8 123 6 5 5 432 5 121 5 2 4 123 3 98 3 54 3 1 1 4
三、分组取topN
对于多组数据,去每一组数据前N个数据,比如列出每个班级的前三名等等问题。
解决的思路:先分组,然后每一组排序,取前N个。
案例:有三个班级的分数清单scores.txt,取出每班前三名。
groupByKey+排序算法:
package com.wjy.test;import java.util.Iterator;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;public class TopNtest {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster("local").setAppName("TopOs");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd = sc.textFile("./data/scores.txt");//转成K-V格式 方便下一步分组和排序//PairFunction 入参1rdd的一行数据 入参2、3是call的出参元素JavaPairRDD<String, Integer> mapToPair = rdd.mapToPair(new PairFunction<String, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {String[] ss = line.split("\t");return new Tuple2<String, Integer>(ss[0],Integer.valueOf(ss[1]));}});//使用groupByKey 将相同班级的数据放在一个集合里mapToPair.groupByKey().foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {String classname = tuple._1;Iterator<Integer> it = tuple._2.iterator();Integer[] top3 = new Integer[3];while(it.hasNext()){Integer score = it.next();//排序for (int i = 0; i < top3.length; i++) {if(top3[i] == null){top3[i] = score;break;}else if(score > top3[i]){for (int j = 2; j > i; j--) {top3[j] = top3[j-1];}top3[i] = score;break;}}}System.out.println("classname="+classname);for (Integer i:top3){System.out.println(i);}}}); sc.stop(); } }
topN 结果:
classname=class3 98 70 70 classname=class1 102 100 99 classname=class2 88 85 85
转载于:https://www.cnblogs.com/cac2020/p/10684754.html
总结
以上是生活随笔为你收集整理的【Spark-core学习之九】 Spark案例的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: 如何用Kaplan-MeierPlott
- 下一篇: unittest中的测试固件