欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

spark项目实战:电商分析平台之各个范围Session步长、访问时长占比统计(需求一)

发布时间:2024/2/28 编程问答 45 豆豆
生活随笔 收集整理的这篇文章主要介绍了 spark项目实战:电商分析平台之各个范围Session步长、访问时长占比统计(需求一) 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

spark项目实战:电商分析平台之各个范围Session步长、访问时长占比统计(需求一)


  • 项目基本信息、架构、需求一览
  • 各个范围Session步长、访问时长占比统计概述
  • 各个范围Session步长、访问时长占比统计简要运行流程
  • 代码实现
  • 小结

  • 1. 项目基本信息、架构、需求一览


    见项目第一篇文章 spark项目实战:电商分析平台之项目概述

    代码在github上
    初始代码: https://github.com/githubIMrLi/spark-commerce_basic
    完整代码:https://github.com/githubIMrLi/spark-commerce


    2. 各个范围Session步长、访问时长占比统计概述


  • 访问时长:session中最早与最晚action时间之差,以秒为单位。
  • 访问步长:session中的action个数。
  • 统计出符合筛选条件的session中,访问时长在1s~3s、4s~6s、7s~9s、10s~30s、30s~60s、1m~3m、3m~10m、10m~30m、30m以上,以及访问步长在1~3、4~6、7~9、10~30、30~60、60以上各个范围内的session的占比

  • 3. 各个范围Session步长、访问时长占比统计简要运行流程



    4. 代码实现


  • 先运行 commerce_basic\mock\src\main\scala\MockDataGenerate.scala 代码,生成所需表

  • 各个范围Session步长、访问时长占比统计代码
  • import java.util.{Date, UUID}

import commons.conf.ConfigurationManager
import commons.constant.Constants
import commons.model.{UserInfo, UserVisitAction}
import commons.utils._
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.mutable

/**
 * Requirement 1: for the sessions that pass the task's filter conditions, compute the share of
 * sessions in each visit-length range and each step-length range, and append the result as one
 * row to the JDBC table `session_ration_0308`.
 */
object SessionStatisticAgg {

  def main(args: Array[String]): Unit = {
    // Filter conditions of this task, stored as a JSON string in the configuration.
    val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
    val taskParam = JSONObject.fromObject(jsonStr)

    // Globally unique id stored with this run's result row.
    val taskUUID = UUID.randomUUID().toString

    val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // RDD[UserVisitAction] restricted to the task's date range.
    val actionRDD = getActionRDD(sparkSession, taskParam)

    // RDD[(sessionId, UserVisitAction)]
    val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))

    // RDD[(sessionId, Iterable[UserVisitAction])], cached because it is reused below.
    val sessionId2GroupRDD = sessionId2ActionRDD.groupByKey()
    // sparkSession.sparkContext.setCheckpointDir()
    sessionId2GroupRDD.cache()
    // sessionId2GroupRDD.checkpoint()
    sessionId2GroupRDD.foreach(println(_)) // debug output

    // RDD[(sessionId, aggregated session info joined with user info)]
    val sessionId2FullInfoRDD = getFullInfoData(sparkSession, sessionId2GroupRDD)

    val sessionStatAccumulator = new SessionStatAccumulator
    sparkSession.sparkContext.register(sessionStatAccumulator, "sessionAccumulator")

    // The filter updates the accumulator as a side effect of a transformation. The foreach below
    // is the only action run on sessionId2FilterRDD before the accumulator is read. Running
    // another action on it (without caching) would re-run the filter and count sessions twice.
    val sessionId2FilterRDD = getFilteredData(taskParam, sessionStatAccumulator, sessionId2FullInfoRDD)
    sessionId2FilterRDD.foreach(println(_))

    for ((k, v) <- sessionStatAccumulator.value) {
      println("k=" + k + ", value=" + v)
    }

    // Turn the counts into ratios and persist them.
    getFinalData(sparkSession, taskUUID, sessionStatAccumulator.value)
  }

  /**
   * Converts the accumulated counts into per-range ratios and appends one SessionAggrStat row
   * to the JDBC table `session_ration_0308`.
   *
   * @param sparkSession active session, used to build the one-row DataFrame
   * @param taskUUID     id stored with the result row
   * @param value        accumulator contents: Constants.SESSION_COUNT plus one count per range key
   */
  def getFinalData(sparkSession: SparkSession,
                   taskUUID: String,
                   value: mutable.HashMap[String, Int]): Unit = {
    // Real number of filtered sessions. Defaulting to 1 (only to avoid dividing by zero) also
    // wrote 1 as the session count when no session matched.
    val sessionCount = value.getOrElse(Constants.SESSION_COUNT, 0)

    // Share of sessions counted under `key`, rounded to 2 decimals; 0 when no session matched.
    def ratio(key: String): Double =
      if (sessionCount == 0) 0.0
      else NumberUtils.formatDouble(value.getOrElse(key, 0).toDouble / sessionCount, 2)

    val stat = SessionAggrStat(
      taskUUID,
      sessionCount,
      ratio(Constants.TIME_PERIOD_1s_3s),
      ratio(Constants.TIME_PERIOD_4s_6s),
      ratio(Constants.TIME_PERIOD_7s_9s),
      ratio(Constants.TIME_PERIOD_10s_30s),
      ratio(Constants.TIME_PERIOD_30s_60s),
      ratio(Constants.TIME_PERIOD_1m_3m),
      ratio(Constants.TIME_PERIOD_3m_10m),
      ratio(Constants.TIME_PERIOD_10m_30m),
      ratio(Constants.TIME_PERIOD_30m),
      ratio(Constants.STEP_PERIOD_1_3),
      ratio(Constants.STEP_PERIOD_4_6),
      ratio(Constants.STEP_PERIOD_7_9),
      ratio(Constants.STEP_PERIOD_10_30),
      ratio(Constants.STEP_PERIOD_30_60),
      ratio(Constants.STEP_PERIOD_60))

    val statRDD = sparkSession.sparkContext.makeRDD(Array(stat))
    import sparkSession.implicits._
    statRDD.toDF().write
      .format("jdbc")
      .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .option("dbtable", "session_ration_0308")
      .mode(SaveMode.Append)
      .save()
  }

  /**
   * Adds one count to the accumulator key of the visit-length range (in seconds) that
   * `visitLength` falls into. Lengths below 1s (e.g. single-action sessions) match no range
   * and are not counted.
   */
  def calculateVisitLength(visitLength: Long, sessionStatisticAccumulator: SessionStatAccumulator): Unit = {
    val bucket = visitLength match {
      case l if l < 1     => None
      case l if l <= 3    => Some(Constants.TIME_PERIOD_1s_3s)
      case l if l <= 6    => Some(Constants.TIME_PERIOD_4s_6s)
      case l if l <= 9    => Some(Constants.TIME_PERIOD_7s_9s)
      case l if l <= 30   => Some(Constants.TIME_PERIOD_10s_30s)
      case l if l <= 60   => Some(Constants.TIME_PERIOD_30s_60s)
      case l if l <= 180  => Some(Constants.TIME_PERIOD_1m_3m)
      case l if l <= 600  => Some(Constants.TIME_PERIOD_3m_10m)
      case l if l <= 1800 => Some(Constants.TIME_PERIOD_10m_30m)
      case _              => Some(Constants.TIME_PERIOD_30m)
    }
    bucket.foreach(key => sessionStatisticAccumulator.add(key))
  }

  /**
   * Adds one count to the accumulator key of the step-length range (number of actions) that
   * `stepLength` falls into. Values below 1 match no range.
   */
  def calculateStepLength(stepLength: Long, sessionStatisticAccumulator: SessionStatAccumulator): Unit = {
    val bucket = stepLength match {
      case s if s < 1   => None
      case s if s <= 3  => Some(Constants.STEP_PERIOD_1_3)
      case s if s <= 6  => Some(Constants.STEP_PERIOD_4_6)
      case s if s <= 9  => Some(Constants.STEP_PERIOD_7_9)
      case s if s <= 30 => Some(Constants.STEP_PERIOD_10_30)
      case s if s <= 60 => Some(Constants.STEP_PERIOD_30_60)
      case _            => Some(Constants.STEP_PERIOD_60)
    }
    bucket.foreach(key => sessionStatisticAccumulator.add(key))
  }

  /**
   * Keeps the sessions whose full info satisfies every filter condition set in `taskParam`.
   * For each kept session, counts it under SESSION_COUNT and under its visit/step-length ranges.
   *
   * @param taskParam              task filter conditions
   * @param sessionStatAccumulator receives the per-range counts (updated inside the filter)
   * @param sessionId2FullInfoRDD  (sessionId, "|"-separated key=value session + user info)
   * @return the filtered (sessionId, fullInfo) pairs
   */
  def getFilteredData(taskParam: JSONObject,
                      sessionStatAccumulator: SessionStatAccumulator,
                      sessionId2FullInfoRDD: RDD[(String, String)]) = {
    // "name=value" for every filter condition that is set, joined with "|" and without a
    // trailing "|". The old code appended a trailing "|" and tried to strip it with
    // endsWith("\\|"), which never matched because endsWith is not a regex.
    val filterInfo = Seq(
      Constants.PARAM_START_AGE,
      Constants.PARAM_END_AGE,
      Constants.PARAM_PROFESSIONALS,
      Constants.PARAM_CITIES,
      Constants.PARAM_SEX,
      Constants.PARAM_KEYWORDS,
      Constants.PARAM_CATEGORY_IDS
    ).flatMap { name =>
      Option(ParamUtils.getParam(taskParam, name)).map(v => name + "=" + v)
    }.mkString("|")

    sessionId2FullInfoRDD.filter { case (_, fullInfo) =>
      val matched =
        ValidUtils.between(fullInfo, Constants.FIELD_AGE, filterInfo,
          Constants.PARAM_START_AGE, Constants.PARAM_END_AGE) &&
          ValidUtils.in(fullInfo, Constants.FIELD_PROFESSIONAL, filterInfo, Constants.PARAM_PROFESSIONALS) &&
          ValidUtils.in(fullInfo, Constants.FIELD_CITY, filterInfo, Constants.PARAM_CITIES) &&
          ValidUtils.equal(fullInfo, Constants.FIELD_SEX, filterInfo, Constants.PARAM_SEX) &&
          ValidUtils.in(fullInfo, Constants.FIELD_SEARCH_KEYWORDS, filterInfo, Constants.PARAM_KEYWORDS) &&
          // getFullInfoData stores clicked categories under FIELD_CLICK_CATEGORY_IDS; fullInfo never
          // contains a FIELD_CATEGORY_ID entry, so the category condition must read this field.
          ValidUtils.in(fullInfo, Constants.FIELD_CLICK_CATEGORY_IDS, filterInfo, Constants.PARAM_CATEGORY_IDS)

      if (matched) {
        sessionStatAccumulator.add(Constants.SESSION_COUNT)
        val visitLength = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_VISIT_LENGTH).toLong
        val stepLength = StringUtils.getFieldFromConcatString(fullInfo, "\\|", Constants.FIELD_STEP_LENGTH).toLong
        calculateVisitLength(visitLength, sessionStatAccumulator)
        calculateStepLength(stepLength, sessionStatAccumulator)
      }
      matched
    }
  }

  /**
   * Aggregates each session's actions and joins the result with the user's profile.
   *
   * Each session yields (sessionId, fullInfo). fullInfo is a "|"-separated key=value string
   * with these fields:
   *   - session id
   *   - distinct search keywords
   *   - distinct clicked category ids
   *   - visit length in seconds
   *   - step length
   *   - start time
   *   - age, professional, sex and city from `user_info`
   */
  def getFullInfoData(sparkSession: SparkSession,
                      sessionId2GroupRDD: RDD[(String, Iterable[UserVisitAction])]) = {
    val userId2AggrInfoRDD = sessionId2GroupRDD.map { case (sid, iterableAction) =>
      var startTime: Date = null
      var endTime: Date = null
      var userId = -1L
      var stepLength = 0
      // Insertion-ordered sets give exact de-duplication while keeping first-seen order.
      // The old substring check dropped keywords contained in earlier ones. Its
      // contains(Long) call compared single characters, not the id text.
      val searchKeywords = mutable.LinkedHashSet[String]()
      val clickCategories = mutable.LinkedHashSet[String]()

      for (action <- iterableAction) {
        if (userId == -1) userId = action.user_id

        val actionTime = DateUtils.parseTime(action.action_time)
        if (startTime == null || startTime.after(actionTime)) startTime = actionTime
        if (endTime == null || endTime.before(actionTime)) endTime = actionTime

        if (StringUtils.isNotEmpty(action.search_keyword)) searchKeywords += action.search_keyword
        if (action.click_category_id != -1) clickCategories += action.click_category_id.toString

        stepLength += 1
      }

      val visitLength = (endTime.getTime - startTime.getTime) / 1000

      val aggrInfo = Constants.FIELD_SESSION_ID + "=" + sid + "|" +
        Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords.mkString(",") + "|" +
        Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategories.mkString(",") + "|" +
        Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" +
        Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|" +
        Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime)

      (userId, aggrInfo)
    }

    val sql = "select * from user_info"
    import sparkSession.implicits._
    // DataFrame -> Dataset[UserInfo] -> RDD[(user_id, UserInfo)]
    val userInfoRDD = sparkSession.sql(sql).as[UserInfo].rdd.map(item => (item.user_id, item))

    userId2AggrInfoRDD.join(userInfoRDD).map { case (_, (aggrInfo, userInfo)) =>
      val fullInfo = aggrInfo + "|" +
        Constants.FIELD_AGE + "=" + userInfo.age + "|" +
        Constants.FIELD_PROFESSIONAL + "=" + userInfo.professional + "|" +
        Constants.FIELD_SEX + "=" + userInfo.sex + "|" +
        Constants.FIELD_CITY + "=" + userInfo.city
      val sessionId = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_SESSION_ID)
      (sessionId, fullInfo)
    }
  }

  /**
   * Loads the user actions whose `date` lies within the task's [start date, end date] range.
   */
  def getActionRDD(sparkSession: SparkSession, taskParam: JSONObject) = {
    val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
    val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)
    // The dates are spliced into the SQL text. They come from the task configuration read in
    // main; quote or validate them if they could ever come from untrusted input.
    val sql = "select * from user_visit_action where date>='" + startDate +
      "' and date<='" + endDate + "'"
    import sparkSession.implicits._
    // DataFrame -> Dataset[UserVisitAction] -> RDD[UserVisitAction]
    sparkSession.sql(sql).as[UserVisitAction].rdd
  }
}
  • 自定义累加器代码:
  • import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

/**
 * Spark accumulator that counts occurrences of string keys (e.g. range names).
 * Each `add(key)` increments that key's count by one; `merge` sums counts per key.
 */
class SessionStatAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]] {

  // Per-key counts. `value` returns this map itself, not a copy.
  val countMap = new mutable.HashMap[String, Int]()

  override def isZero: Boolean = countMap.isEmpty

  override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
    val acc = new SessionStatAccumulator
    acc.countMap ++= this.countMap
    acc
  }

  override def reset(): Unit = countMap.clear()

  override def add(v: String): Unit =
    countMap.update(v, countMap.getOrElse(v, 0) + 1)

  override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = other match {
    case acc: SessionStatAccumulator =>
      acc.countMap.foreach { case (k, v) =>
        countMap.update(k, countMap.getOrElse(k, 0) + v)
      }
    case _ =>
      // Explicit error instead of an opaque MatchError, as Spark's built-in accumulators do.
      throw new UnsupportedOperationException(
        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: mutable.HashMap[String, Int] = countMap
}

    结果存储到mysql里面,结果如下

    5. 小结


    总结

    以上是生活随笔为你收集整理的spark项目实战:电商分析平台之各个范围Session步长、访问时长占比统计(需求一)的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。