加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Spark Streaming groupByKey和updateStateByKey实现

发布时间:2020-12-16 19:15:53 所属栏目:安全 来源:网络整理
导读:我正在尝试对从Kafka读取的(假)apache web服务器日志运行有状态Spark Streaming计算.目标是“会话化”类似于 this blog post的网络流量 唯一的区别是我希望“会话化”IP命中的每个页面,而不是整个会话.我能够在批处理模式下使用Spark从假网络流量文件中读取
我正在尝试对从Kafka读取的(假)apache web服务器日志运行有状态Spark Streaming计算.目标是“会话化”类似于 this blog post的网络流量

唯一的区别是我希望“会话化”IP命中的每个页面,而不是整个会话.我能够在批处理模式下使用Spark从假网络流量文件中读取此内容,但现在我想在流式上下文中执行此操作.

日志文件从Kafka读取并解析为K / V对(String,(String,Long,Long))或

(IP,(requestPage,time,time)).

然后我在这个K / V对上调用groupByKey().在批处理模式下,这将产生:

(String,CollectionBuffer((String,Long),…)或

(IP,CollectionBuffer((requestPage,time),…)

在StreamingContext中,它产生一个:

(String,ArrayBuffer((String,…)像这样:

(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))

但是,随着下一个微分类(DStream)的到来,该信息被丢弃.

最终我想要的是,随着给定的IP继续交互并对其数据运行一些计算以“会话化”页面时间,随着时间的推移填充ArrayBuffer.

我相信发生这种情况的操作符是“updateStateByKey”.我在运算符方面遇到了一些麻烦(我是Spark& Scala的新手);

任何帮助表示赞赏.

迄今:

val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey) 


    def updateGroupByKey(
                          a: Seq[(String,ArrayBuffer[(String,Long)])],b: Option[(String,Long)])]
                          ): Option[(String,Long)])] = {

  }

解决方法

我想你正在寻找这样的东西:

def updateGroupByKey(
                          newValues: Seq[(String,currentValue: Option[(String,Long)])] = {
     //Collect the values
     val buffs: Seq[ArrayBuffer[(String,Long)]] = (for (v <- newValues) yield v._2)
     val buffs2 = if (currentValue.isEmpty) buffs else currentValue.get._2 :: buffs
     //Convert state to buffer
     if (buffs2.isEmpty) None else {
        val key = if (currentValue.isEmpty) newValues(0)._1 else currentValue.get._1
        Some((key,buffs2.foldLeft(new ArrayBuffer[(String,Long)])((v,a) => v++a)))
     }
  }

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读