加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

简单的订阅发布机制实现(Golang)

发布时间:2020-12-16 18:44:30 所属栏目:大数据 来源:网络整理
导读:Redis和NSQ都有完善的订阅和发布实现,但参考它们的源码实现,做个实际例子,对两边的异同和这种机制会更有印象。 练习省掉复杂的,就实现简单的 订阅/取消订阅/发布信息 功能,足够了。 Server.go Server结构中的Dict用map保存了Channel的相关信息,而Chann

Redis和NSQ都有完善的订阅和发布实现,但参考它们的源码实现,做个实际例子,对两边的异同和这种机制会更有印象。

练习省掉复杂的,就实现简单的 订阅/取消订阅/发布信息 功能,足够了。


Server.go

Server结构中的Dict用map保存了Channel的相关信息,而Channel结构中则用一个map保存了订阅这个Channel的Client.

这个例子与Redis中不太一样,Redis中的Dict用Channel的名字作为map的key,value则是其对应的Client列表。而Client中则保

存了其所有订阅的Channel信息。

package pubsub

import (
	"errors"
	"sync"
)

type Client struct {
	Id int
	Ip string
}

type Server struct {
	Dict map[string]*Channel //map[Channel.Name]*Channel
	sync.RWMutex
}

func NewServer() *Server {
	s := &Server{}
	s.Dict = make(map[string]*Channel) //所有channel
	return s
}

//订阅
func (srv *Server) Subscribe(client *Client,channelName string) {

	// 客户是否在Channel的客户列表中
	srv.RLock()
	ch,found := srv.Dict[channelName]
	srv.RUnlock()

	if !found {
		ch = NewChannel(channelName)
		ch.AddClient(client)
		srv.Lock()
		srv.Dict[channelName] = ch
		srv.Unlock()
	} else {
		ch.AddClient(client)
	}

}

//取消订阅
func (srv *Server) Unsubscribe(client *Client,channelName string) {
	srv.RLock()
	ch,found := srv.Dict[channelName]
	srv.RUnlock()
	if found {
		if ch.DeleteClient(client) == 0 {
			ch.Exit()
			srv.Lock()
			delete(srv.Dict,channelName)
			srv.Unlock()
		}
	}
}

//发布消息
func (srv *Server) PublishMessage(channelName,message string) (bool,error) {
	srv.RLock()
	ch,found := srv.Dict[channelName]
	if !found {
		srv.RUnlock()
		return false,errors.New("channelName不存在!")
	}
	srv.RUnlock()

	ch.Notify(message)
	ch.Wait()
	return true,nil
}

Channel.go

每个Channel 负责将信息放入WaitGroup,发送到Client或队列,例子中是打印一条信息。 当clients为空时,则exit().

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Channel struct {
	Name    string
	clients map[int]*Client
	//  exitChan   chan int
	sync.RWMutex
	waitGroup    WaitGroupWrapper
	messageCount uint64
	exitFlag     int32
}

func NewChannel(channelName string) *Channel {
	return &Channel{
		Name: channelName,//  exitChan:       make(chan int),clients: make(map[int]*Client),}
}

func (ch *Channel) AddClient(client *Client) bool {
	ch.RLock()
	_,found := ch.clients[client.Id]
	ch.RUnlock()

	ch.Lock()
	if !found {
		ch.clients[client.Id] = client
	}
	ch.Unlock()
	return found
}

func (ch *Channel) DeleteClient(client *Client) int {
	var ret int
	ch.ReplyMsg(
		fmt.Sprintf("从channel:%s 中删除client:%d ",ch.Name,client.Id))

	ch.Lock()
	delete(ch.clients,client.Id)
	ch.Unlock()

	ch.RLock()
	ret = len(ch.clients)
	ch.RUnlock()

	return ret
}

func (ch *Channel) Notify(message string) bool {

	ch.RLock()
	defer ch.RUnlock()

	for cid,_ := range ch.clients {
		ch.ReplyMsg(
			fmt.Sprintf("channel:%s client:%d message:%s",cid,message))
	}
	return true
}

func (ch *Channel) ReplyMsg(message string) {
	ch.waitGroup.Wrap(func() { fmt.Println(message) })
}

func (ch *Channel) Wait() {
	ch.waitGroup.Wait()
}

func (ch *Channel) Exiting() bool {
	return atomic.LoadInt32(&ch.exitFlag) == 1
}

func (ch *Channel) Exit() {
	if !atomic.CompareAndSwapInt32(&ch.exitFlag,1) {
		return
	}
	//close(ch.exitChan)
	ch.Wait()
}

func (ch *Channel) PutMessage(clientID int,message string) {
	ch.RLock()
	defer ch.RUnlock()

	if ch.Exiting() {
		return
	}

	//select {
	// case <-t.exitChan:
	// return
	//}
	fmt.Println(ch.Name,":",message)

	atomic.AddUint64(&ch.messageCount,1)
	return
}


主程序:

//订阅/发布 练习
//author: Xiong Chuan Liang  
//date: 2015-3-17

package main

import (
  . "pubsub"
)

func main(){
  c1 := &Client{Id:100,Ip:"172.18.1.1"}
  c3:=  &Client{Id:300,Ip:"172.18.1.3"}

   srv := NewServer()
   srv.Subscribe(c1,"Topic")
   srv.Subscribe(c3,"Topic")

   srv.PublishMessage("Topic","测试信息1")

   srv.Unsubscribe(c3,"Topic")
   srv.PublishMessage("Topic","测试信息2222")

    srv.Subscribe(c1,"Topic2")
    srv.Subscribe(c3,"Topic2")
    srv.PublishMessage("Topic2"," Topic2的测试信息")   
}

/*
运行结果:
channel:Topic client:100 message:测试信息1
channel:Topic client:300 message:测试信息1
从channel:Topic 中删除client:300
channel:Topic client:100 message:测试信息2222
channel:Topic2 client:100 message: Topic2的测试信息
channel:Topic2 client:300 message: Topic2的测试信息

*/

没做太复杂的测试,粗略看好像没有问题。


MAIL: xcl_168@aliyun.com

BLOG: http://blog.csdn.net/xcl168

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读