加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 在Spark SQL中聚合大型数据集

发布时间:2020-12-16 18:37:23 所属栏目:安全 来源:网络整理
导读:请考虑以下代码: case class Person( personId: Long,name: String,ageGroup: String,gender: String,relationshipStatus: String,country: String,state: String)case class PerPersonPower(personId: Long,power: Double)val people: Dataset[Person] = .
请考虑以下代码:

case class Person(
  personId: Long,name: String,ageGroup: String,gender: String,relationshipStatus: String,country: String,state: String
)

case class PerPersonPower(personId: Long,power: Double)

val people: Dataset[Person] = ...          // Around 50 million entries.
val powers: Dataset[PerPersonPower] = ...  // Around 50 million entries.

people.join(powers,"personId")
  .groupBy("ageGroup","gender","relationshipStatus","country","state")
  .agg(
    sum("power").alias("totalPower"),count("*").alias("personCount")
  )

它在具有大约100 GB RAM的群集上执行.但是,群集内存不足.我不知道该怎么做.实际上,人们被$“personId”分区并缓存 – people.repartition($“personId”).cache().

我有什么想法可以优化这个计算?

该集群是一个vanilla Google Dataproc集群—因此它在客户端模式下使用YARN–由14个节点组成,每个节点具有8 GB RAM.

解决方法

根据请求中提供的有限信息,我建议不要使用缓存并创建比默认数量更多的分区(通常为200,但可能因群集而异) – 尝试在应用程序中设置spark.shuffle.partitions 1000或2000开始.它可以像spark.conf.set(‘spark.shuffle.partitions’,1000)那样完成.很可能你的查询命中SortMergeJoin,当前执行程序获取更多的数据,它的堆减去YARN开销.请查阅您的 SparkUI for the cluster以监控和优化您的查询执行.在SQL选项卡中,您将看到关于每个阶段中正在处理的数据量的非常详细的数字,因此您将识别瓶颈并更快地修复它们.

Spark查询计划程序首先会按照spark.shuffle.partitions中定义的数字对personId对PerPersonPower和Person进行排序,将其刷新到HDFS到spark.shuffle.partitions单独的镶木地板文件中,然后创建相同数量的部分聚合并将其添加到结果数据框中.

您似乎加入了大约18-20GB(人)的数据,大约800MB(功率).如果功率稍微小一点,你可以尝试像人一样使用BroadcastHashJoin.join(广播(权力),“personId”),虽然我不建议广播大于128Mb或256Mb的数据帧.

祝好运!

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读