scala – 尽管重新分区,但只能在每个分区中压缩具有相同数量元素
我加载了一个数据集
val data = sc.textFile("/home/kybe/Documents/datasets/img.csv",defp) 我想在这个数据上放一个索引 val nb = data.count.toInt val tozip = sc.parallelize(1 to nb).repartition(data.getNumPartitions) val res = tozip.zip(data) 不幸的是我有以下错误 Can only zip RDDs with same number of elements in each partition 如果可能的话,我怎么能按分区修改元素的数量? 解决方法
为什么它不起作用?
zip()的文档指出:
所以我们需要确保我们满足两个条件: >两个RDD具有相同数量的分区 您确保使用repartition()具有相同数量的分区但Spark并不保证您在每个RDD的每个分区中具有相同的分布. 这是为什么? 因为有不同类型的RDD,并且大多数都有不同的分区策略!例如: >使用sc.parallelize(集合)并行化集合时会创建ParallelCollectionRDD,它将查看应该有多少分区,将检查集合的大小并计算步长.即你在列表中有15个元素,想要4个分区,前3个将有4个连续元素,最后一个将剩下3个. 在你的例子中,Spark内部确实在进行重新分区时创建了不同类型的RDD(CoalescedRDD和ShuffledRDD),但我认为你得到了不同RDD具有不同分区策略的全局观点:-) 请注意,zip()doc的最后一部分提到了map()操作.此操作不会重新分区,因为它是一个狭窄的转换数据,因此它可以保证两种条件. 解 在这个简单的例子中,你可以简单地做data.zipWithIndex.如果你需要更复杂的东西,那么应该使用map()创建zip()的新RDD,如上所述. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |