Kafka简单客户端编程实例
今天,我们给大家带来一篇如何利用Kafka的API进行客户端编程的文章,这篇文章很简单,就是利用Kafka的API创建一个生产者和消费者,生产者不断向Kafka写入消息,消费者则不断消费Kafka的消息。下面是具体的实例代码。 一、创建配置类Config 这个类很简单,只是存放了两个常量,一个是话题TOPIC,一个是线程数THREADS package com.lya.kafka; /** * 配置项 * @author liuyazhuang * */ public class Config { /** * 话题 */ public static final String TOPIC = "wordcount"; /** * 线程数 */ public static final Integer THREADS = 1; } 二、编程生产者类ProducerDemo 这个类的主要作用就是向Kafka写入相应的消息,并且将消息写入wordcount话题。 package com.lya.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * 生产者实例 * @author liuyazhuang * */ public class ProducerDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("zk.connect","192.168.209.121:2181"); props.put("metadata.broker.list","192.168.209.121:9092"); props.put("serializer.class","kafka.serializer.StringEncoder"); props.put("zk.connectiontimeout.ms","15000"); ProducerConfig config = new ProducerConfig(props); Producer<String,String> producer = new Producer<String,String>(config); // 发送业务消息 // 读取文件 读取内存数据库 读socket端口 for (int i = 1; i <= 100; i++) { Thread.sleep(500); producer.send(new KeyedMessage<String,String>(Config.TOPIC,"this number ===>>> " + i)); } } } 三、编写消息者类ConsumerDemo 这个类的主要作用就是消费Kafka中wordcount话题的消息。 package com.lya.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; /** * 消费者实例 * @author liuyazhuang * */ public class ConsumerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("zookeeper.connect","192.168.209.121:2181"); props.put("group.id","1111"); props.put("auto.offset.reset","smallest"); props.put("zk.connectiontimeout.ms","15000"); ConsumerConfig config = new ConsumerConfig(props); ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config); Map<String,Integer> topicCountMap = new HashMap<String,Integer>(); topicCountMap.put(Config.TOPIC,Config.THREADS); Map<String,List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[],byte[]>> streams = consumerMap.get(Config.TOPIC); for(final KafkaStream<byte[],byte[]> kafkaStream : streams){ new Thread(new Runnable() { @Override public void run() { for(MessageAndMetadata<byte[],byte[]> mm : kafkaStream){ String msg = new String(mm.message()); System.out.println(msg); } } }).start(); } } } 四、运行实例 首先,运行消费者类ConsumerDemo 没有打印任何信息。 打印出了生产者生产的消息。 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程小技巧。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |