scala – 在Spark SQL中聚合大型数据集
请考虑以下代码:
case class Person( personId: Long,name: String,ageGroup: String,gender: String,relationshipStatus: String,country: String,state: String ) case class PerPersonPower(personId: Long,power: Double) val people: Dataset[Person] = ... // Around 50 million entries. val powers: Dataset[PerPersonPower] = ... // Around 50 million entries. people.join(powers,"personId") .groupBy("ageGroup","gender","relationshipStatus","country","state") .agg( sum("power").alias("totalPower"),count("*").alias("personCount") ) 它在具有大约100 GB RAM的群集上执行.但是,群集内存不足.我不知道该怎么做.实际上,人们被$“personId”分区并缓存 – people.repartition($“personId”).cache(). 我有什么想法可以优化这个计算? 该集群是一个vanilla Google Dataproc集群—因此它在客户端模式下使用YARN–由14个节点组成,每个节点具有8 GB RAM. 解决方法
根据请求中提供的有限信息,我建议不要使用缓存并创建比默认数量更多的分区(通常为200,但可能因群集而异) – 尝试在应用程序中设置spark.shuffle.partitions 1000或2000开始.它可以像spark.conf.set(‘spark.shuffle.partitions’,1000)那样完成.很可能你的查询命中SortMergeJoin,当前执行程序获取更多的数据,它的堆减去YARN开销.请查阅您的
SparkUI for the cluster以监控和优化您的查询执行.在SQL选项卡中,您将看到关于每个阶段中正在处理的数据量的非常详细的数字,因此您将识别瓶颈并更快地修复它们.
Spark查询计划程序首先会按照spark.shuffle.partitions中定义的数字对personId对PerPersonPower和Person进行排序,将其刷新到HDFS到spark.shuffle.partitions单独的镶木地板文件中,然后创建相同数量的部分聚合并将其添加到结果数据框中. 您似乎加入了大约18-20GB(人)的数据,大约800MB(功率).如果功率稍微小一点,你可以尝试像人一样使用BroadcastHashJoin.join(广播(权力),“personId”),虽然我不建议广播大于128Mb或256Mb的数据帧. 祝好运! (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |