加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 尽管重新分区,但只能在每个分区中压缩具有相同数量元素

发布时间:2020-12-16 10:05:21 所属栏目:安全 来源:网络整理
导读:我加载了一个数据集 val data = sc.textFile("/home/kybe/Documents/datasets/img.csv",defp) 我想在这个数据上放一个索引 val nb = data.count.toIntval tozip = sc.parallelize(1 to nb).repartition(data.getNumPartitions)val res = tozip.zip(data) 不
我加载了一个数据集

val data = sc.textFile("/home/kybe/Documents/datasets/img.csv",defp)

我想在这个数据上放一个索引

val nb = data.count.toInt
val tozip = sc.parallelize(1 to nb).repartition(data.getNumPartitions)

val res = tozip.zip(data)

不幸的是我有以下错误

Can only zip RDDs with same number of elements in each partition

如果可能的话,我怎么能按分区修改元素的数量?

解决方法

为什么它不起作用?

zip()的文档指出:

Zips this RDD with another one,returning key-value pairs with the first element in each RDD,second element in each RDD,etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).

所以我们需要确保我们满足两个条件:

>两个RDD具有相同数量的分区
>这些RDD中的相应分区具有完全相同的大小

您确保使用repartition()具有相同数量的分区但Spark并不保证您在每个RDD的每个分区中具有相同的分布.

这是为什么?

因为有不同类型的RDD,并且大多数都有不同的分区策略!例如:

>使用sc.parallelize(集合)并行化集合时会创建ParallelCollectionRDD,它将查看应该有多少分区,将检查集合的大小并计算步长.即你在列表中有15个元素,想要4个分区,前3个将有4个连续元素,最后一个将剩下3个.
> HadoopRDD如果我没记错的话,每个文件块一个分区.即使您在内部使用本地文件,当您读取本地文件然后映射该RDD时,Spark首先创建这种RDD,因为该RDD是< Long,Text>的RDD对.而你只想要字符串:-)
>等.等等.

在你的例子中,Spark内部确实在进行重新分区时创建了不同类型的RDD(CoalescedRDD和ShuffledRDD),但我认为你得到了不同RDD具有不同分区策略的全局观点:-)

请注意,zip()doc的最后一部分提到了map()操作.此操作不会重新分区,因为它是一个狭窄的转换数据,因此它可以保证两种条件.

在这个简单的例子中,你可以简单地做data.zipWithIndex.如果你需要更复杂的东西,那么应该使用map()创建zip()的新RDD,如上所述.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读