scala – 有效地实现了takeByKey的火花
发布时间:2020-12-16 10:00:51 所属栏目:安全 来源:网络整理
导读:我的RDD类型为RDD [(k:Int,v:String)].我想为每个键k取最多1000个元组,这样我就有[(k,v)],其中没有键出现超过1000次.有没有办法做到这一点,我可以避免首先调用groupBy的性能损失?我无法弄清楚以一种方式聚合值的好方法,以避免导致我的作业失败的完整group
我的RDD类型为RDD [(k:Int,v:String)].我想为每个键k取最多1000个元组,这样我就有[(k,v)],其中没有键出现超过1000次.有没有办法做到这一点,我可以避免首先调用groupBy的性能损失?我无法弄清楚以一种方式聚合值的好方法,以避免导致我的作业失败的完整groupBy.
天真的方法: def takeByKey(rdd: RDD[(K,V)],n: Int) : RDD[(K,V)] = { rdd.groupBy(_._1).mapValues(_.take(n)).flatMap(_._2) } 我正在寻找一种避免groupBy的更有效的方法: takeByKey(rdd: RDD[(K,V)] = { //use reduceByKey,foldByKey,etc..?? } 这是我迄今为止开发的最佳解决方案,但它不进行类型检查.. def takeByKey(rdd: RDD[(K,V)] = { rdd.foldByKey(List[V](),((acc,elem) => if (acc.length >= n) acc else elem._2 :: acc)).flatMap(t => t._2.map(v => (t._1,v))) } 编辑. takeByKey(rdd: RDD[(K,V)] = { rdd.mapValues(List(_)) .reduceByKey((x,y) => if(x.length >= n) x else if(y.length >= n) y else (x ++ y).take(n)) .flatMap(t => t._2.map(v => (t._1,v))) } 解决方法
您当前的解决方案是朝着正确方向迈出的一步,但至少有三个原因它仍然非常低效:
> mapValues(List(_))创建大量临时List对象 最简单的方法是使用具有恒定时间长度的可变缓冲区替换List.一个可能的例子是这样的: import scala.collection.mutable.ArrayBuffer rdd.aggregateByKey(ArrayBuffer[Int]())( (acc,x) => if (acc.length >= n) acc else acc += x,(acc1,acc2) => { val (xs,ys) = if (acc1.length > acc2.length) (acc1,acc2) else (acc2,acc1) val toTake = Math.min(n - xs.length,ys.length) for (i <- 0 until toTake) { xs += ys(i) } xs } ) 在旁注上,您可以替换: .flatMap(t => t._2.map(v => (t._1,v))) 同 .flatMapValues(x => x) // identity[Seq[V]] 它不会影响性能,但会稍微清洁一些. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |