加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Spark mapWithState API说明

发布时间:2020-12-16 19:18:13 所属栏目:安全 来源:网络整理
导读:我一直在Spark Streaming中使用mapWithState API,但有两件事情并不清楚StateSpec.function: 假设我的功能是: def trackStateForKey(batchTime: Time,key: Long,newValue: Option[JobData],currentState: State[JobData]): Option[(Long,JobData)] 为什么新
我一直在Spark Streaming中使用mapWithState API,但有两件事情并不清楚StateSpec.function:

假设我的功能是:

def trackStateForKey(batchTime: Time,key: Long,newValue: Option[JobData],currentState: State[JobData]): Option[(Long,JobData)]

>为什么新值是Option [T]类型?据我所知,它总是为我定义,并且由于该方法应该被调用一个新的状态,我真的没有看到为什么它可以是可选的.
>返回值是什么意思?我试图在文档和源代码中找到一些指针,但没有一个描述它的用途.由于我使用state.remove()和state.update()修改键的状态,为什么我必须对返回值执行相同的操作?

在我当前的实现中,如果我删除密钥则返回None,如果我更新它则返回Some(newState),但我不确定这是否正确.

解决方法

Why is the new value an Option[T] type? As far as I’ve seen,it was
always defined for me,and since the method is supposed to be called
with a new state,I don’t really see the point why it could be
optional.

这是一个Option [T],因为如果使用StateSpec.timeout设置超时,例如:

StateSpec.function(spec _).timeout(Milliseconds(5000))

然后,一旦函数超时,传入的值将为None,State [T]上的isTimingOut方法将产生true.这是有道理的,因为状态的超时并不意味着已经为指定的键到达了一个新值,并且通常比使用T传递null更加安全(无论如何都不能用于原语),因为您期望用户安全地操作选项[T].

您可以在Sparks实现中看到:

// Get the timed out state records,call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
  newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key,state,_) =>
    wrappedState.wrapTimingOutState(state)
    val returned = mappingFunction(batchTime,key,None,wrappedState) // <-- This.
    mappedData ++= returned
    newStateMap.remove(key)
  }
}

What does the return value mean? I tried to find some pointers in the
documentations and source code,but none of them describe what it is
used for. Since I’m modifying the state of a key using state.remove()
and state.update(),why would I have to do the same with return
values?

返回值是一种沿着spark图传递中间状态的方法.例如,假设我想更新我的状态,但也在我的管道中使用中间数据执行某些操作,例如:

dStream
  .mapWithState(stateSpec)
  .map(optionIntermediateResult.map(_ * 2))
  .foreachRDD( /* other stuff */)

该返回值正是允许我继续操作所述数据的原因.如果您不关心中间结果并且只想要完整状态,则输出None是完全正常的.

编辑:

我写了一个blog post(跟随这个问题),试图对API进行深入的解释.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读