spark – 如何减少JavaPairRDD的shuffle大小?
我有一个JavaPairRDD< Integer,Integer []>我想在其上执行groupByKey操作. groupByKey动作给了我一个:
如果我没有弄错的话,这实际上是一个OutOfMemory错误.这只发生在大数据集中(在我的情况下,当Web UI中显示的“Shuffle Write”为~96GB时). 我已经设定:
在$SPARK_HOME / conf / spark-defaults.conf中,但我不确定Kryo是否用于序列化我的JavaPairRDD. 除了设置这个conf参数之外,我还应该做些什么才能使用Kryo序列化我的RDD?我可以在serialization instructions中看到:
然后:
我还注意到,当我将spark.serializer设置为Kryo时,Web UI中的Shuffle Write从~96GB(默认序列化器)增加到243GB! 编辑:在评论中,我被问及我的程序的逻辑,以防groupByKey可以用reduceByKey替换.我不认为这是可能的,但无论如何它在这里: >输入具有以下形式: > key:index bucket id, > shuffle write操作以以下形式生成对: > entityId > groupByKey操作收集每个实体的所有邻居数组,其中一些可能出现多次(在许多存储桶中). 我得到的不同密钥的数量大约是1000万(大约500万个正实体ID和500万个负数). EDIT2:我尝试分别使用Hadoop的Writables(VIntWritable和VIntArrayWritable扩展ArrayWritable)而不是Integer和Integer [],但是shuffle大小仍然比默认的JavaSerializer大. 然后我将spark.shuffle.memoryFraction从0.2增加到0.4(即使在版本2.1.0中已弃用,也没有说明应该使用的内容)并启用offHeap内存,并且shuffle大小减少了~20GB.即使这符合标题所要求的内容,我更倾向于使用更算法的解决方案,或者包含更好压缩的解决方案. 最佳答案
我认为这里可以推荐的最佳方法(没有输入数据的更多具体知识)通常是在输入RDD上使用持久化API.
作为第一步,我尝试在输入上调用.persist(MEMORY_ONLY_SER),RDD以降低内存使用量(尽管在某个CPU开销下,对于你的情况来说不应该是针对整数的问题). 如果这还不够,你可以试试.persist(MEMORY_AND_DISK_SER)或者如果你的shuffle仍然占用了大量内存,那么输入数据集需要在内存上更容易.persist(DISK_ONLY)可能是一个选项,但是会强烈选择性能恶化. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |