加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

NSQ系列之nsqlookupd代码分析一(初探nsqlookup)

发布时间:2020-12-16 18:39:57 所属栏目:大数据 来源:网络整理
导读:NSQ系列之nsqlookupd代码分析一(初探nsqlookup) nsqlookupd 是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题( topic )的生产者,并且提供 nsqd 节点广播话题( topic )和通道( channel )信息。 nsqlookupd 有两个接口: TCP

NSQ系列之nsqlookupd代码分析一(初探nsqlookup)

nsqlookupd 是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且提供 nsqd 节点广播话题(topic)和通道(channel)信息。

nsqlookupd 有两个接口:TCP 接口,nsqd 用它来广播。HTTP 接口,客户端用它来发现和管理。

本系列的代码分析均是基于nsq v0.3.5的代码进行的分析,如有不对之处欢迎大家指正指导。

nsqlookup struct分析

代码文件路径为nsq/nsqlookupd/nsqlookupd.go

type NSQLookupd struct {
	sync.RWMutex   			//读写锁
	opts         *Options  //nsqlookupd 配置信息 定义文件路径为nsq/nsqlookupd/options.go
	tcpListener  net.Listener 
	httpListener net.Listener 
	waitGroup    util.WaitGroupWrapper //WaitGroup 典型应用 用于开启两个goroutine,一个监听HTTP 一个监听TCP
	DB           *RegistrationDB //product 注册数据库 具体分析后面章节再讲
}

//初始化NSQLookupd实例
func New(opts *Options) *NSQLookupd {
	n := &NSQLookupd{
		opts: opts,DB:   NewRegistrationDB(),//初始化DB实例
	}
	n.logf(version.String("nsqlookupd"))
	return n
}


func (l *NSQLookupd) Main() {
	ctx := &Context{l} //初始化Context实例将NSQLookupd指针放入Context实例中 Context结构请参考文件nsq/nsqlookupd/context.go Context用于nsqlookupd中的tcpServer 和 httpServer中

	tcpListener,err := net.Listen("tcp",l.opts.TCPAddress) //开启TCP监听
	if err != nil {
		l.logf("FATAL: listen (%s) failed - %s",l.opts.TCPAddress,err)
		os.Exit(1)
	}
	l.Lock()
	l.tcpListener = tcpListener
	l.Unlock()
	tcpServer := &tcpServer{ctx: ctx} //创建一个tcpServer tcpServer 实现了nsq/internal/protocol包中的TCPHandler接口
	l.waitGroup.Wrap(func() {
            //protocol.TCPServer方法的过程就是tcpListener accept tcp的连接
            //然后通过tcpServer中的Handle分析报文,然后处理相关的协议
		protocol.TCPServer(tcpListener,tcpServer,l.opts.Logger)
	}) //把tcpServer加入到waitGroup

	httpListener,l.opts.HTTPAddress) //开启HTTP监听
	if err != nil {
		l.logf("FATAL: listen (%s) failed - %s",l.opts.HTTPAddress,err)
		os.Exit(1)
	}
	l.Lock()
	l.httpListener = httpListener
	l.Unlock()
	httpServer := newHTTPServer(ctx) //创建一个httpServer
	l.waitGroup.Wrap(func() {
		http_api.Serve(httpListener,httpServer,"HTTP",l.opts.Logger)
	}) //把httpServer加入到waitGroup
}


//NSQLookupd退出
func (l *NSQLookupd) Exit() {
	if l.tcpListener != nil {
		l.tcpListener.Close() //关闭tcpListener
	}

	if l.httpListener != nil {
		l.httpListener.Close() //关闭httpListener
	}
	l.waitGroup.Wait()
}

这一章节的代码就先分析到这里了,下一章节要分析的是nsqlookup中的tcpServer.

第二章传送门 NSQ系列之nsqlookupd代码分析二(初识nsqlookupd tcpServer)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读