加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 有没有办法重写Spark RDD distinct以使用mapPartitions

发布时间:2020-12-16 18:49:02 所属栏目:安全 来源:网络整理
导读:我有一个RDD,它太大而不能一致地执行一个不同的语句而没有虚假错误(例如,SparkException阶段失败4次,ExecutorLostFailure,HDFS文件系统关闭,最大执行器失败次数,阶段因SparkContext关闭而被取消,等等) 我试图计算特定列中的不同ID,例如: print(myRDD.map(a
我有一个RDD,它太大而不能一致地执行一个不同的语句而没有虚假错误(例如,SparkException阶段失败4次,ExecutorLostFailure,HDFS文件系统关闭,最大执行器失败次数,阶段因SparkContext关闭而被取消,等等)

我试图计算特定列中的不同ID,例如:

print(myRDD.map(a => a._2._1._2).distinct.count())

是否有一种简单,一致,不太随机密集的方式来执行上面的命令,可能使用mapPartitions,reduceByKey,flatMap或其他使用较少shuffle而不是不同的命令?

另见What are the Spark transformations that causes a Shuffle?

解决方法

最好弄清楚是否存在另一个潜在的问题,但是下面会做你想做的事情……而不是想做到这一点,但听起来它会适合你的账单:

myRDD.map(a => (a._2._1._2,a._2._1._2))
  .aggregateByKey(Set[YourType]())((agg,value) => agg + value,(agg1,agg2) => agg1 ++ agg2) 
  .keys
  .count

或者甚至这似乎有效,但它不是联想和可交换的.它起作用的原因是Spark的内部工作原理……但我可能会错过一个案例……所以虽然更简单,但我不确定我是否相信它:

myRDD.map(a => (a._2._1._2,a._2._1._2))
  .aggregateByKey(YourTypeDefault)((x,y)=>y,(x,y)=>x)
  .keys.count

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读