scala – Self-join not working with the DataFrame API
I am trying to get the latest record from a table using a self-join. It works with spark-sql but does not work with the Spark DataFrame API.
Can someone help? Is this a bug? I am using Spark 2.2.0 in local mode.

Create the input DataFrame:

scala> val df3 = spark.sparkContext.parallelize(Array((1,"a",1),(1,"aa",2),(2,"b",2),(2,"bb",5))).toDF("id","value","time")
df3: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field]

scala> val df33 = df3
df33: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field]

scala> df3.show
+---+-----+----+
| id|value|time|
+---+-----+----+
|  1|    a|   1|
|  1|   aa|   2|
|  2|    b|   2|
|  2|   bb|   5|
+---+-----+----+

scala> df33.show
+---+-----+----+
| id|value|time|
+---+-----+----+
|  1|    a|   1|
|  1|   aa|   2|
|  2|    b|   2|
|  2|   bb|   5|
+---+-----+----+

Now perform the join using SQL: it works.

scala> spark.sql("select df33.* from df3 join df33 on df3.id = df33.id and df3.time < df33.time").show
+---+-----+----+
| id|value|time|
+---+-----+----+
|  1|   aa|   2|
|  2|   bb|   5|
+---+-----+----+

Now perform the join using the DataFrame API: it does not work.

scala> df3.join(df33, (df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time"))).select(df33.col("id"), df33.col("value"), df33.col("time")).show
+---+-----+----+
| id|value|time|
+---+-----+----+
+---+-----+----+

Note the explain plans: the one produced through the DataFrame API is empty!

scala> df3.join(df33, (df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time"))).select(df33.col("id"), df33.col("value"), df33.col("time")).explain
== Physical Plan ==
LocalTableScan <empty>, [id#150, value#151, time#152]

scala> spark.sql("select df33.* from df3 join df33 on df3.id = df33.id and df3.time < df33.time").explain
== Physical Plan ==
*Project [id#1241, value#1242, time#1243]
+- *SortMergeJoin [id#150], [id#1241], Inner, (time#152 < time#1243)
   :- *Sort [id#150 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#150, 200)
   :     +- *Project [_1#146 AS id#150, _3#148 AS time#152]
   :        +- *SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#146, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._2, true) AS _2#147, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#148]
   :           +- Scan ExternalRDDScan[obj#145]
   +- *Sort [id#1241 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#1241, 200)
         +- *Project [_1#146 AS id#1241, _2#147 AS value#1242, _3#148 AS time#1243]
            +- *SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#146, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._2, true) AS _2#147, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#148]
               +- Scan ExternalRDDScan[obj#145]

Solution
No, this is not a bug. When you reassign the DataFrame to a new variable the way you did, Spark copies the lineage but not the data, so both sides of the join refer to exactly the same columns and you end up comparing a column with itself.
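One way to see this in the spark-shell is the small diagnostic sketch below. It assumes the df3/df33 session from the question and only inspects what Spark already exposes (Column.expr and the optimized plan); the exact expression IDs will differ from run to run.

// Both references hand back the very same attribute (same expression ID, e.g. time#152),
// so df3("time") < df33("time") compares a column with itself and can never be true.
println(df3.col("time").expr)
println(df33.col("time").expr)

// The optimizer therefore folds the join condition away and replaces the whole join
// with an empty local relation, which is why explain() printed "LocalTableScan <empty>".
val joined = df3.join(df33,
  (df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time")))
println(joined.queryExecution.optimizedPlan)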
Using spark.sql is slightly different, because there the query really works against the aliases df3 and df33 of your DataFrames. So the correct way to perform a self-join with the API is to alias the DataFrame, like this:

val df1 = Seq((1,"a",1),(1,"aa",2),(2,"b",2),(2,"bb",5)).toDF("id","value","time")

df1.as("df1").join(df1.as("df2"), $"df1.id" === $"df2.id" && $"df1.time" < $"df2.time").select($"df2.*").show
// +---+-----+----+
// | id|value|time|
// +---+-----+----+
// |  1|   aa|   2|
// |  2|   bb|   5|
// +---+-----+----+

For more on self-joins, I recommend Chapter 4 of High Performance Spark by Rachel Warren and Holden Karau.
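As a side note, since the underlying goal in the question is just "the latest record per id", a window function avoids the self-join altogether. This is only a rough alternative sketch, assuming the same df3 as above; it is not part of the original answer.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank each id's rows by time, newest first, and keep only the top-ranked row.
val newestFirst = Window.partitionBy("id").orderBy(col("time").desc)

df3.withColumn("rn", row_number().over(newestFirst)).filter(col("rn") === 1).drop("rn").show()
// Expected to keep (1, aa, 2) and (2, bb, 5), the same rows the aliased self-join returns.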