加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 使用DataFrame API,自联接无法正常工作

发布时间:2020-12-16 09:58:56 所属栏目:安全 来源:网络整理
导读:我试图从使用自联接的表中获取最新记录.它使用spark-sql但无法使用spark DataFrame API. 有人可以帮忙吗?这是一个错误吗? 我在本地模式下使用Spark 2.2.0 创建输入DataFrame: scala val df3 = spark.sparkContext.parallelize(Array((1,"a",1),(1,"aa",2)
我试图从使用自联接的表中获取最新记录.它使用spark-sql但无法使用spark DataFrame API.

有人可以帮忙吗?这是一个错误吗?

我在本地模式下使用Spark 2.2.0

创建输入DataFrame:

scala> val df3 = spark.sparkContext.parallelize(Array((1,"a",1),(1,"aa",2),(2,"b","bb",5))).toDF("id","value","time")
df3: org.apache.spark.sql.DataFrame = [id: int,value: string ... 1 more field]    

scala> val df33 = df3
df33: org.apache.spark.sql.DataFrame = [id: int,value: string ... 1 more field]

scala> df3.show
+---+-----+----+
| id|value|time|
+---+-----+----+
|  1|    a|   1|
|  1|   aa|   2|
|  2|    b|   2|
|  2|   bb|   5|
+---+-----+----+

scala> df33.show
+---+-----+----+
| id|value|time|
+---+-----+----+
|  1|    a|   1|
|  1|   aa|   2|
|  2|    b|   2|
|  2|   bb|   5|
+---+-----+----+

现在使用SQL执行连接:有效

scala> spark.sql("select df33.* from df3 join df33 on df3.id = df33.id and df3.time < df33.time").show
+---+-----+----+
| id|value|time|
+---+-----+----+
|  1|   aa|   2|
|  2|   bb|   5|
+---+-----+----+

现在使用dataframe API执行连接:不起作用

scala> df3.join(df33,(df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time")) ).select(df33.col("id"),df33.col("value"),df33.col("time")).show
+---+-----+----+
| id|value|time|
+---+-----+----+
+---+-----+----+

需要注意的是解释计划:DataFrame API为空!

scala> df3.join(df33,df33.col("time")).explain
== Physical Plan ==
LocalTableScan <empty>,[id#150,value#151,time#152]

scala> spark.sql("select df33.* from df3 join df33 on df3.id = df33.id and df3.time < df33.time").explain
== Physical Plan ==
*Project [id#1241,value#1242,time#1243]
+- *SortMergeJoin [id#150],[id#1241],Inner,(time#152 < time#1243)
   :- *Sort [id#150 ASC NULLS FIRST],false,0
   :  +- Exchange hashpartitioning(id#150,200)
   :     +- *Project [_1#146 AS id#150,_3#148 AS time#152]
   :        +- *SerializeFromObject [assertnotnull(input[0,scala.Tuple3,true])._1 AS _1#146,staticinvoke(class org.apache.spark.unsafe.types.UTF8String,StringType,fromString,assertnotnull(input[0,true])._2,true) AS _2#147,true])._3 AS _3#148]
   :           +- Scan ExternalRDDScan[obj#145]
   +- *Sort [id#1241 ASC NULLS FIRST],0
      +- Exchange hashpartitioning(id#1241,200)
         +- *Project [_1#146 AS id#1241,_2#147 AS value#1242,_3#148 AS time#1243]
            +- *SerializeFromObject [assertnotnull(input[0,true])._3 AS _3#148]
               +- Scan ExternalRDDScan[obj#145]

解决方法

不,这不是一个错误,但是当你像你所做的那样将DataFrame重新分配给一个新的时,它实际上复制了谱系,但它没有复制数据.因此,您将在同一列上进行比较.

使用spark.sql略有不同,因为它实际上正在处理您的DataFrames的别名

因此,使用API??执行自联接的正确方法实际上是对DataFrame进行别名,如下所示:

val df1 = Seq((1,5)).toDF("id","time")

df1.as("df1").join(df1.as("df2"),$"df1.id" === $"df2.id" && $"df1.time" < $"df2.time").select($"df2.*").show
// +---+-----+----+
// | id|value|time|
// +---+-----+----+
// |  1|   aa|   2|
// |  2|   bb|   5|
// +---+-----+----+

有关自联接的更多信息,我建议阅读High Performance Spark by Rachel Warren,Holden Karau – Chapter 4.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读