scala – 如何在Spark 2.3.0中自行加入?什么是正确的语法?
发布时间:2020-12-16 18:09:55 所属栏目:安全 来源:网络整理
导读:我有以下代码 import org.apache.spark.sql.streaming.Trigger val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","join_test").option("startingOffsets","earliest").load(); jdf.creat
我有以下代码
import org.apache.spark.sql.streaming.Trigger val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","join_test").option("startingOffsets","earliest").load(); jdf.createOrReplaceTempView("table") val resultdf = spark.sql("select * from table as x inner join table as y on x.offset=y.offset") resultdf.writeStream.outputMode("append").format("console").option("truncate",false).trigger(Trigger.ProcessingTime(1000)).start() 我得到以下异常 org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given input columns: [x.value,x.offset,x.key,x.timestampType,x.topic,x.timestamp,x.partition]; line 1 pos 50; 'Project [*] +- 'Join Inner,('x.offset = 'y.offset) :- SubqueryAlias x : +- SubqueryAlias table : +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,Map(startingOffsets -> earliest,subscribe -> join_test,kafka.bootstrap.servers -> localhost:9092),None),[key#28,value#29,topic#30,partition#31,offset#32L,timestamp#33,timestampType#34] +- SubqueryAlias y +- SubqueryAlias table +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,timestampType#34] 我已将代码更改为此 import org.apache.spark.sql.streaming.Trigger val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers","earliest").load(); val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","earliest").load(); jdf.createOrReplaceTempView("table") jdf1.createOrReplaceTempView("table1") val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset") resultdf.writeStream.outputMode("append").format("console").option("truncate",false).trigger(Trigger.ProcessingTime(1000)).start() 这很有效.但是,我不相信这是我正在寻找的解决方案.我希望能够使用原始SQL进行自联接,但不能像上面的代码那样制作数据帧的其他副本.还有其他方法吗? 解决方法
这是一个已知问题,将在2.4.0中修复.请参见
https://issues.apache.org/jira/browse/SPARK-23406.现在您可以避免加入相同的DataFrame对象.
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |