如何在Scala中获取Spark RDD的SQL row_number值?
发布时间:2020-12-16 09:21:57 所属栏目:安全 来源:网络整理
导读:我需要为许多列的数据表生成一个row_numbers的完整列表. 在SQL中,这将如下所示: select key_value,col1,col2,col3,row_number() over (partition by key_value order by col1,col2 desc,col3)from temp; 现在,我们在Spark中有一个RDD(K,V),其中V =(col1,col
我需要为许多列的数据表生成一个row_numbers的完整列表.
在SQL中,这将如下所示: select key_value,col1,col2,col3,row_number() over (partition by key_value order by col1,col2 desc,col3) from temp ; 现在,我们在Spark中有一个RDD(K,V),其中V =(col1,col3),所以我的条目就像 (key1,(1,2,3)) (key1,4,7)) (key1,(2,3)) (key2,(5,5,5)) (key2,9)) (key2,(7,5)) etc. 我想使用诸如sortBy(),sortWith(),sortByKey(),zipWithIndex等命令来命令这些命令,并且具有正确的row_number的新RDD (key1,3),2) (key1,7),1) (key1,3) (key2,5),1) (key2,9),2) (key2,3) etc. (我不在意括号,所以表单也可以是(K,(col1,rownum))) 我该如何做? 这是我的第一个尝试: val sample_data = Seq(((3,4),((3,7,((1,2),1,3)) val temp1 = sc.parallelize(sample_data) temp1.collect().foreach(println) // ((3,5) // ((3,9) // ((3,5) // ((1,3) // ((1,7) // ((1,3) temp1.map(x => (x,1)).sortByKey().zipWithIndex.collect().foreach(println) // ((((1,1),0) // ((((1,1) // ((((1,2) // ((((3,3) // ((((3,4) // ((((3,5) // note that this isn't ordering with a partition on key value K! val temp2 = temp1.??? 另请注意,函数sortBy不能直接应用于RDD,但是必须首先运行collect(),然后输出不是RDD,而是数组 temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println) // ((1,3) // ((3,5) 这里有一点进步,但仍未分区: val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2,a._3,a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1,a._1._2._1,a._1._2._2,a._1._2._3,a._2 + 1)) temp2.collect().foreach(println) // ((1,1) // ((1,3,2) // ((1,4) // ((3,9,6) 解决方法
Spark_node()over(partition by … order by …)功能被添加到Spark 1.4中.这个答案使用PySpark / DataFrames.
创建一个测试DataFrame: from pyspark.sql import Row,functions as F testDF = sc.parallelize( (Row(k="key1",v=(1,3)),Row(k="key1",7)),v=(2,Row(k="key2",v=(5,5)),9)),v=(7,5)) ) ).toDF() 添加分区行号: from pyspark.sql.window import Window (testDF .select("k","v",F.rowNumber() .over(Window .partitionBy("k") .orderBy("k") ) .alias("rowNum") ) .show() ) +----+-------+------+ | k| v|rowNum| +----+-------+------+ |key1|[1,3]| 1| |key1|[1,7]| 2| |key1|[2,3]| 3| |key2|[5,5]| 1| |key2|[5,9]| 2| |key2|[7,5]| 3| +----+-------+------+ (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |