scala – 如何从Spark Streaming写入Kafka
发布时间:2020-12-16 09:33:13 所属栏目:安全 来源:网络整理
导读:我正在使用Spark Streaming来处理两个Kafka队列之间的数据,但是似乎找不到一个很好的方法来从Spark写入Kafka。我试过这个: input.foreachRDD(rdd = rdd.foreachPartition(partition = partition.foreach{ case x:String={ val props = new HashMap[String,
我正在使用Spark Streaming来处理两个Kafka队列之间的数据,但是似乎找不到一个很好的方法来从Spark写入Kafka。我试过这个:
input.foreachRDD(rdd => rdd.foreachPartition(partition => partition.foreach{ case x:String=>{ val props = new HashMap[String,Object]() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer") println(x) val producer = new KafkaProducer[String,String](props) val message=new ProducerRecord[String,String]("output",null,x) producer.send(message) } } ) ) 它的工作原理,但是为每个消息实例化一个新的KafkaProducer在实际情况下显然是不可行的,我正在努力解决。 KafkaProducer不可序列化,显然。 我想保留对每个进程的单个实例的引用,并在需要发送消息时访问它。我怎样才能做到这一点? 解决方法
我的第一个建议是尝试在foreachPartition中创建一个新的实例,并测量是否足够快,以满足您的需求(实例化foreachPartition中的重物是官方文档所提示的)。
另一个选项是使用如下例所示的对象池: https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala 然而,我发现使用检查点时很难实现。 对我来说,另一个对我来说很好的版本是一个工厂,如下面的博客文章所述,你只需要检查它是否提供足够的并行性来满足你的需求(查看评论部分): http://allegro.tech/2015/08/spark-kafka-integration.html (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |