NSQ的golang客户端简单使用
NSQ的golang客户端简单使用NSQ 是由国外的一个短链服务商bitly使用golang开发的一个消息队列系统,正好使用到了这个东西,在这里简单的记录下。 获取客户端nsq的golang客户端是官方版本的 go get github.com/nsqio/go-nsq 即可 简单的消费者和生产者使用该客户端有原始的command函数用于一些基础操作,也有consumer和producer的封装,我这里是直接使用了封装了。
消费者比较简单,只要监听队列消息,并处理就可以了,下面是一个简单的例子。 type NSQHandler struct { } func (this *NSQHandler) HandleMessage(message *nsq.Message) error { log.Println("recv:",string(message.Body)) return nil } func testNSQ() { waiter := sync.WaitGroup{} waiter.Add(1) go func() { defer waiter.Done() consumer,err := nsq.NewConsumer("test","ch1",nsq.NewConfig()) if nil != err { log.Println(err) return } consumer.AddHandler(&NSQHandler{}) err = consumer.ConnectToNSQD("10.100.156.207:4150") if nil != err { log.Println(err) return } select {} }() waiter.Wait() } 创建好consumer后,只需要自己创建一个struct并实现HandleMessage方法即可,当有消息时候,再去处理消息。 需要注意的是,AddHandler的回调是在别的routine中执行的,并且可以添加多个handler用于处理消息,这里可能需要注意下线程的同步问题。
生产者也和消费者差不多,首先需要创建一个producer func (this *MsgQueue) Init(addr string) error { var err error this.addr = addr // try to connect cfg := nsq.NewConfig() this.producer,err = nsq.NewProducer(addr,cfg) if nil != err { return err } // try to ping err = this.producer.Ping() if nil != err { this.producer.Stop() this.producer = nil return err } return nil } producer封装了较多的方法,分为同步和异步两种。带Async后缀的,都是异步的。 同步是收到了nsq的回应后再返回的函数,所以可能会堵塞,而异步的操作,则调用方需要传入一个chan用于接收结果,当有结果返回或者是超时的情况下,相应的内容会写到该chan中。 在这里我用了同步的api,毕竟消息队列假如出了什么问题,那么整个服务就不可用了,而且同步改异步也不会太麻烦,以后可以做下修改。 publish的方法也很简单,提供一个topic和数据就行了。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |