加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

如何在Scala中实现Kafka Consumer

发布时间:2020-12-16 18:56:56 所属栏目:安全 来源:网络整理
导读:我正在尝试在 scala中实现kafka使用者.我已经看过一百万个关于如何用 Java做的教程,甚至一些( like this one)说这是scala,但它是用Java编写的. 有谁知道我在哪里可以找到如何在Scala中编写它的示例?我刚开始学习Scala所以也许链接的例子可以在Scala中使用,
我正在尝试在 scala中实现kafka使用者.我已经看过一百万个关于如何用 Java做的教程,甚至一些( like this one)说这是scala,但它是用Java编写的.

有谁知道我在哪里可以找到如何在Scala中编写它的示例?我刚开始学习Scala所以也许链接的例子可以在Scala中使用,即使它是用Java编写的,但我老实说我不知道??我现在正在做什么.我google的所有内容都只是将我与如何用Java联系起来.

解决方法

您在Java中看到大多数示例的原因是,从Java开始编写的新KafkaProducer 0.8.2.2.

假设您使用sbt作为构建系统,并假设您使用Kafka 0.8.2.2(您可以根据需要更改版本),您将需要:

libraryDependencies ++= {
  Seq(
    "org.apache.kafka" %% "kafka" % "0.8.2.2","org.apache.kafka" % "kafka-clients" % "0.8.2.2",)
}

一个简单的例子应该让你开始:

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer 

object KafkaExample {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("bootstrap.servers","localhost:9092")
    properties.put("group.id","consumer-tutorial")
    properties.put("key.deserializer",classOf[StringDeserializer])
    properties.put("value.deserializer",classOf[StringDeserializer])

    val kafkaConsumer = new KafkaConsumer[String,String](properties)
    kafkaConsumer.subscribe("firstTopic","secondTopic")

    while (true) {
      val results = kafkaConsumer.poll(2000).asScala
      for ((topic,data) <- results) {
        // Do stuff
      }
    }
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读