加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

如何在Scala中获取Spark RDD的SQL row_number值?

发布时间:2020-12-16 09:21:57 所属栏目:安全 来源:网络整理
导读:我需要为许多列的数据表生成一个row_numbers的完整列表. 在SQL中,这将如下所示: select key_value,col1,col2,col3,row_number() over (partition by key_value order by col1,col2 desc,col3)from temp; 现在,我们在Spark中有一个RDD(K,V),其中V =(col1,col
我需要为许多列的数据表生成一个row_numbers的完整列表.

在SQL中,这将如下所示:

select
   key_value,col1,col2,col3,row_number() over (partition by key_value order by col1,col2 desc,col3)
from
   temp
;

现在,我们在Spark中有一个RDD(K,V),其中V =(col1,col3),所以我的条目就像

(key1,(1,2,3))
(key1,4,7))
(key1,(2,3))
(key2,(5,5,5))
(key2,9))
(key2,(7,5))
etc.

我想使用诸如sortBy(),sortWith(),sortByKey(),zipWithIndex等命令来命令这些命令,并且具有正确的row_number的新RDD

(key1,3),2)
(key1,7),1)
(key1,3)
(key2,5),1)
(key2,9),2)
(key2,3)
etc.

(我不在意括号,所以表单也可以是(K,(col1,rownum)))

我该如何做?

这是我的第一个尝试:

val sample_data = Seq(((3,4),((3,7,((1,2),1,3))

val temp1 = sc.parallelize(sample_data)

temp1.collect().foreach(println)

// ((3,5)
// ((3,9)
// ((3,5)
// ((1,3)
// ((1,7)
// ((1,3)

temp1.map(x => (x,1)).sortByKey().zipWithIndex.collect().foreach(println)

// ((((1,1),0)
// ((((1,1)
// ((((1,2)
// ((((3,3)
// ((((3,4)
// ((((3,5)

// note that this isn't ordering with a partition on key value K!

val temp2 = temp1.???

另请注意,函数sortBy不能直接应用于RDD,但是必须首先运行collect(),然后输出不是RDD,而是数组

temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)

// ((1,3)
// ((3,5)

这里有一点进步,但仍未分区:

val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2,a._3,a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1,a._1._2._1,a._1._2._2,a._1._2._3,a._2 + 1))

temp2.collect().foreach(println)

// ((1,1)
// ((1,3,2)
// ((1,4)
// ((3,9,6)

解决方法

Spark_node()over(partition by … order by …)功能被添加到Spark 1.4中.这个答案使用PySpark / DataFrames.

创建一个测试DataFrame:

from pyspark.sql import Row,functions as F

testDF = sc.parallelize(
    (Row(k="key1",v=(1,3)),Row(k="key1",7)),v=(2,Row(k="key2",v=(5,5)),9)),v=(7,5))
    )
).toDF()

添加分区行号:

from pyspark.sql.window import Window

(testDF
 .select("k","v",F.rowNumber()
         .over(Window
               .partitionBy("k")
               .orderBy("k")
              )
         .alias("rowNum")
        )
 .show()
)

+----+-------+------+
|   k|      v|rowNum|
+----+-------+------+
|key1|[1,3]|     1|
|key1|[1,7]|     2|
|key1|[2,3]|     3|
|key2|[5,5]|     1|
|key2|[5,9]|     2|
|key2|[7,5]|     3|
+----+-------+------+

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读