Example Code for Accessing Kafka with the Java Client
Published: 2020-12-14 14:15:34  Category: Java  Source: compiled from the web
This article uses the following environment:

Operating system: CentOS 6, 32-bit
JDK version: 1.8.0_77, 32-bit
Kafka version: 0.9.0.1 (Scala 2.11)

1. Maven dependency

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>

2. Producer code

    package com.lnho.example.kafka;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class KafkaProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Broker address used to discover the rest of the cluster
            props.put("bootstrap.servers", "master:9092");
            // Wait for all in-sync replicas to acknowledge each record
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            // Wait up to 1 ms so that records can be sent in batches
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            // send() is asynchronous; each record uses i as both key and value
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i)));
            }
            // close() blocks until previously sent records have completed
            producer.close();
        }
    }

3. Consumer code

    package com.lnho.example.kafka;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.util.Arrays;
    import java.util.Properties;

    public class KafkaConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "master:9092");
            // Consumers sharing a group.id split the topic's partitions among themselves
            props.put("group.id", "test");
            // Commit offsets automatically, once every 1000 ms
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("topic1"));
            // Loop forever; stop the program with Ctrl+C
            while (true) {
                // Wait up to 100 ms for new records
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }

4. Running the programs

The lib directory must contain:

    kafka-clients-0.9.0.1.jar
    log4j-1.2.17.jar
    slf4j-api-1.7.6.jar
    slf4j-log4j12-1.7.6.jar

Producer:

    java -classpath "kafka-example-1.0-SNAPSHOT.jar:lib/*" com.lnho.example.kafka.KafkaProducerExample

Consumer:

    java -classpath "kafka-example-1.0-SNAPSHOT.jar:lib/*" com.lnho.example.kafka.KafkaConsumerExample

(Editor: 李大同)
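The producer and consumer each build their Properties by hand and repeat the broker address. One possible way to keep those settings in a single place is sketched below. The class name KafkaClientConfig and its methods are hypothetical, introduced here for illustration only; they are not part of the original article or of kafka-clients. The sketch stores every value as a string with setProperty, in the same way the consumer example already passes "1000" and "30000" as strings, so that the values can also be read back with getProperty (which returns null for non-String values such as an Integer stored with put).

```java
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper, not part of the original article or of kafka-clients:
// collects the settings used by the two example programs in one place.
final class KafkaClientConfig {

    private KafkaClientConfig() {
    }

    // Settings shared by the producer and the consumer.
    static Properties baseProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }

    // Same producer settings as KafkaProducerExample, stored as strings.
    static Properties producerProps(String bootstrapServers) {
        Properties props = baseProps(bootstrapServers);
        props.setProperty("acks", "all");
        props.setProperty("retries", "0");
        props.setProperty("batch.size", "16384");
        props.setProperty("linger.ms", "1");
        props.setProperty("buffer.memory", "33554432");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Same consumer settings as KafkaConsumerExample.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = baseProps(bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("session.timeout.ms", "30000");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // Prints the producer settings, taking the broker address from the
    // first command-line argument when one is given.
    public static void main(String[] args) {
        String servers = args.length > 0 ? args[0] : "master:9092";
        Properties props = producerProps(servers);
        for (String name : new TreeSet<>(props.stringPropertyNames())) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

With such a helper on the classpath, the producer could be created with new KafkaProducer<>(KafkaClientConfig.producerProps("master:9092")) and the consumer with new KafkaConsumer<>(KafkaClientConfig.consumerProps("master:9092", "test")), so a change of broker address only has to be made in one call site or passed in from the command line.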