scala – Spark mapWithState API说明
我一直在Spark Streaming中使用mapWithState API,但有两件事情并不清楚StateSpec.function:
假设我的功能是: def trackStateForKey(batchTime: Time,key: Long,newValue: Option[JobData],currentState: State[JobData]): Option[(Long,JobData)] >为什么新值是Option [T]类型?据我所知,它总是为我定义,并且由于该方法应该被调用一个新的状态,我真的没有看到为什么它可以是可选的. 在我当前的实现中,如果我删除密钥则返回None,如果我更新它则返回Some(newState),但我不确定这是否正确. 解决方法
这是一个Option [T],因为如果使用StateSpec.timeout设置超时,例如: StateSpec.function(spec _).timeout(Milliseconds(5000)) 然后,一旦函数超时,传入的值将为None,State [T]上的isTimingOut方法将产生true.这是有道理的,因为状态的超时并不意味着已经为指定的键到达了一个新值,并且通常比使用T传递null更加安全(无论如何都不能用于原语),因为您期望用户安全地操作选项[T]. 您可以在Sparks实现中看到: // Get the timed out state records,call the mapping function on each and collect the // data returned if (removeTimedoutData && timeoutThresholdTime.isDefined) { newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key,state,_) => wrappedState.wrapTimingOutState(state) val returned = mappingFunction(batchTime,key,None,wrappedState) // <-- This. mappedData ++= returned newStateMap.remove(key) } }
返回值是一种沿着spark图传递中间状态的方法.例如,假设我想更新我的状态,但也在我的管道中使用中间数据执行某些操作,例如: dStream .mapWithState(stateSpec) .map(optionIntermediateResult.map(_ * 2)) .foreachRDD( /* other stuff */) 该返回值正是允许我继续操作所述数据的原因.如果您不关心中间结果并且只想要完整状态,则输出None是完全正常的. 编辑: 我写了一个blog post(跟随这个问题),试图对API进行深入的解释. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |