加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

Scala操作Kakfa API

发布时间:2020-12-16 09:04:03 所属栏目:安全 来源:网络整理
导读:如需大数据开发整套视频(hadoophivehbaseflumesqoopkafkazookeeperprestospark):请联系QQ:1974983704 由于我使用的是kafka_2.10-0.10.0.1,需要下载对应版本的kafka-clients-0.10.1.1.jar包 生产数据KafkaProducerEx: 1 package test.KafkaTest

如需大数据开发整套视频(hadoophivehbaseflumesqoopkafkazookeeperprestospark):请联系QQ:1974983704

由于我使用的是kafka_2.10-0.10.0.1,需要下载对应版本的kafka-clients-0.10.1.1.jar包

生产数据KafkaProducerEx:

 1 package test.KafkaTest
 2 
 3 import java.util.Properties
 4 import org.apache.kafka.clients.producer.{KafkaProducer,ProducerConfig,ProducerRecord}
 5 import org.apache.kafka.clients.consumer
 6 import org.apache.kafka.clients.consumer.ConsumerConfig
 7 import org.apache.kafka.common.serialization
 8 
13 object KafkaProducerEx {
14   def main(args:Array[String]): Unit= {
15     val topic = "test"
16     val brokers = "localhost:9092"  //Zookeeper地址,两个地址以逗号(,)分割
17     val props = new Properties()
18     props.put("bootstrap.servers",brokers)
19     props.put("acks","all")
20     props.put("retries","0")
21     props.put("batch.size","16384")
22     props.put("linger.ms","1")
23     props.put(ConsumerConfig.GROUP_ID_CONFIG,"test")
24     props.put("buffer.memory","33554432")
25     props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
26     props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
27 
28     val producer = new KafkaProducer[String,String](props)
29     val t = System.currentTimeMillis()
30     val msg = "producer message::::::"
31     var i=0
32     for(i<-Range(1,1000))
33       {
34         println(msg+i.toString())
35         val record = new ProducerRecord[String,String](topic,"kafka_key",msg+i.toString())
36         producer.send(record)
37       }
38     producer.close()
39   }
40 }

消费数据KafkaConsumerEx:

 1 package test.KafkaTest
 2 import java.util.{Collections,Properties}
 3 
 4 import scala.collection.JavaConversions._
 5 import org.apache.kafka.clients.consumer.{ConsumerConfig,ConsumerRecord,ConsumerRecords,KafkaConsumer}
 6 import org.apache.kafka.common.serialization
 7 
 8 object KafkaConsumerEx {
 9 
10   def main(args:Array[String]):Unit={
11     val topic = "test"
12     val brokers = "localhost:9092"
13     val props = new Properties()
14     props.put("bootstrap.servers",brokers)
15     props.put("enable.auto.commit","true")
16     props.put("auto.commit.interval.ms","10000")
17     props.put(ConsumerConfig.GROUP_ID_CONFIG,"test")
18     props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
19     props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
20     val consumer = new KafkaConsumer[String,String](props)
21     consumer.subscribe(Collections.singleton(topic))
22     while (true)
23       {
24         val records:ConsumerRecords[String,String] = consumer.poll(100)
25         for ( record <- records){
26           println(record.value())
27         }
28       }
29     consumer.close()
30   }
31 }

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读