加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 无界表是火花结构流

发布时间:2020-12-16 18:09:34 所属栏目:安全 来源:网络整理
导读:我开始学习Spark,并且很难理解Spark中Structured Streaming背后的合理性.结构化流处理作为无界输入表到达的所有数据,其中数据流中的每个新项被视为表中的新行.我有以下代码来读取传入文件到csvFolder. val spark = SparkSession.builder.appName("SimpleApp"
我开始学习Spark,并且很难理解Spark中Structured Streaming背后的合理性.结构化流处理作为无界输入表到达的所有数据,其中数据流中的每个新项被视为表中的新行.我有以下代码来读取传入文件到csvFolder.

val spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

val csvSchema = new StructType().add("street","string").add("city","string")
.add("zip","string").add("state","string").add("beds","string")
.add("baths","string").add("sq__ft","string").add("type","string")
.add("sale_date","string").add("price","string").add("latitude","string")
.add("longitude","string")

val streamingDF = spark.readStream.schema(csvSchema).csv("./csvFolder/")

val query = streamingDF.writeStream
  .format("console")
  .start()

如果我将1GB文件转储到该文件夹??会发生什么.根据规范,流媒体作业每隔几毫秒触发一次.如果Spark在下一个瞬间遇到如此庞大的文件,在尝试加载文件时不会耗尽内存.还是自动批量处理?如果是,该批处理参数是否可配置?

解决方法

见 example

关键思想是将任何数据流视为无界表:添加到流中的新记录就像添加到表中的行一样.

enter image description here


这允许我们将批处理和流数据视为表.由于表和DataFrames / Datasets在语义上是同义的,因此可以对批处理和流数据应用相同的类似批处理的DataFrame / Dataset查询.

在结构化流模型中,这是执行此查询的方式.

enter image description here

Question : If Spark encounters such a huge file in the next instant,won’t it run out of memory while trying to load the file. Or does it automatically
batch it? If yes,is this batching parameter configurable?

Answer : There is no point of OOM since it is RDD(DF/DS)lazily initialized. of course you need to re-partition before processing to ensure equal number of partitions and data spread across executors uniformly…

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读