加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 创建RDD以收集迭代计算的结果

发布时间:2020-12-16 18:56:34 所属栏目:安全 来源:网络整理
导读:我想创建一个RDD来收集迭代计算的结果. 如何使用循环(或任何替代方法)替换以下代码: import org.apache.spark.mllib.random.RandomRDDs._ val n = 10 val step1 = normalRDD(sc,n,seed = 1 ) val step2 = normalRDD(sc,seed = (step1.max).toLong ) val res
我想创建一个RDD来收集迭代计算的结果.

如何使用循环(或任何替代方法)替换以下代码:

import org.apache.spark.mllib.random.RandomRDDs._    

val n = 10 

val step1 = normalRDD(sc,n,seed = 1 ) 
val step2 = normalRDD(sc,seed = (step1.max).toLong ) 
val result1 = step1.zip(step2) 
val step3 = normalRDD(sc,seed = (step2.max).toLong ) 
val result2 = result1.zip(step3) 

...

val step50 = normalRDD(sc,seed = (step49.max).toLong ) 
val result49 = result48.zip(step50)

(创建N步RDD并在最后一起压缩也可以,只要迭代创建50个RDD以遵守种子=(步骤(n-1).max)条件)

解决方法

递归函数可以工作:

/**
 * The return type is an Option to handle the case of a user specifying
 * a non positive number of steps.
 */
def createZippedNormal(sc : SparkContext,numPartitions : Int,numSteps : Int) : Option[RDD[Double]] = {

  @scala.annotation.tailrec   
  def accum(sc : SparkContext,numSteps : Int,currRDD : RDD[Double],seed : Long) : RDD[Double] = {
    if(numSteps <= 0) currRDD
    else {
      val newRDD = normalRDD(sc,numPartitions,seed) 
      accum(sc,numSteps - 1,currRDD.zip(newRDD),newRDD.max)
    }
  }

  if(numSteps <= 0) None
  else Some(accum(sc,numSteps,sc.emptyRDD[Double],1L))
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读