加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 更新Delta Data的Spark Dataframe的窗口函数row_number

发布时间:2020-12-16 18:13:34 所属栏目:安全 来源:网络整理
导读:我需要更新delta数据的数据帧的行号列.我已经实现了基本加载的行号,如下所示: 输入数据: val base = List(List("001","a","abc"),List("001","123"),List("003","c","456"),List("002","b","dfr"),"ytr")) .map(row = (row(0),row(1),row(2)))val DS1 = ba
我需要更新delta数据的数据帧的行号列.我已经实现了基本加载的行号,如下所示:

输入数据:

val base = List(List("001","a","abc"),List("001","123"),List("003","c","456"),List("002","b","dfr"),"ytr"))
  .map(row => (row(0),row(1),row(2)))

val DS1 = base.toDF("KEY1","KEY2","VAL")

DS1.show()

+----+----+---+
|KEY1|KEY2|VAL|
+----+----+---+
| 001|   a|abc|
| 001|   a|123|
| 003|   c|456|
| 002|   b|dfr|
| 003|   c|ytr|
+----+----+---+

现在我使用窗口函数添加了行号,如下所示:

val baseDF =  DS1.select(col("KEY1"),col("KEY2"),col("VAL"),row_number().over(Window.partitionBy(col("KEY1"),col("KEY2")).orderBy(col("KEY1"),col("KEY2").asc)).alias("Row_Num"))
baseDF.show()

+----+----+---+-------+
|KEY1|KEY2|VAL|Row_Num|
+----+----+---+-------+
|001 |a   |abc|1      |
|001 |a   |123|2      |
|002 |b   |dfr|1      |
|003 |c   |456|1      |
|003 |c   |ytr|2      |
+----+----+---+-------+

现在delta负载如下:

val delta = List(List("001","y45"),"444"))
  .map(row => (row(0),row(2)))

val DS2 = delta.toDF("KEY1","VAL")
DS2.show()

+----+----+---+
|KEY1|KEY2|VAL|
+----+----+---+
| 001|   a|y45|
| 002|   b|444|
+----+----+---+

所以预期的更新结果应该是:

baseDF.show()

|KEY1|KEY2|VAL|Row_Num|
+----+----+---+-------+
|001 |a   |abc|1      |
|001 |a   |123|2      |
| 001|   a|y45|3      | -----> Delta record
|002 |b   |dfr|1      |
| 002|   b|444|2      | -----> Delta record 
|003 |c   |456|1      |
|003 |c   |ytr|2      |
+----+----+---+-------+

有关使用数据框/数据集实现此解决方案的任何建议吗?
我们能用spark rdd的zipWithIndex实现上述解决方案吗?

解决方法

添加带有更新行号的增量的一种方法是:1)在DS2中添加一个带有大数字的列Row_Num,2)将union baseFF与它一起添加,以及3)计算新的行号,如下所示:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val combinedDF = baseDF.union(
  DS2.withColumn("Row_Num",lit(Long.MaxValue))
)

val resultDF = combinedDF.select(
  col("KEY1"),row_number().over(
    Window.partitionBy(col("KEY1"),col("KEY2")).orderBy(col("Row_Num"))
  ).alias("New_Row_Num")
)

resultDF.show
+----+----+---+-----------+
|KEY1|KEY2|VAL|New_Row_Num|
+----+----+---+-----------+
| 003|   c|456|          1|
| 003|   c|ytr|          2|
| 002|   b|dfr|          1|
| 002|   b|444|          2|
| 001|   a|abc|          1|
| 001|   a|123|          2|
| 001|   a|y45|          3|
+----+----+---+-----------+

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读