nsq源码学习
nsq源码学习简介nsq 是用go语言实现的分布式队列。阅读源码对go语言的chanel,分布式有着更好的理解 代码结构核心代码分位3部分:
nsqd官方的介绍为
大意为:nsqd是接收,分发队列信息的守护进程。一般集群化运行,也可以独自部署。 下面对nsqd的2个逻辑做一次学习
启动逻辑在Makefile中,写到 $(BLDDIR)/nsqd: $(wildcard apps/nsqd/*.go nsqd/*.go nsq/*.go internal/*/*.go) 可以找到nsqd的代码入口在
这个文件作为程序入口,主要做了几件事情:
首先作者使用svc包来控制程序的启动: type program struct { nsqd *nsqd.NSQD } func main() { prg := &program{} if err := svc.Run(prg,syscall.SIGINT,syscall.SIGTERM); err != nil { log.Fatal(err) } } func (p *program) Init(env svc.Environment) error {...} func (p *program) Start() error {...} func (p *program) Stop() error {...} 使用svc 能更简洁的保证程序干净的退出。在nsqd中,退出信号有两个:SIGINT(输入任意健) 和 SIGTERM(kill)。 Start()函数是主要逻辑的入口,在函数中引用了NewOptions(),它会创建一个默认的Options 结构。Options 后续会作为nsqd启动的参数来源 opts := nsqd.NewOptions() 作者通过flag包实现了命令行参数接收,如果命令行中执行配置文件,会同时读取配置文件。根据配置文件,命令行参数,来创建一个nsqd结构 options.Resolve(opts,flagSet,cfg) nsqd := nsqd.New(opts) 接下来会加载数据 err := nsqd.LoadMetadata() err = nsqd.PersistMetadata() LoadMetadata()过程为:
PersistMetadata()过程为:
接下来调用启动nsqd的主逻辑nsqd.Main(),主要完成以下过程
n.waitGroup.Wrap(func() { http_api.Serve(n.httpListener,httpServer,"HTTP",n.logf) }) n.waitGroup.Wrap(func() { n.queueScanLoop() }) n.waitGroup.Wrap(func() { n.lookupLoop() }) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(func() { n.statsdLoop() }) } 这里使用到了waitGroup,它是一个groutines 的控制包,能上线类似python 的join()功能。可以实现所有groutines都执行完再退出。 作者封装了waitGroup库 type WaitGroupWrapper struct { sync.WaitGroup } func (w *WaitGroupWrapper) Wrap(cb func()) { w.Add(1) go func() { cb() w.Done() }() } Add() 会计数器加1,Done()使得计数器减一。此外WaitGroup提供Wait()函数:当计数器归0时,继续执行,否则阻塞。等待线程执行完再退出的作用。 此外,将函数作为参数,再在内部groutines执行,和python的装饰器的用法类似。 回到Main()函数中,启动http_api利用到了 queueScanLoop() 是管道扫进程,他的逻辑是将tpic,channel中的数据读入到worker channel,并每隔一定的时间更新worker数量,扫描chanel中的数据。 select { case <-workTicker.C: if len(channels) == 0 { continue } case <-refreshTicker.C: channels = n.channels() n.resizePool(len(channels),workCh,responseCh,closeCh) continue case <-n.exitChan: goto exit } 这里使用select来监听io操作,每隔扫描间隔时,判断channel中的是否存在数据需要处理,如果没有,则略过本次扫描。 每隔刷新间隔判断worker数量是否发生变化。 loop: numDirty := 0 for i := 0; i < num; i++ { if <-responseCh { numDirty++ } } if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent { goto loop } 这里还有dirty比率的概念,channel中有数据就认为是dirty,当该比率超过配置中的值时,则继续处理调用worker来处理,而不是等待固定间隔才进行扫描。 启动lookupLoop()和statsdLoop();这两个函数的作用初步看和nsqdlookup通信用,细节还未了解。 上面阐述了nsqd的启动逻辑。nsqd使用http api和用户交互 数据存储在api文档中,看到pub接口用来发布信息: 使用示例 在nsqd/http.go中,定义了路由规则 func newHTTPServer(ctx *context,tlsEnabled bool,tlsRequired bool) *httpServer { ... s := &httpServer{ ctx: ctx,tlsEnabled: tlsEnabled,tlsRequired: tlsRequired,router: router,} router.Handle("POST","/pub",http_api.Decorate(s.doPUB,http_api.V1)) ... } 在doPUB()函数中,可以看到数据存储时,最终调用了opic.PutMessage(msg) err = topic.PutMessage(msg) func (t *Topic) PutMessage(m *Message) error { t.RLock() defer t.RUnlock() if atomic.LoadInt32(&t.exitFlag) == 1 { return errors.New("exiting") } err := t.put(m) if err != nil { return err } atomic.AddUint64(&t.messageCount,1) return nil } PutMessage的逻辑是做并发控制(加锁)后,调Topic.put(*Message) 来写入信息。 这里有两个锁控制机制:
go语言中,sync包有两种锁,分别是互斥锁sync.Mutex和读写锁sync.RWMutex。 type Mutex func (m *Mutex) Lock() func (m *Mutex) Unlock() type RWMutex func (rw *RWMutex) Lock() func (rw *RWMutex) RLock() func (rw *RWMutex) RLocker() Locker func (rw *RWMutex) RUnlock() func (rw *RWMutex) Unlock() 互斥锁倾向于在全局使用,一旦加锁,就必须解锁之后才能访问。不二次加锁、二次解锁都会报错。 读写锁用在读远多于写的场景。 Lock()表示写加锁,加写锁前,如果已经存在写锁,或者其他读锁,会阻塞住,直到锁可用。已阻塞的 Lock 调用会从获得的锁中排除新的读取器,即写锁权限高于读锁,有写锁时优先进行写锁定。 RLock()表示读加锁,当有写锁时,无法加载读锁,当只有读锁或者没有锁时,可以加载读锁,读锁可以加载多个,所以适用于"读多写少"的场景。 关于读写锁的具体例子请参考golang中sync.RWMutex和sync.Mutex区别
atomic是sync包中的另一种锁机制,在实现上,它比互斥锁层级更低:互斥锁调用的是golang的api,而atomic是在内核层面实现。因此它比互斥锁效率更高,但是使用上也存在一定的限制。如果使用存储相关接口,存入的是nil,或者类型不对,会报错。 此外,在一些文章中,以及stack overflow中都提到尽量少用atomic,具体原因还不知道。 atomic有几种常见的函数:
具体查看atomic介绍 在上面的PutMessage逻辑中,增加topic读锁和topic中的部分值的原子操作锁后,调用了put()函数来实现写入。 func (t *Topic) put(m *Message) error { select { case t.memoryMsgChan <- m: default: b := bufferPoolGet() err := writeMessageToBackend(b,m,t.backend) bufferPoolPut(b) t.ctx.nsqd.SetHealth(err) if err != nil { t.ctx.nsqd.logf(LOG_ERROR,"TOPIC(%s) ERROR: failed to write message to backend - %s",t.name,err) return err } } return nil } put函数的操作是,将Message写入channel,如果该topic的memoryMsgChan长度满了,则通过default逻辑,写入buffer中. buffer的实现是利用了sync.Pool包,相当于是一块缓存,在gc前释放,存储的长度受限于内存大小。 这里有两个问题:
经过查找,发现处理上述两个channel的函数是messagePump,而messagePump在创建一个新Topic时会被后台调用: func NewTopic(topicName string,ctx *context,deleteCallback func(*Topic)) *Topic { ... t.waitGroup.Wrap(func() {t.messagePump()}) ... } func (t *Topic) messagePump() { ... if len(chans) > 0 { memoryMsgChan = t.memoryMsgChan backendChan = t.backend.ReadChan() } select { case msg = <-memoryMsgChan: case buf = <-backendChan: msg,err = decodeMessage(buf) if err != nil { t.ctx.nsqd.logf(LOG_ERROR,"failed to decode message - %s",err) continue } ... } ... for i,channel := range chans { chanMsg := msg if i > 0 { chanMsg = NewMessage(msg.ID,msg.Body) chanMsg.Timestamp = msg.Timestamp chanMsg.deferred = msg.deferred } ... err := channel.PutMessage(chanMsg) ... } ... } 上述调用了channel的PutMessage()完成了message写入channel的memoryMsgChan中,写入逻辑和写入topic逻辑类似。到这里完成了数据的写入流程分析。 nsqlookup官方的介绍如下
大意为:nsqlookup是管理nsqd集群拓补信息的守护进程。nsqlookup用于
下面梳理一下nsqllookup的两个逻辑:
查询topic和channel根据查询数据的过程进行梳理,nsq提供了几个封装好的查询接口,如果nsq_tail、nsq_to_file 等。此处从nsq_til 举例查看。 nsq_tail中主要逻辑如下: consumers := []*nsq.Consumer{} for i := 0; i < len(topics); i += 1 { fmt.Printf("Adding consumer for topic: %sn",topics[i]) consumer,err := nsq.NewConsumer(topics[i],*channel,cfg) if err != nil { log.Fatal(err) } consumer.AddHandler(&TailHandler{topicName: topics[i],totalMessages: *totalMessages}) err = consumer.ConnectToNSQDs(nsqdTCPAddrs) if err != nil { log.Fatal(err) } err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs) if err != nil { log.Fatal(err) } consumers = append(consumers,consumer) } nsq_tail的逻辑是针对每个topic,分别初始化一个消费者consumer, 此处consumer实现的库是go-nsq。 之后从nsqd和nsqdlookup中获取数据,并调用handler处理。 在go-nsq/consumer.go中,ConnectToNSQLookupd()会调用queryLookupd()和lookupdLoop(),而lookupdLoop()又会定期调用queryLookupd()。代码如下: func (r *Consumer) ConnectToNSQLookupd(addr string) error { ... if numLookupd == 1 { r.queryLookupd() r.wg.Add(1) go r.lookupdLoop() } ... } func (r *Consumer) lookupdLoop() { ... for { select { case <-ticker.C: r.queryLookupd() case <-r.lookupdRecheckChan: r.queryLookupd() case <-r.exitChan: goto exit } } ... } // make an HTTP req to one of the configured nsqlookupd instances to discover // which nsqd's provide the topic we are consuming. // // initiate a connection to any new producers that are identified. func (r *Consumer) queryLookupd() { ... var nsqdAddrs []string for _,producer := range data.Producers { broadcastAddress := producer.BroadcastAddress port := producer.TCPPort joined := net.JoinHostPort(broadcastAddress,strconv.Itoa(port)) nsqdAddrs = append(nsqdAddrs,joined) } // apply filter if discoveryFilter,ok := r.behaviorDelegate.(DiscoveryFilter); ok { nsqdAddrs = discoveryFilter.Filter(nsqdAddrs) } for _,addr := range nsqdAddrs { err = r.ConnectToNSQD(addr) if err != nil && err != ErrAlreadyConnected { r.log(LogLevelError,"(%s) error connecting to nsqd - %s",addr,err) continue } } } 在queryLookupd()中,获取到生产者信息后,调用ConnectToNSQD()连接每个nsqd server。用ConnectToNSQD()实现了读取message。 ConnectYpNSQD()调用了connection结果的函数readLoop()。 func (c *Conn) readLoop() { for { ... frameType,data,err := ReadUnpackedResponse(c) ... switch frameType { case FrameTypeResponse: c.delegate.OnResponse(c,data) case FrameTypeMessage: msg,err := DecodeMessage(data) if err != nil { c.log(LogLevelError,"IO error - %s",err) c.delegate.OnIOError(c,err) goto exit } msg.Delegate = delegate msg.NSQDAddress = c.String() atomic.AddInt64(&c.rdyCount,-1) atomic.AddInt64(&c.messagesInFlight,1) atomic.StoreInt64(&c.lastMsgTimestamp,time.Now().UnixNano()) c.delegate.OnMessage(c,msg) ... } } 在c.delegate.OnMessage(c,msg)中,会将message写入Consumer.incomingMessages。完成数据读取。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |