scala – 如何使用`ssc.fileStream()`读取镶木地板文件?传递给`
我对Spark的fileStream()方法的理解是它需要三种类型的参数:Key,Value和Format.对于文本文件,适当的类型是:LongWritable,Text和TextInputFormat.
首先,我想了解这些类型的本质.直观地说,我猜这个案例中的Key是文件的行号,Value是该行的文本.因此,在以下文本文件示例中: Hello Test Another Test DStream的第一行的Key为1(0?),值为Hello. 它是否正确? 我的问题的第二部分:我查看了ParquetInputFormat的反编译实现,我发现了一些好奇的东西: public class ParquetInputFormat<T> extends FileInputFormat<Void,T> { //... public class TextInputFormat extends FileInputFormat<LongWritable,Text> implements JobConfigurable { //... TextInputFormat扩展了LongWritable和Text类型的FileInputFormat,而ParquetInputFormat扩展了Void和T类型的相同类. 这是否意味着我必须创建一个Value类来保存我的镶木地板数据的整行,然后传递类型< Void,MyClass,ParquetInputFormat< MyClass>>到ssc.fileStream()? 如果是这样,我该如何实现MyClass? 编辑1:我注意到了一个传递给ParquetInputFormat对象的readSupportClass.这是什么类,它是如何用于解析镶木地板文件的?是否有一些文件涵盖了这一点? 编辑2:据我所知,这是不可能的.如果有人知道如何将镶木地板文件流式传输到Spark,请随时分享… 解决方法
我在Spark Streaming中读取镶木地板文件的示例如下.
val ssc = new StreamingContext(sparkConf,Seconds(2)) ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class","parquet.avro.AvroReadSupport") val stream = ssc.fileStream[Void,GenericRecord,ParquetInputFormat[GenericRecord]]( directory,{ path: Path => path.toString.endsWith("parquet") },true,ssc.sparkContext.hadoopConfiguration) val lines = stream.map(row => { println("row:" + row.toString()) row }) 有些观点是…… >记录类型是GenericRecord 我在下面提到了创建样本的源代码. https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |