当前位置:
首页 >
使用idea编写SparkStreaming消费kafka中的数据,并对数据进行某一指标的处理【小案例】(五)
发布时间:2024/2/28
47
豆豆
生活随笔
收集整理的这篇文章主要介绍了
使用idea编写SparkStreaming消费kafka中的数据,并对数据进行某一指标的处理【小案例】(五)
小编觉得挺不错的,现在分享给大家,帮大家做个参考.
接 使用idea编写SparkStreaming消费kafka中的数据【小案例】(四)
https://georgedage.blog.csdn.net/article/details/103508619
先对上篇做一个回顾,在上一篇我们编写消费者,并且使用sparkStreaming对kafka中的数据进行批处理。
这篇对我们接收来的数据进行一个指标的统计。
这是原始的数据
1576139418467 4 1 henan kaifeng 1576139419467 3 0 hebei handan 1576139420467 3 2 henan kaifeng 1576139421467 0 0 beijing daxing 1576139422467 3 0 beijing daxing 1576139423467 1 2 henan zhengzhou 1576139424467 4 2 henan kaifeng 1576139425477 0 0 henan kaifeng 1576139426483 0 0 beijing haidian 1576139427483 3 0 beijing haidian 1576139428483 4 2 henan kaifeng 1576139429483 1 2 henan kaifeng 1576139430483 1 0 guangzhou zhongshan 1576139431483 3 0 henan zhengzhou 1576139432483 4 0 guangzhou zhuhai 1576139433483 1 0 guangzhou zhongshan对时间戳进行转化,然后统计这一天中访问的同一ip多次点击同一广告id的前几位,可以经过一个阈值的过滤,将其列为黑名单。也就是我们所生产的日志,例如你进行搜索时百度第一页的某广告,点击是需要向百度进行付款的,所以我们对于恶意点击者进行拉黑处理。算是一个风控措施。
友情提示:如果按照之前我们对于生产者的随机数的话,不容易看到效果,所以讲生产者代码中随机数范围进行缩小。
消费者代码如下:
package com.kafkaimport java.text.SimpleDateFormat import java.util.Dateimport org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext}object KafkaConsumerStreamingDemo {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[3]").setAppName("kks")val sc = new SparkContext(conf)val ssc = new StreamingContext(sc,Seconds(5))sc.setLogLevel("error")val topic = List("george")val map = Map("bootstrap.servers" -> "henu2:9092","group.id" -> "george","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")ssc.checkpoint("D:\\ckpoint")val ds = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topic, map))val mapDs = ds.map(_.value())val sdf = new SimpleDateFormat("yyyy-MM-dd")val userDs: DStream[(String, Int)] = mapDs.transform(x => x.map(line => {val arr = line.split(" ")val date = sdf.format(new Date(arr(0).toLong))val userId = arr(1)val adId = arr(2)(date + "," + userId + "," + adId, 1)}))val reduceDs = userDs.reduceByKey(_+_)val resDs: DStream[(String, Int)] = reduceDs.updateStateByKey((currentValues: Seq[Int], preValue: Option[Int]) => {val now = currentValues.sumval pre = preValue.getOrElse(0)Option(now + pre)})resDs.print()ssc.start()ssc.awaitTermination()} }部分结果展示:
(2019-12-12,2,2,1) (2019-12-12,4,0,3) (2019-12-12,1,0,4) (2019-12-12,3,1,1) (2019-12-12,4,1,3) (2019-12-12,3,2,2) (2019-12-12,2,0,2) (2019-12-12,4,2,2) (2019-12-12,2,1,1) (2019-12-12,0,0,5)后续就是我们假如将一天内单个ip同一adid的访问数值大于100的进行过滤处理,然后存储即可。下回分解!!!
总结
以上是生活随笔为你收集整理的使用idea编写SparkStreaming消费kafka中的数据,并对数据进行某一指标的处理【小案例】(五)的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: SparkStreaming Exce
- 下一篇: 大剑无锋之ArrayList中使用增强f