加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 使用Spark 2.0.2(结构化流媒体)从Kafka阅读Avro消息

发布时间:2020-12-16 09:26:10 所属栏目:安全 来源:网络整理
导读:我有一个spark 2.0应用程序,它使用spark streaming(使用spark-streaming-kafka-0-10_2.11)从kafka读取消息. 结构化流看起来很酷,所以我想尝试迁移代码,但我无法弄清楚如何使用它. 在常规流媒体中,我使用kafkaUtils来创建Dstrean,在我传递的参数中是值deseria
我有一个spark 2.0应用程序,它使用spark streaming(使用spark-streaming-kafka-0-10_2.11)从kafka读取消息.

结构化流看起来很酷,所以我想尝试迁移代码,但我无法弄清楚如何使用它.

在常规流媒体中,我使用kafkaUtils来创建Dstrean,在我传递的参数中是值deserializer.

在结构化流媒体中,doc说我应该使用DataFrame函数进行反序列化,但我无法确切地知道这意味着什么.

我查看了这个例子,例如这个example,但是我在Kafka的Avro对象是复杂的,不能像示例中的String那样简单地进行转换.

到目前为止,我尝试了这种代码(我在这里看到了另一个问题):

import spark.implicits._

  val ds1 = spark.readStream.format("kafka").
    option("kafka.bootstrap.servers","localhost:9092").
    option("subscribe","RED-test-tal4").load()

  ds1.printSchema()
  ds1.select("value").printSchema()
  val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()  
  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()

我得到“数据类型不匹配:无法将BinaryType转换为StructType(StructField(….”

我怎样才能反序化价值?

解决方法

我还不太熟悉Spark的序列化如何与新的/实验性结构化流媒体结合使用,但下面的方法确实有效 – 虽然我不确定它是否是最好的方法(恕我直言,这种方法有点尴尬的样子’n感觉).

我将尝试以自定义数据类型(此处:Foo案例类)的示例回答您的问题,而不是特别是Avro,但我希望它无论如何都会帮助您.我们的想法是使用Kryo序列化来序列化/反序列化您的自定义类型,请参阅Spark文档中的Tuning: Data serialization.

Note: Spark supports serialization of case classes out of the box via built-in (implicit) encoders that you can import via import spark.implicits._. But let’s ignore this functionality for the sake of this example.

想象一下,您已将以下Foo案例类定义为您的自定义类型(TL; DR提示:为了防止遇到奇怪的Spark序列化投诉/错误,您应该将代码放入单独的Foo.scala文件中):

// This could also be your auto-generated Avro class/type
case class Foo(s: String)

现在,您有了以下结构化流代码来从Kafka读取数据,其中输入主题包含消息值为二进制编码字符串的Kafka消息,您的目标是根据这些消息值创建Foo实例(即类似于您将二进制数据反序列化为Avro类的实例):

val messages: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers","broker1:9092,broker2:9092")
    .option("subscribe","my-input-topic")
    .load()

现在我们将deserializing值作为我们自定义Foo类型的实例,我们首先需要定义一个隐式编码器[Foo]:

implicit val myFooEncoder: Encoder[Foo] = org.apache.spark.sql.Encoders.kryo[Foo]
val foos: Dataset[Foo] = messages.map(row => Foo(new String(row.getAs[Array[Byte]]("value")))

回到你的Avro问题,你需要做的是:

>根据需要创建合适的编码器.
>将Foo(new String(row.getAs [Array [Byte]](“value”))替换为将二进制编码的Avro数据反序列化为Avro POJO的代码,即将二进制编码的Avro数据从消息值(row.getAs [Array [Byte]](“value”))并返回,例如,Avro GenericRecord或您在别处定义的任何SpecificCustomAvroObject.

如果其他人知道更简洁/更好/ ……回答Tal的问题的方式,我全都听见了.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读