初识Apache Kafka+JAVA程序实例
本文是从英文的官网摘了翻译的,用作自己的整理和记录。水平有限,欢迎指正。版本是: kafka_2.10-0.10.0.0 1、基础概念
主题和日志 1个主题是命名或分类发布的消息。每个主题,Kafka持有1个分区日志,看起来像下面图片。 每个Partition都是有序的,固定长度的消息队列1直不断增加到–1个提交日志。消息在Partition内分配了顺序的id叫偏移量,这个偏移量在分区中唯1标识每一个消息的。 Kafka保存所有(1段时间内的-可配置)已发布的消息-不管它们是不是已被消费。例如,如果日志保存被设置为两天,那末在1个消息发布后,两天内它是可用的,两天后它将被抛弃到空闲空间 事实上,元数据保存在每一个消费进程中,是基于消费进程在日志中的位置,该位置称为“偏移量”(In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log,called the “offset”.)。这个偏移量被消费者控制:正常的消费者读取消息时,线性增加偏移量,但事实上消费者可以以任何它顺序的方式来控制。例如:1个消费者可以重置到之前的偏移量位置来重新处理。 Distribution(散布)日志的分辨别布在Kafka集群中的服务器上,每一个服务器处理数据,并要求分区内容的副本。为了容错,每一个分区的副本数量是可以通过服务器设置的。 Producers生产者将数据发布到他们所选择的主题。生产者负责选择那个消息分配到那个主题的哪一个partition。至于选择哪一个分区可以简单的循环方式到达负载均衡,也能够者根据语义功能来分区。 Consumers 每一个消费者把自己标示到1个消费组,当每一个消息发布到主题后,消息在投递到每一个定阅消费组1个消费实例。消费者实例可以在不同的进程或不同的机器上。 Kafka只在1个分区中的消息提供了1个总的顺序,而不是在1个主题中的不同分区之间的。但是,如果您需要1个完全有序的消息,这可以通过1个主题和1个分区来实现,明显这将意味着每个消费组只有1个消费进程。 Guarantees(保证)Kafka给出了以下保证:
2、程序实例重要的来了,上面看不懂的没关系,看程序,最直接。 其中GroupA有2个消费者,GroupB有4个消费者。 我们的生产者平均向4个分区写入了内容。例: package part;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class TestProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers","localhost:9092");
//The "all" setting we have specified will result in blocking on the full commit of the record,the slowest but most durable setting.
//“所有”设置将致使记录的完全提交阻塞,最慢的,但最持久的设置。
props.put("acks","all");
//如果要求失败,生产者也会自动重试,即便设置成0 the producer can automatically retry.
props.put("retries",0);
//The producer maintains buffers of unsent records for each partition.
props.put("batch.size",16384);
//默许立即发送,这里这是延时毫秒数
props.put("linger.ms",1);
//生产者缓冲大小,当缓冲区耗尽后,额外的发送调用将被阻塞。时间超过max.block.ms将抛出TimeoutException
props.put("buffer.memory",33554432);
//The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes.
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//创建kafka的生产者类
Producer<String,String> producer = new KafkaProducer<String,String>(props);
//生产者的主要方法
// close();//Close this producer.
// close(long timeout,TimeUnit timeUnit); //This method waits up to timeout for the producer to complete the sending of all incomplete requests.
// flush() ;所有缓存记录被立刻发送
for(int i = 0; i < 100; i++)
//这里平均写入4个分区
producer.send(new ProducerRecord<String,String>("foo",i%4,Integer.toString(i),Integer.toString(i)));
producer.close();
}
}
消费者 package part;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class TestConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers","localhost:9092");
System.out.println("this is the group part test 1");
//消费者的组id
props.put("group.id","GroupA");//这里是GroupA或GroupB
props.put("enable.auto.commit","true");
props.put("auto.commit.interval.ms","1000");
//从poll(拉)的回话处理时长
props.put("session.timeout.ms","30000");
//poll的数量限制
//props.put("max.poll.records","100");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
//定阅主题列表topic
consumer.subscribe(Arrays.asList("foo"));
while (true) {
ConsumerRecords<String,String> records = consumer.poll(100);
for (ConsumerRecord<String,String> record : records)
// 正常这里应当使用线程池处理,不应当在这里处理
System.out.printf("offset = %d,key = %s,value = %s",record.offset(),record.key(),record.value()+"n");
}
}
}
如果GroupA和GroupB都正常启动,那末GroupB内4各消费平均消费生产者的消息数据(这里每一个25个消息),GroupA内2个消费者各处理50各消息,每一个消费者处理2各分区。如果GroupA内1个消费者挂断,那末另外一个处理所有消息数据。如果GroupB挂掉1个,那末将有1个消费者出来处理挂掉没有处理的消息数据。 bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic foo --partitions 4 3、multi-broker cluster这里其实和Zookeeper机制由点类似,也是建立了1个leader和几个follower。主要的作用还是为了可扩大性和容错性。当集中任意1台出问题,都可以保证系统的正确和稳定。即便是leader出现问题,它们也能够通过投票的方式产生新leader. 这里只是简单说明1下。 在它的官方例子中通过复制原本的配置文件,在本地建立了伪集群服务。 > cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2 其中 broker.id 属性是集群中唯1的和永久的节点名字,正常应当是1台机子1个服务。其它两个是由于伪集群的缘由必须修改。 > bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties & 4、典型利用场景
英文原地址:http://kafka.apache.org/documentation.html#quickstart (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |