加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 使用带有Trigger.Once的Spark结构化流

发布时间:2020-12-16 08:52:42 所属栏目:安全 来源:网络整理
导读:有一个CSV文件的数据湖,全天更新.我正在尝试使用Trigger.Once功能 outlined in this blog post创建Spark结构化流工作,以定期写入已写入Parquet数据湖中CSV数据湖的新数据. 这就是我所拥有的: val df = spark .readStream .schema(s) .csv("s3a://csv-data-l
有一个CSV文件的数据湖,全天更新.我正在尝试使用Trigger.Once功能 outlined in this blog post创建Spark结构化流工作,以定期写入已写入Parquet数据湖中CSV数据湖的新数据.

这就是我所拥有的:

val df = spark
  .readStream
  .schema(s)
  .csv("s3a://csv-data-lake-files")

以下命令将所有数据写入Parquet湖,但在写完所有数据后没有停止(我必须手动取消作业).

processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation","s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

以下工作也有效,但在写完所有数据后都没有停止(我不得不手动取消工作):

val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation","s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.awaitTermination()

以下命令在写入任何数据之前停止查询.

val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation","s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.stop()

如何配置writeStream查询以等待所有增量数据写入Parquet文件然后停止?

解决方法

结构化流的主要目的是连续处理数据,而无需在新数据到达时启动/停止流.阅读 this了解更多详情.

从Spark 2.0.0开始,StreamingQuery具有方法processAllAvailable,它等待处理所有源数据并将其提交到接收器.请注意,scala docs states to use this method for testing purpose only.

因此代码应如下所示(如果您仍然需要它):

query.processAllAvailable
query.stop

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读