加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何在Spark 2.3.0中自行加入?什么是正确的语法?

发布时间:2020-12-16 18:09:55 所属栏目:安全 来源:网络整理
导读:我有以下代码 import org.apache.spark.sql.streaming.Trigger val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","join_test").option("startingOffsets","earliest").load(); jdf.creat
我有以下代码

import org.apache.spark.sql.streaming.Trigger 

val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","join_test").option("startingOffsets","earliest").load();   
jdf.createOrReplaceTempView("table")
val resultdf = spark.sql("select * from table as x inner join table as y on x.offset=y.offset")
resultdf.writeStream.outputMode("append").format("console").option("truncate",false).trigger(Trigger.ProcessingTime(1000)).start()

我得到以下异常

org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given input columns: [x.value,x.offset,x.key,x.timestampType,x.topic,x.timestamp,x.partition]; line 1 pos 50;
'Project [*]
+- 'Join Inner,('x.offset = 'y.offset)
   :- SubqueryAlias x
   :  +- SubqueryAlias table
   :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,Map(startingOffsets -> earliest,subscribe -> join_test,kafka.bootstrap.servers -> localhost:9092),None),[key#28,value#29,topic#30,partition#31,offset#32L,timestamp#33,timestampType#34]
   +- SubqueryAlias y
      +- SubqueryAlias table
         +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,timestampType#34]

我已将代码更改为此

import org.apache.spark.sql.streaming.Trigger 

val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers","earliest").load();
val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","earliest").load();

jdf.createOrReplaceTempView("table")
jdf1.createOrReplaceTempView("table1")

val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset")

resultdf.writeStream.outputMode("append").format("console").option("truncate",false).trigger(Trigger.ProcessingTime(1000)).start()

这很有效.但是,我不相信这是我正在寻找的解决方案.我希望能够使用原始SQL进行自联接,但不能像上面的代码那样制作数据帧的其他副本.还有其他方法吗?

解决方法

这是一个已知问题,将在2.4.0中修复.请参见 https://issues.apache.org/jira/browse/SPARK-23406.现在您可以避免加入相同的DataFrame对象.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读