golang驱动kafka
kafka简介kafka是一种高吞吐量的分布式发布订阅消息系统, 特点
kafka组成部分
kafka集群包含一个或多个服务器,这种服务器被称为broker
每条发布到kafka集群的消息都有一个类别,这个类别被称为Topic.(物理上不同Topic的消息分开存储. 逻辑上Topic的消息虽然保存于一个或多个broker上,但是用户只需要指定消息的Topic,即可生产或消费数据,而不同关心数据存于何处)
Partition是物理上的概念,每个Topic包含一个或多个Partition
负责发布消息到Kafka broker
消息消费者,向kafka broker读取消息的客户端
每个Consumer属于一个特定的Consumer Group(可以每个consumer指定group name,若不指定group name,则属于默认的group) golang驱动kafkago get github.com/Shopify/sarama
package main
import (
"fmt"
"log"
"os"
"strings"
"sync"
"github.com/Shopify/sarama"
)
var (
// kafka 服务器地址,以及端口号,这里可以指定多个地址,使用逗号分隔开即可.
kafka = "172.168.173.55:9092"
wg sync.WaitGroup
logger = log.New(os.Stderr,"[srama]",log.LstdFlags)
)
func main() {
sarama.Logger = logger
// 连接kafka消息服务器
consumer,err := sarama.NewConsumer(strings.Split(kafka,","),nil)
if err != nil {
logger.Println("Failed to start consumer: %s",err)
}
// consumer.Partitions 用户获取Topic上所有的Partitions. 消息服务器上已经创建了test这个topic,所以,在这里指定参数为test.
partitionList,err := consumer.Partitions("test")
if err != nil {
logger.Println("Failed to get the list of partitions: ",err)
}
for partition := range partitionList {
pc,err := consumer.ConsumePartition("test",int32(partition),sarama.OffsetNewest)
if err != nil {
logger.Printf("Failed to start consumer for partition %d: %sn",partition,err)
}
defer pc.AsyncClose()
wg.Add(1)
go func(sarama.PartitionConsumer) {
defer wg.Done()
for msg := range pc.Messages() {
fmt.Println("message is :",msg)
fmt.Printf("Partition:%d,Offset:%d,Key:%s,Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
fmt.Println()
}
}(pc)
}
wg.Wait()
logger.Println("Done consuming topic hello")
consumer.Close()
}
总结kafka是一个非常优秀的消息订阅发布系统. 在大型项目中,通过消息系统,有效的解耦各个系统,使各个系统信息方便的互联. 参考资料
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |