加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

NSQ 源码阅读(二) NSQD

发布时间:2020-12-16 18:12:58 所属栏目:大数据 来源:网络整理
导读:未完成 前言 NSQD是 nsq 的主要逻辑部分,请参考官方文档。我们直接看代码。 入口函数 main 函数位于 github.com/nsqio/nsq/apps/nsqd/nsqd.go func main() { prg := program{} if err := svc.Run(prg,syscall.SIGINT,syscall.SIGTERM); err != nil { log.Fa

未完成

前言

NSQD是 nsq 的主要逻辑部分,请参考官方文档。我们直接看代码。

入口函数

main 函数位于

github.com/nsqio/nsq/apps/nsqd/nsqd.go

func main() {
    prg := &program{}
    if err := svc.Run(prg,syscall.SIGINT,syscall.SIGTERM); err != nil {
        log.Fatal(err)
    }
}

其中svc.Run 是一个service wrapper,来自于go-svc。它传入Service 接口,并且做了四件事情Init,Start,NotifySignal和 Stop:

// Run runs your Service.
//
// Run will block until one of the signals specified in sig is received.
// If sig is empty syscall.SIGINT and syscall.SIGTERM are used by default.
func Run(service Service,sig ...os.Signal) error {
    env := environment{}
    if err := service.Init(env); err != nil {
        return err
    }

    if err := service.Start(); err != nil {
        return err
    }

    if len(sig) == 0 {
        sig = []os.Signal{syscall.SIGINT,syscall.SIGTERM}
    }

    signalChan := make(chan os.Signal,1)
    signalNotify(signalChan,sig...)
    <-signalChan

    return service.Stop()
}

让我们来看下program 对于Service 这个接口的实现

func (p *program) Init(env svc.Environment) error {
    if env.IsWindowsService() {
        dir := filepath.Dir(os.Args[0])
        return os.Chdir(dir)
    }
    return nil
}
func (p *program) Start() error {
    //ztd: 初始化选项
    opts := nsqd.NewOptions()
    //ztd: 设置接受的参数
    flagSet := nsqdFlagSet(opts)
    flagSet.Parse(os.Args[1:])

    rand.Seed(time.Now().UTC().UnixNano())
    //ztd: 如果仅仅查询版本信息
    if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
        fmt.Println(version.String("nsqd"))
        os.Exit(0)
    }
    //ztd: 如果指定了conf 文件
    var cfg config
    configFile := flagSet.Lookup("config").Value.String()
    if configFile != "" {
        _,err := toml.DecodeFile(configFile,&cfg)
        if err != nil {
            log.Fatalf("ERROR: failed to load config file %s - %s",configFile,err.Error())
        }
    }
    cfg.Validate()
    //ztd: 合并命令行和配置文件中的配置
    options.Resolve(opts,flagSet,cfg)
    nsqd := nsqd.New(opts)
    //ztd: metadata 中存储了topic和 channel 信息
    err := nsqd.LoadMetadata()
    if err != nil {
        log.Fatalf("ERROR: %s",err.Error())
    }
    //ztd: 不是很明白为什么读完了马上又写
    err = nsqd.PersistMetadata()
    if err != nil {
        log.Fatalf("ERROR: failed to persist metadata - %s",err.Error())
    }
    nsqd.Main()

    p.nsqd = nsqd
    return nil
}

func (p *program) Stop() error {
    if p.nsqd != nil {
        p.nsqd.Exit()
    }
    return nil
}

nsqd.Main()

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读