加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何从Spark Streaming写入Kafka

发布时间:2020-12-16 09:33:13 所属栏目:安全 来源:网络整理
导读:我正在使用Spark Streaming来处理两个Kafka队列之间的数据,但是似乎找不到一个很好的方法来从Spark写入Kafka。我试过这个: input.foreachRDD(rdd = rdd.foreachPartition(partition = partition.foreach{ case x:String={ val props = new HashMap[String,
我正在使用Spark Streaming来处理两个Kafka队列之间的数据,但是似乎找不到一个很好的方法来从Spark写入Kafka。我试过这个:

input.foreachRDD(rdd =>
      rdd.foreachPartition(partition =>

                partition.foreach{
                  case x:String=>{

                    val props = new HashMap[String,Object]()
                    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers)
                    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
                    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")

                    println(x)
                    val producer = new KafkaProducer[String,String](props)
                    val message=new ProducerRecord[String,String]("output",null,x)
                    producer.send(message)
                  }
                }


      )
    )

它的工作原理,但是为每个消息实例化一个新的KafkaProducer在实际情况下显然是不可行的,我正在努力解决。

KafkaProducer不可序列化,显然。

我想保留对每个进程的单个实例的引用,并在需要发送消息时访问它。我怎样才能做到这一点?

解决方法

我的第一个建议是尝试在foreachPartition中创建一个新的实例,并测量是否足够快,以满足您的需求(实例化foreachPartition中的重物是官方文档所提示的)。

另一个选项是使用如下例所示的对象池:

https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala

然而,我发现使用检查点时很难实现。

对我来说,另一个对我来说很好的版本是一个工厂,如下面的博客文章所述,你只需要检查它是否提供足够的并行性来满足你的需求(查看评论部分):

http://allegro.tech/2015/08/spark-kafka-integration.html

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读