scala – Use a schema to convert ConsumerRecord values to a DataFrame in spark-kafka
I am using Spark 2.0.2 with Kafka 0.11.0, and I am trying to consume messages from Kafka in Spark Streaming. Here is the code:

    val topics = "notes"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:7092",
      "schema.registry.url" -> "http://localhost:7070",
      "group.id" -> "connect-cluster1",
      "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
      "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer"
    )
    val topicSet: Set[String] = Set(topics)
    val stream = KafkaUtils.createDirectStream[String, String](
      SparkStream.ssc,
      PreferConsistent,
      Subscribe[String, String](topicSet, kafkaParams)
    )
    stream.foreachRDD(rdd => {
      rdd.foreachPartition(iterator => {
        while (iterator.hasNext) {
          val next = iterator.next()
          println(next.value())
        }
      })
    })

If the Kafka messages contain records, the output is:

    {"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee","createdat": 1505312886984,"createdby": "karthik","notes": "testing20"}
    {"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee","createdat": 1505312890472,"notes": "testing21"}

So, as can be seen from the ConsumerRecord values, the received messages are Avro-decoded. The corresponding schema can be fetched from the Schema Registry:

    val sr: CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070", 1000)
    val m = sr.getLatestSchemaMetadata(topics + "-value")
    val schemaId = m.getId
    val schemaString = m.getSchema

    val schemaRegistry: CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070", 1000)
    val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry)
    val parser = new Schema.Parser()
    val avroSchema = parser.parse(schemaString)
    println(avroSchema)

The schema prints as follows:

    {
      "type": "record",
      "name": "notes",
      "namespace": "db",
      "fields": [
        { "name": "id", "type": ["null", "string"], "default": null },
        { "name": "createdat",
          "type": ["null", { "type": "long", "connect.version": 1,
                             "connect.name": "org.apache.kafka.connect.data.Timestamp",
                             "logicalType": "timestamp-millis" }],
          "default": null },
        { "name": "createdby", "type": ["null", "string"], "default": null },
        { "name": "notes", "type": ["null", "string"], "default": null }
      ],
      "connect.name": "db.notes"
    }

Can anyone help me understand how to get a DataFrame from the ConsumerRecord value? I have looked at other questions such as Use schema to convert AVRO messages with Spark to DataFrame and Handling schema changes in running Spark Streaming application, but they do not deal with the ConsumerRecord in the first place.

Solution
You can use the following code snippet.
stream is the DStream of ConsumerRecords returned by the kafka010 KafkaUtils API:

    stream.foreachRDD(rdd =>
      if (!rdd.isEmpty()) {
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._
        val topicValueStrings = rdd.map(record => (record.value()).toString)
        val df = sqlContext.read.json(topicValueStrings)
        df.show()
      }
    )
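If the schema is already known up front (for example, the db.notes schema fetched from the Schema Registry in the question), a possible refinement is to hand the JSON reader an explicit schema instead of letting Spark infer one on every micro-batch. The following is only a sketch under that assumption: notesSchema is a hypothetical StructType written by hand to mirror the schema printed above, and stream is the same DStream of ConsumerRecords as before.

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.types._

    // Hypothetical StructType mirroring the db.notes Avro schema shown above;
    // adjust field names and types to whatever your registry actually returns.
    val notesSchema = StructType(Seq(
      StructField("id", StringType, nullable = true),
      StructField("createdat", LongType, nullable = true),
      StructField("createdby", StringType, nullable = true),
      StructField("notes", StringType, nullable = true)
    ))

    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        // record.value() is the Avro-decoded record; its toString is JSON-like,
        // so it can be parsed by the JSON reader.
        val jsonStrings = rdd.map(record => record.value().toString)
        // Supplying the schema avoids re-inferring it for every batch.
        val df = sqlContext.read.schema(notesSchema).json(jsonStrings)
        df.show()
      }
    }

If the registry schema evolves over time, the StructType could instead be derived programmatically from schemaString, but the Avro-to-Spark type mapping (unions, logical types such as timestamp-millis) then has to be handled explicitly.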