加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

如何在Scala Spark中排序RDD?

发布时间:2020-12-16 09:46:27 所属栏目:安全 来源:网络整理
导读:阅读Spark方法sortByKey: sortByKey([ascending],[numTasks]) When called on a dataset of (K,V) pairs where K implements Ordered,returns a dataset of (K,V) pairs sorted by keys in ascending or descending order,as specified in the boolean asce
阅读Spark方法sortByKey:

sortByKey([ascending],[numTasks])   When called on a dataset of (K,V) pairs where K implements Ordered,returns a dataset of (K,V) pairs sorted by keys in ascending or descending order,as specified in the boolean ascending argument.

有可能只返回“N”个结果。所以不是返回所有的结果,只返回前10个。我可以将排序的集合转换为一个Array并使用take方法,但是由于这是一个O(N)操作是否有更有效的方法?

解决方法

很可能你已经熟悉了源代码:

class OrderedRDDFunctions {
   // <snip>
  def sortByKey(ascending: Boolean = true,numPartitions: Int = self.partitions.size): RDD[P] = {
    val part = new RangePartitioner(numPartitions,self,ascending)
    val shuffled = new ShuffledRDD[K,V,P](self,part)
    shuffled.mapPartitions(iter => {
      val buf = iter.toArray
      if (ascending) {
        buf.sortWith((x,y) => x._1 < y._1).iterator
      } else {
        buf.sortWith((x,y) => x._1 > y._1).iterator
      }
    },preservesPartitioning = true)
  }

而且,正如你所说,整个数据必须经过shuffle阶段,如片段所示。

但是,您对随后调用Take(K)的关注可能不太准确。此操作不会循环通过所有N个项目:

/**
   * Take the first num elements of the RDD. It works by first scanning one partition,and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
   * the limit.
   */
  def take(num: Int): Array[T] = {

那么,它似乎是:

O(myRdd.take(K)) << O(myRdd.sortByKey()) ~= O(myRdd.sortByKey.take(k)) (at least for small K) << O(myRdd.sortByKey().collect()

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读