加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

golang驱动kafka

发布时间:2020-12-16 18:23:56 所属栏目:大数据 来源:网络整理
导读:kafka简介 kafka是一种高吞吐量的分布式发布订阅消息系统, 特点 通过O(1)的磁盘数据结构提供消息的持久,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能. 高吞吐量,即使是非常普通的硬件,kafka也可以支持每秒数百万的消息. 支持通过kafka服务器

kafka简介

kafka是一种高吞吐量的分布式发布订阅消息系统,

特点

  • 通过O(1)的磁盘数据结构提供消息的持久,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能.
  • 高吞吐量,即使是非常普通的硬件,kafka也可以支持每秒数百万的消息.
  • 支持通过kafka服务器和消费机集群来分区消息
  • 支持hadoop并行数据加载.

kafka组成部分

  • Broker

kafka集群包含一个或多个服务器,这种服务器被称为broker

  • Topic

每条发布到kafka集群的消息都有一个类别,这个类别被称为Topic.(物理上不同Topic的消息分开存储. 逻辑上Topic的消息虽然保存于一个或多个broker上,但是用户只需要指定消息的Topic,即可生产或消费数据,而不同关心数据存于何处)

  • Partition

Partition是物理上的概念,每个Topic包含一个或多个Partition

  • Producer

负责发布消息到Kafka broker

  • Consumer

消息消费者,向kafka broker读取消息的客户端

  • Consumer Group

每个Consumer属于一个特定的Consumer Group(可以每个consumer指定group name,若不指定group name,则属于默认的group)

golang驱动kafka

go get github.com/Shopify/sarama
  • 这个是github.com上开源的一个kafka驱动包.在下载这个包的过程中,会附带的下载几个依赖包.

  • 示例代码:

package main

import (
    "fmt"
    "log"
    "os"
    "strings"
    "sync"

    "github.com/Shopify/sarama"
)

var (
    // kafka 服务器地址,以及端口号,这里可以指定多个地址,使用逗号分隔开即可.
    kafka = "172.168.173.55:9092"
    wg     sync.WaitGroup
    logger = log.New(os.Stderr,"[srama]",log.LstdFlags)
)

func main() {

    sarama.Logger = logger
    // 连接kafka消息服务器
    consumer,err := sarama.NewConsumer(strings.Split(kafka,","),nil)
    if err != nil {
        logger.Println("Failed to start consumer: %s",err)
    }

    // consumer.Partitions 用户获取Topic上所有的Partitions. 消息服务器上已经创建了test这个topic,所以,在这里指定参数为test.
    partitionList,err := consumer.Partitions("test")
    if err != nil {
        logger.Println("Failed to get the list of partitions: ",err)
    }

    for partition := range partitionList {
        pc,err := consumer.ConsumePartition("test",int32(partition),sarama.OffsetNewest)
        if err != nil {
            logger.Printf("Failed to start consumer for partition %d: %sn",partition,err)
        }
        defer pc.AsyncClose()

        wg.Add(1)

        go func(sarama.PartitionConsumer) {
            defer wg.Done()
            for msg := range pc.Messages() {
                fmt.Println("message is :",msg)
                fmt.Printf("Partition:%d,Offset:%d,Key:%s,Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
                fmt.Println()
            }
        }(pc)
    }

    wg.Wait()

    logger.Println("Done consuming topic hello")
    consumer.Close()
}

总结

kafka是一个非常优秀的消息订阅发布系统. 在大型项目中,通过消息系统,有效的解耦各个系统,使各个系统信息方便的互联.

参考资料

  1. 百度百科
  2. 其他网络上的文章

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读