scala – 使用带有Trigger.Once的Spark结构化流
发布时间:2020-12-16 08:52:42 所属栏目:安全 来源:网络整理
导读:有一个CSV文件的数据湖,全天更新.我正在尝试使用Trigger.Once功能 outlined in this blog post创建Spark结构化流工作,以定期写入已写入Parquet数据湖中CSV数据湖的新数据. 这就是我所拥有的: val df = spark .readStream .schema(s) .csv("s3a://csv-data-l
有一个CSV文件的数据湖,全天更新.我正在尝试使用Trigger.Once功能
outlined in this blog post创建Spark结构化流工作,以定期写入已写入Parquet数据湖中CSV数据湖的新数据.
这就是我所拥有的: val df = spark .readStream .schema(s) .csv("s3a://csv-data-lake-files") 以下命令将所有数据写入Parquet湖,但在写完所有数据后没有停止(我必须手动取消作业). processedDf .writeStream .trigger(Trigger.Once) .format("parquet") .option("checkpointLocation","s3-path-to-checkpoint") .start("s3-path-to-parquet-lake") 以下工作也有效,但在写完所有数据后都没有停止(我不得不手动取消工作): val query = processedDf .writeStream .trigger(Trigger.Once) .format("parquet") .option("checkpointLocation","s3-path-to-checkpoint") .start("s3-path-to-parquet-lake") query.awaitTermination() 以下命令在写入任何数据之前停止查询. val query = processedDf .writeStream .trigger(Trigger.Once) .format("parquet") .option("checkpointLocation","s3-path-to-checkpoint") .start("s3-path-to-parquet-lake") query.stop() 如何配置writeStream查询以等待所有增量数据写入Parquet文件然后停止? 解决方法
结构化流的主要目的是连续处理数据,而无需在新数据到达时启动/停止流.阅读
this了解更多详情.
从Spark 2.0.0开始,StreamingQuery具有方法processAllAvailable,它等待处理所有源数据并将其提交到接收器.请注意,scala docs states to use this method for testing purpose only. 因此代码应如下所示(如果您仍然需要它): query.processAllAvailable query.stop (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |