加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Spark Streaming mapWithState似乎定期重建完整状态

发布时间:2020-12-16 09:10:59 所属栏目:安全 来源:网络整理
导读:我正在使用 Scala(2.11)/ Spark(1.6.1)流媒体项目,并使用mapWithState()来跟踪以前批次中看到的数据. 状态分布在多个节点上的20个分区中,由StateSpec.function(trackStateFunc _)numPartitions(20)创建.在这种状态下,我们只有几个键(?100)映射到集合,具有高
我正在使用 Scala(2.11)/ Spark(1.6.1)流媒体项目,并使用mapWithState()来跟踪以前批次中看到的数据.

状态分布在多个节点上的20个分区中,由StateSpec.function(trackStateFunc _)numPartitions(20)创建.在这种状态下,我们只有几个键(?100)映射到集合,具有高达160.000个条目,在整个应用程序中增长.整个状态最多可达3GB,可以由群集中的每个节点处理.在每个批处理中,一些数据被添加到一个状态,但不被删除直到过程的最后,即?15分钟.

在遵循应用程序UI时,与其他批次相比,每第10批处理时间非常高.查看图片:

The spikes show the higher processing time.

黄色字段表示处理时间很长.

enter image description here

更详细的作业视图显示,在这些批处理中,会发生在某一时刻,恰恰是当所有20个分区被“跳过”时.或者这是UI说的.

enter image description here

我对跳过的理解是,每个状态分区是一个未执行的可能任务,因为它不需要重新计算.但是,我不明白为什么每个Job中的跳过数量会有所不同,为什么最后一个Job需要这么多的处理.处理时间越长,与状态的大小无关,只会影响持续时间.

这是mapWithState()功能中的错误还是这个意图行为?底层数据结构是否需要进行某种重组,Set中的状态是否需要复制数据?还是更容易成为我的应用程序的缺陷?

解决方法

Is this a bug in the mapWithState() functionality or is this intended
behaviour?

这是行为.您看到的尖峰是因为您的数据在给定批次的结尾处得到检查点.如果您注意到更长批次的时间,您将看到它每100秒持续发生一次.这是因为检查点时间是不变的,并且根据您的batchDuration进行计算,除非您明确设置DStream.checkpoint间隔,否则与数据源的数据来源读取批次乘以某个常量的频率是多少.

以下是MapWithStateDStream中的相关代码段:

override def initialize(time: Time): Unit = {
  if (checkpointDuration == null) {
    checkpointDuration = slideDuration * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
  }
  super.initialize(time)
}

DEFAULT_CHECKPOINT_DURATION_MULTIPLIER在哪里:

private[streaming] object InternalMapWithStateDStream {
  private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10
}

哪些行与您所看到的行为完全一致,因为您的读取批次持续时间为每10秒=> 10 * 10 = 100秒.

这是正常的,那就是用Spark持续状态的代价.您方面的优化可能是考虑如何最大限度地减少内存中保留的状态的大小,以便使序列化尽可能快.另外,请确保数据分布在足够的执行程序中,以便在所有节点之间均匀分布状态.此外,我希望您已经启用了Kryo Serialization,而不是默认的Java序列化,这可以让您有意义的性能提升.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读