加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

【原创】案例分享(3)用户行为分析--见证scala的强大

发布时间:2020-12-16 08:58:07 所属栏目:安全 来源:网络整理
导读:场景分析 用户行为分析应用的场景很多,像线上网站访问统计,线下客流分析(比如图像人脸识别、wifi探针等),比较核心的指标有几个: PV | UV | SD | SC 指标说明: PV(Page View):网站浏览量或者商场门店的访问量 UV(Unique Visitor):独立访客数,即

场景分析

用户行为分析应用的场景很多,像线上网站访问统计,线下客流分析(比如图像人脸识别、wifi探针等),比较核心的指标有几个:

PV | UV | SD | SC

指标说明:

PV(Page View):网站浏览量或者商场门店的访问量
UV(Unique Visitor):独立访客数,即去重后的人数
SD(Session Duration):单次会话停留时间
SC(Session Count):会话次数

用户行为分析的原始数据通常是一系列时间离散数据,比如网站访问记录:用户在一个时间点访问了一个网页,然后又在下个时间点访问了下个网页;

?

这些原始数据可以抽象为:

User?|?Timestamp?|?Target

即用户在什么时间点访问了什么目标;

?

统计PV、UV比较简单,但是在时间离散数据的基础上,要计算SD、SC这两个指标,常用的方式是设置过期时间阈值,如果用户两次访问的时间间隔超过阈值,则认为是两次Session;然后在一次Session的所有数据中取时间最早和最晚的数据来统计本次Session Duration;

统计示例

输入数据

(user1,2018-12-01 01:00:00,t1)
(user1,2018-12-01 01:01:30,2018-12-01 01:06:00,2018-12-01 01:20:00,2018-12-01 01:24:00,t1)

可以统计出

PV=5,UV=1

过期时间阈值设置为5分钟,以上数据应该统计出来2次Session,分别是:

Session1: (2018-12-01 01:00:00 到 2018-12-01 01:06:00),Duration:6分钟
Session2: (2018-12-01 01:20:00 到 2018-12-01 01:24:00),Duration:4分钟

实际处理时还要数据乱序的问题,尤其是在实时计算中,你想好怎样做了吗?

?

Scala代码实现

下面给出scala实现,来见证scala的强大:

scala核心代码(一步foldLeft)

scala

  val expireInSecond = 300
  def mergeTimeArray(arr1 : ArrayBuffer[(Long,Long)],arr2 : ArrayBuffer[(Long,Long)]) : ArrayBuffer[(Long,Long)] = {
    if (arr1.head._1.equals(0l)) arr2
    else if (arr2.head._1.equals(0l)) arr1
    else (arr1 ++ arr2).sortBy(_._1).foldLeft(ArrayBuffer[(Long,Long)]())((result,item) => if (!result.isEmpty && result.last._2 + expireInSecond >= item._1) {result.update(result.length - 1,(result.last._1,math.max(result.last._2,item._2))); result} else result += item)
  }

spark核心代码(2步map 1步aggregateByKey)

scala

  /**
    * @param data (user,timestamp,target)
    * @return (user,target,session_count,session_duration)
    */
  def process(data : RDD[(String,Long,String)]) : RDD[(String,String,Integer,Double)] = {
    //((user,target),timestamp)
    data.map(item => ((item._1,item._3),item._2))
      //((user,Array[(startTime,endTime)])
      .aggregateByKey(ArrayBuffer((0l,0l)))((result : ArrayBuffer[(Long,timestamp: Long) => mergeTimeArray(result,ArrayBuffer((timestamp,timestamp))),(result1 : ArrayBuffer[(Long,result2 : ArrayBuffer[(Long,Long)]) => mergeTimeArray(result1,result2))
      //(user,session_duration)
      .map(item => (item._1._1,item._1._2,item._2.length,item._2.foldLeft(0l)((result,item) => result + (item._2 - item._1)).toDouble / item._2.length))
  }

测试运行

  def main(args : Array[String]) : Unit = {
    val conf = new SparkConf().setAppName("UserAnalysis").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val arr = Array(("user1",1546054000l,"t1"),("user1",1546054090l,1546054360l,1546055200l,1546055440l,"t1"))
    //(user,target)
    val data : RDD[(String,String)] = sc.parallelize(arr)
    this.process(data).foreach(println)
  }

输出

(user1,t1,2,300.0)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读