scala – 在Spark Structured Streaming中外连接两个数据集(不是
我有一些代码将两个流式DataFrames和输出连接到控制台.
val dataFrame1 = df1Input.withWatermark("timestamp","40 seconds").as("A") val dataFrame2 = df2Input.withWatermark("timestamp","40 seconds").as("B") val finalDF: DataFrame = dataFrame1.join(dataFrame2,expr( "A.id = B.id" + " AND " + "B.timestamp >= A.timestamp " + " AND " + "B.timestamp <= A.timestamp + interval 1 hour"),joinType = "leftOuter") finalDF.writeStream.format("console").start().awaitTermination() 我现在想要的是重构这部分以使用数据集,所以我可以进行一些编译时检查. 所以我尝试的非常简单: val finalDS: Dataset[(A,B)] = dataFrame1.as[A].joinWith(dataFrame2.as[B],joinType = "leftOuter") finalDS.writeStream.format("console").start().awaitTermination() 但是,这会产生以下错误:
如您所见,连接代码没有改变,因此两侧都有水印和范围条件.唯一的变化是使用数据集API而不是DataFrame. 另外,当我使用内连接时它很好: val finalDS: Dataset[(A,expr( "A.id = B.id" + " AND " + "B.timestamp >= A.timestamp " + " AND " + "B.timestamp <= A.timestamp + interval 1 hour") ) finalDS.writeStream.format("console").start().awaitTermination() 有谁知道这怎么可能发生? 解决方法
好吧,当你使用joinWith方法而不是join时,你依赖于不同的实现,看起来这个实现不支持leftOuter join用于流式数据集.
您可以查看官方文档的outer joins with watermarking部分.方法join不使用joinWith.请注意,结果类型将是DataFrame.这意味着您很可能必须手动映射字段 val finalDS = dataFrame1.as[A].join(dataFrame2.as[B],expr( "A.key = B.key" + " AND " + "B.timestamp >= A.timestamp " + " AND " + "B.timestamp <= A.timestamp + interval 1 hour"),joinType = "leftOuter").select(/* useful fields */).as[C] (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
- Golang Microservices无法使用Docker for Mac进行
- scala – 在groupby之后将Spark DataFrame的行聚
- 登录模板源码 : bootstrap风格
- AngularJS UI Bootstrap Typeahead:从对象到输入
- 如何在Ionic 3项目中使用angular 4路由器?
- angularjs – 使用grunt的“bower_concat”和“a
- angular-ui-grid – 如何在ui-grid中禁用多列排序
- angular select2()
- angularjs – 将对象从父控制器传递到隔离范围子
- angularjs – ngSanitize模块注入错误