Kafka利用Java实现数据的生产和消费实例教程
前言 在上一篇中讲述如何搭建kafka集群,本篇则讲述如何简单的使用 kafka 。不过在使用kafka的时候,还是应该简单的了解下kafka。 Kafka的介绍 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 Kafka 有如下特性:
kafka的术语
kafka核心Api kafka有四个核心API
示例图如下: kafka 应用场景
以上介绍参考kafka官方文档。 开发准备 如果我们要开发一个kafka的程序,应该做些什么呢? 首先,在搭建好kafka环境之后,我们要考虑的是我们是生产者还是消费者,也就是消息的发送者还是接受者。 在大致的了解kafka之后,我们来开发第一个程序。 这里用的开发语言是Java,构建工具Maven。 Maven的依赖如下: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>1.0.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>1.0.0</version> </dependency> Kafka Producer 在开发生产的时候,先简单的介绍下kafka各种配置说明:
... 还有更多配置,可以去查看官方文档,这里就不在说明了。 那么我们kafka 的producer配置如下: Properties props = new Properties(); props.put("bootstrap.servers","master:9092,slave1:9092,slave2:9092"); props.put("acks","all"); props.put("retries",0); props.put("batch.size",16384); props.put("key.serializer",StringSerializer.class.getName()); props.put("value.serializer",StringSerializer.class.getName()); KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props); kafka的配置添加之后,我们便开始生产数据,生产数据代码只需如下就行: producer.send(new ProducerRecord<String,String>(topic,key,value));
在写好生产者程序之后,那我们先来生产吧! 我这里发送的消息为: String messageStr="你好,这是第"+messageNo+"条数据"; 并且只发送1000条就退出,结果如下: 可以看到信息成功的打印了。 如果不想用程序进行验证程序是否发送成功,以及消息发送的准确性,可以在kafka服务器上使用命令查看。 Kafka Consumer kafka消费这块应该来说是重点,毕竟大部分的时候,我们主要使用的是将数据进行消费。 kafka消费的配置如下:
那么我们kafka 的consumer配置如下: Properties props = new Properties(); props.put("bootstrap.servers",slave2:9092"); props.put("group.id",GROUPID); props.put("enable.auto.commit","true"); props.put("auto.commit.interval.ms","1000"); props.put("session.timeout.ms","30000"); props.put("max.poll.records",1000); props.put("auto.offset.reset","earliest"); props.put("key.deserializer",StringDeserializer.class.getName()); props.put("value.deserializer",StringDeserializer.class.getName()); KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props); 由于我这是设置的自动提交,所以消费代码如下: 我们需要先订阅一个topic,也就是指定消费哪一个topic。 consumer.subscribe(Arrays.asList(topic)); 订阅之后,我们再从kafka中拉取数据: ConsumerRecords<String,String> msgList=consumer.poll(1000); 一般来说进行消费会使用监听,这里我们就用for(;;)来进行监听, 并且设置消费1000条就退出! 结果如下: 可以看到我们这里已经成功消费了生产的数据了。 代码 那么生产者和消费者的代码如下: 生产者: import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; /** * * Title: KafkaProducerTest * Description: * kafka 生产者demo * Version:1.0.0 * @author pancm * @date 2018年1月26日 */ public class KafkaProducerTest implements Runnable { private final KafkaProducer<String,String> producer; private final String topic; public KafkaProducerTest(String topicName) { Properties props = new Properties(); props.put("bootstrap.servers",StringSerializer.class.getName()); this.producer = new KafkaProducer<String,String>(props); this.topic = topicName; } @Override public void run() { int messageNo = 1; try { for(;;) { String messageStr="你好,这是第"+messageNo+"条数据"; producer.send(new ProducerRecord<String,"Message",messageStr)); //生产了100条就打印 if(messageNo%100==0){ System.out.println("发送的信息:" + messageStr); } //生产1000条就退出 if(messageNo%1000==0){ System.out.println("成功发送了"+messageNo+"条"); break; } messageNo++; } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } public static void main(String args[]) { KafkaProducerTest test = new KafkaProducerTest("KAFKA_TEST"); Thread thread = new Thread(test); thread.start(); } } 消费者: import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; /** * * Title: KafkaConsumerTest * Description: * kafka消费者 demo * Version:1.0.0 * @author pancm * @date 2018年1月26日 */ public class KafkaConsumerTest implements Runnable { private final KafkaConsumer<String,String> consumer; private ConsumerRecords<String,String> msgList; private final String topic; private static final String GROUPID = "groupA"; public KafkaConsumerTest(String topicName) { Properties props = new Properties(); props.put("bootstrap.servers","30000"); props.put("auto.offset.reset",StringDeserializer.class.getName()); this.consumer = new KafkaConsumer<String,String>(props); this.topic = topicName; this.consumer.subscribe(Arrays.asList(topic)); } @Override public void run() { int messageNo = 1; System.out.println("---------开始消费---------"); try { for (;;) { msgList = consumer.poll(1000); if(null!=msgList&&msgList.count()>0){ for (ConsumerRecord<String,String> record : msgList) { //消费100条就打印,但打印的数据不一定是这个规律的 if(messageNo%100==0){ System.out.println(messageNo+"=======receive: key = " + record.key() + ",value = " + record.value()+" offset==="+record.offset()); } //当消费了1000条就退出 if(messageNo%1000==0){ break; } messageNo++; } }else{ Thread.sleep(1000); } } } catch (InterruptedException e) { e.printStackTrace(); } finally { consumer.close(); } } public static void main(String args[]) { KafkaConsumerTest test1 = new KafkaConsumerTest("KAFKA_TEST"); Thread thread1 = new Thread(test1); thread1.start(); } } 注: master、slave1、slave2 是因为我在自己的环境做了关系映射,这个可以换成服务器的IP。 当然项目我放在Github上了,有兴趣的可以看看。 https://github.com/xuwujing/kafka (本地下载) 总结 简单的开发一个kafka的程序需要以下步骤:
kafka介绍参考官方文档:http://kafka.apache.org/intro 总结 以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对编程小技巧的支持。 您可能感兴趣的文章:
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |