
scala – Spark Streaming example calls updateStateByKey with additional parameters

Wondering why the StatefulNetworkWordCount.scala example calls the notorious updateStateByKey() function, which is supposed to take only a function as its parameter:

val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)

Why do a partitioner, a boolean and an RDD need to be passed (and how is that handled – this isn't in updateStateByKey()'s signature, is it)?
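The overload I was expecting is the plain one that takes nothing but the per-key update function – roughly this shape (trimmed, quoted from memory of PairDStreamFunctions):

def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)]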

Thanks,
Matt

Solution

This is because:

> You are looking at different Spark version branches: https://github.com/apache/spark/blob/branch-1.3/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala. In Spark 1.2 this example simply called updateStateByKey with a single function as its argument, while in 1.3 the example was optimized
> Different overloads of updateStateByKey exist in both 1.2 and 1.3, but the version with 4 parameters does not exist in 1.2 – it was only introduced in 1.3: https://github.com/apache/spark/blob/branch-1.3/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala (a sketch contrasting the two call shapes follows below)
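In other words, against 1.2 the same word count could only be expressed through the single-function overload, with no way to pass a partitioner or an initial state. A minimal 1.2-style sketch for comparison with the 1.3 call quoted in the question (the lambda here is an illustration, not the example's exact code):

// 1.2-style: only the per-key update function is supplied; partitioning and
// initial state cannot be passed through this overload.
val stateDstream = wordDstream.updateStateByKey[Int](
  (newCounts: Seq[Int], runningCount: Option[Int]) =>
    Some(newCounts.sum + runningCount.getOrElse(0))
)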

Here is the code:

/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of each key.
 * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
 * @param updateFunc State update function. Note, that this function may generate a different
 *                   tuple with a different key than the input key. Therefore keys may be removed
 *                   or added in this way. It is up to the developer to decide whether to
 *                   remember the partitioner despite the key being changed.
 * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
 *                    DStream
 * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
 * @param initialRDD initial state value of each key.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    rememberPartitioner: Boolean,
    initialRDD: RDD[(K, S)]
  ): DStream[(K, S)] = {
  new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, Some(initialRDD))
}
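For completeness, here is a sketch of how the 1.3 StatefulNetworkWordCount example feeds those four parameters – in particular how the per-key logic is lifted into the Iterator-based updateFunc and how the initial state RDD is built. This is a simplified reconstruction; check the linked example source for the exact code:

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(".")  // stateful operations require checkpointing

// Stream of (word, 1) pairs whose running counts we want to keep per key.
val lines = ssc.socketTextStream("localhost", 9999)
val wordDstream = lines.flatMap(_.split(" ")).map(word => (word, 1))

// Per-key logic: add the new counts for a key to its previously remembered state.
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  Some(values.sum + state.getOrElse(0))
}

// The 4-parameter overload expects an Iterator over (key, new values, old state),
// so the per-key function is lifted to that shape.
val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
  iterator.flatMap { case (word, counts, state) =>
    updateFunc(counts, state).map(count => (word, count))
  }
}

// Initial state for selected keys; counting continues from these values.
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

val stateDstream = wordDstream.updateStateByKey[Int](
  newUpdateFunc,                                            // updateFunc
  new HashPartitioner(ssc.sparkContext.defaultParallelism), // partitioner
  true,                                                      // rememberPartitioner
  initialRDD)                                                // initialRDD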
