加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 有效地实现了takeByKey的火花

发布时间:2020-12-16 10:00:51 所属栏目:安全 来源:网络整理
导读:我的RDD类型为RDD [(k:Int,v:String)].我想为每个键k取最多1000个元组,这样我就有[(k,v)],其中没有键出现超过1000次.有没有办法做到这一点,我可以避免首先调用groupBy的性能损失?我无法弄清楚以一种方式聚合值的好方法,以避免导致我的作业失败的完整group
我的RDD类型为RDD [(k:Int,v:String)].我想为每个键k取最多1000个元组,这样我就有[(k,v)],其中没有键出现超过1000次.有没有办法做到这一点,我可以避免首先调用groupBy的性能损失?我无法弄清楚以一种方式聚合值的好方法,以避免导致我的作业失败的完整groupBy.

天真的方法:

def takeByKey(rdd: RDD[(K,V)],n: Int) : RDD[(K,V)] = {
    rdd.groupBy(_._1).mapValues(_.take(n)).flatMap(_._2)
}

我正在寻找一种避免groupBy的更有效的方法:

takeByKey(rdd: RDD[(K,V)] = {
    //use reduceByKey,foldByKey,etc..??
}

这是我迄今为止开发的最佳解决方案,但它不进行类型检查..

def takeByKey(rdd: RDD[(K,V)] = {
      rdd.foldByKey(List[V](),((acc,elem) => if (acc.length >= n) acc else elem._2 :: acc)).flatMap(t => t._2.map(v => (t._1,v)))
}

编辑.
我想出了一个似乎有效的解决方案:

takeByKey(rdd: RDD[(K,V)] = {
    rdd.mapValues(List(_))
       .reduceByKey((x,y) => if(x.length >= n) x 
                             else if(y.length >= n) y 
                             else (x ++ y).take(n))
       .flatMap(t => t._2.map(v => (t._1,v)))
}

解决方法

您当前的解决方案是朝着正确方向迈出的一步,但至少有三个原因它仍然非常低效:

> mapValues(List(_))创建大量临时List对象
> List的线性Seq的长度是O(N)
> x y再次创建了大量临时对象

最简单的方法是使用具有恒定时间长度的可变缓冲区替换List.一个可能的例子是这样的:

import scala.collection.mutable.ArrayBuffer

rdd.aggregateByKey(ArrayBuffer[Int]())(
  (acc,x) => if (acc.length >= n) acc else acc += x,(acc1,acc2) => {
    val (xs,ys) = if (acc1.length > acc2.length) (acc1,acc2) else (acc2,acc1)
    val toTake = Math.min(n - xs.length,ys.length)
    for (i <- 0 until toTake) {
      xs += ys(i)
    }
    xs         
  }
)

在旁注上,您可以替换:

.flatMap(t => t._2.map(v => (t._1,v)))

.flatMapValues(x => x)  // identity[Seq[V]]

它不会影响性能,但会稍微清洁一些.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读