加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 使用模式将ConsumerRecord值转换为spark-kafka中的Data

发布时间:2020-12-16 18:37:17 所属栏目:安全 来源:网络整理
导读:我正在使用Spark 2.0.2,使用Kafka 0.11.0和 我试图在火花流中消费来自kafka的消息.以下是代码: val topics = "notes"val kafkaParams = Map[String,Object]( "bootstrap.servers" - "localhost:7092","schema.registry.url" - "http://localhost:7070","gro
我正在使用Spark 2.0.2,使用Kafka 0.11.0和
我试图在火花流中消费来自kafka的消息.以下是代码:

val topics = "notes"
val kafkaParams = Map[String,Object](
  "bootstrap.servers" -> "localhost:7092","schema.registry.url" -> "http://localhost:7070","group.id" -> "connect-cluster1","value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer","key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer"
)
val topicSet: Set[String] = Set(topics)
val stream = KafkaUtils.createDirectStream[String,String](
  SparkStream.ssc,PreferConsistent,Subscribe[String,String](topicSet,kafkaParams)
)
stream.foreachRDD ( rdd => {
  rdd.foreachPartition(iterator => {
    while (iterator.hasNext) {
      val next = iterator.next()
      println(next.value())
    }
  })
})

如果Kafka消息包含记录,则输出将为:

{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee","createdat": 1505312886984,"createdby": "karthik","notes": "testing20"}
{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee","createdat": 1505312890472,"notes": "testing21"}

因此,从consumerRecord的值可以看出,收到的消息是Avro解码的.
现在我需要数据帧格式的那些记录,但我不知道如何从这里开始,即使手头的模式如下:

val sr : CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070",1000)
val m = sr.getLatestSchemaMetadata(topics + "-value")
val schemaId = m.getId
val schemaString = m.getSchema

val schemaRegistry : CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070",1000)
val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry)
val parser = new Schema.Parser()
val avroSchema = parser.parse(schemaString)
println(avroSchema)

模式打印如下:

{ “类型”: “记录”,“名”: “注意事项”,“命名空间”: “DB”,“田”:[{ “Name”: “ID”,“类型”: “空”,“串“],” 默认 “:空},{” 名称 “:” createdat”,“类型”:[ “空”,{ “类型”: “长”,“connect.version”:1,“connect.name” : “org.apache.kafka.connect.data.Timestamp”,“logicalType”: “时间戳的毫秒时间”}],“默认”:空},{ “名称”: “createdby”,“类型”:[“空”,“字符串”],{ “名称”: “注释”,“默认”:空}],“connect.name” : “db.notes”}

任何人都可以帮助我了解如何从消费者记录的价值中获取数据框架吗?我已经查看了其他问题,例如Use schema to convert AVRO messages with Spark to DataFrame,Handling schema changes in running Spark Streaming application,但他们并没有在第一时间处理consumerRecord.

解决方法

您可以使用以下代码段:
stream是从kafka010的kafkaUtils api返回的消费者记录的DStream:

stream.foreachRDD(rdd =>
    if (!rdd.isEmpty()) {
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._
        val topicValueStrings = rdd.map(record => (record.value()).toString)
        val df = sqlContext.read.json(topicValueStrings)
        df.show()
    })

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读