How do I sort an RDD in Scala Spark?
Reading the Spark docs for the sortByKey method:
sortByKey([ascending], [numTasks])  When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

Is it possible to return only the top "N" results? That is, instead of returning everything, return only the first 10. I could convert the sorted collection to an Array and use the take method, but since that is an O(N) operation, is there a more efficient way?
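For concreteness, here is a minimal sketch of the pattern the question describes; the SparkContext setup, the sample data, and the cutoff of 10 are all assumptions made for this illustration:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical pair RDD of (key, value) records; the keys only need an Ordering.
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("sortByKey-example"))
    val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (5, "e"), (2, "b"), (4, "d")))

    // The approach the question is worried about: sort, pull everything back to the
    // driver as an Array, then keep only the first 10 entries.
    val firstTenViaCollect: Array[(Int, String)] = pairs.sortByKey().collect().take(10)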
Solution

Most likely you have already looked at the source code:
    class OrderedRDDFunctions {
      // <snip>
      def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
        val part = new RangePartitioner(numPartitions, self, ascending)
        val shuffled = new ShuffledRDD[K, V, P](self, part)
        shuffled.mapPartitions(iter => {
          val buf = iter.toArray
          if (ascending) {
            buf.sortWith((x, y) => x._1 < y._1).iterator
          } else {
            buf.sortWith((x, y) => x._1 > y._1).iterator
          }
        }, preservesPartitioning = true)
      }

And, as you say, the entire dataset has to go through the shuffle stage, as the snippet shows.

However, your concern about the subsequent call to take(K) may not be entirely accurate. That operation does not cycle through all N items:

    /**
     * Take the first num elements of the RDD. It works by first scanning one partition, and use the
     * results from that partition to estimate the number of additional partitions needed to satisfy
     * the limit.
     */
    def take(num: Int): Array[T] = {

So, it would seem that:
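As a rough sketch of the point being made here (continuing the hypothetical pairs RDD from the example above, not code from the original answer):

    // Calling take directly on the sorted RDD: take(10) scans one partition first and
    // only reads additional partitions if that one cannot satisfy the limit, so the
    // whole sorted dataset is not brought back to the driver.
    val firstTen: Array[(Int, String)] = pairs.sortByKey().take(10)

    // Descending order just flips the flag.
    val lastTenByKey: Array[(Int, String)] = pairs.sortByKey(ascending = false).take(10)

The shuffle triggered by sortByKey still touches all of the data, but the take itself only reads as many partitions as it needs to satisfy the limit.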