加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

Golang1.7闲来无事写了一个基于Gob的tcp通讯用的包

发布时间:2020-12-16 18:27:43 所属栏目:大数据 来源:网络整理
导读:package gobconn import ( "encoding/gob" "errors" "net" "reflect" "sync" "unsafe" ) type message struct { Type string value reflect.Value} func (self *message) Recovery() { putPointer(self.value) putMsg(self)} func (self *message) Interface
package gobconn

import (
    "encoding/gob"
    "errors"
    "net"
    "reflect"
    "sync"
    "unsafe"
)

type message struct {
    Type  string
    value reflect.Value
}

func (self *message) Recovery() {
    putPointer(self.value)
    putMsg(self)
}

func (self *message) Interface() interface{} {
    return self.value.Interface()
}

/* 声明一个消息池用来重用对象 */

var msgPool sync.Pool

func getMsg() *message {
    if msg,ok := msgPool.Get().(*message); ok {
        return msg
    }
    return new(message)
}

func putMsg(msg *message) {
    msgPool.Put(msg)
}

type gobConnection struct {
    rwc   net.Conn
    enc   *gob.Encoder
    dec   *gob.Decoder
    rlock sync.Mutex
    wlock sync.Mutex
}

type GobConnection interface {
    Read() (msg *message,err error)
    Write(msg interface{}) (err error)
    Close() error
    LocalAddr() net.Addr
    RemoteAddr() net.Addr
}

var gobPool sync.Pool

func NewGobConnection(conn net.Conn) GobConnection {
    if gcn,ok := gobPool.Get().(*gobConnection); ok {
        gcn.rwc = conn
        gcn.enc = gob.NewEncoder(conn)
        gcn.dec = gob.NewDecoder(conn)
        return gcn
    }
    return &gobConnection{rwc: conn,enc: gob.NewEncoder(conn),dec: gob.NewDecoder(conn)}
}

type msgStruct struct {
    StructName string
}

var (
    rheadMsg = msgStruct{}
    wheadMsg = msgStruct{}
)

func (self *gobConnection) Read() (msg *message,err error) {
    self.rlock.Lock()
    defer self.rlock.Unlock()

    err = self.dec.Decode(&rheadMsg)
    if err != nil {
        return
    }
    var typ reflect.Type
    typ,err = GetMsgType(rheadMsg.StructName)
    if err != nil {
        return
    }
    msg = getMsg()
    msg.Type = rheadMsg.StructName
    var value = getPointer(typ)
    err = self.dec.DecodeValue(value)
    if err != nil {
        msg.Recovery()
        return
    }
    msg.value = value
    return
}

func (self *gobConnection) Write(msg interface{}) (err error) {
    self.wlock.Lock()
    value := reflect.ValueOf(msg)
    if value.Kind() == reflect.Interface || value.Kind() == reflect.Ptr {
        wheadMsg.StructName = value.Elem().Type().String()
    } else {
        wheadMsg.StructName = value.Type().String()
    }
    err = self.enc.Encode(wheadMsg)
    if err != nil {
        self.wlock.Unlock()
        return
    }
    err = self.enc.EncodeValue(value)
    self.wlock.Unlock()
    return
}

func (self *gobConnection) Close() error {
    self.enc = nil
    self.dec = nil
    err := self.rwc.Close()
    gobPool.Put(self)
    return err
}

func (self *gobConnection) LocalAddr() net.Addr {
    return self.rwc.LocalAddr()
}

func (self *gobConnection) RemoteAddr() net.Addr {
    return self.rwc.RemoteAddr()
}

/* 通过指定类型申请一个定长的内存. */

var (
    lock   sync.Mutex
    ptrMap = make(map[string]*sync.Pool)
)

func getPointer(typ reflect.Type) reflect.Value {
    p,ok := ptrMap[typ.String()]
    if ok {
        if value,ok := p.Get().(reflect.Value); ok {
            return value
        }
        return reflect.New(typ)
    }
    lock.Lock()
    ptrMap[typ.String()] = new(sync.Pool)
    lock.Unlock()
    return reflect.New(typ)
}

func putPointer(value reflect.Value) {
    elem := value.Elem().Type()
    p,ok := ptrMap[elem.String()]
    if !ok {
        lock.Lock()
        p = new(sync.Pool)
        ptrMap[elem.String()] = p
        lock.Unlock()
    }
    ClearData(elem.Size(),unsafe.Pointer(value.Pointer()))
    p.Put(value)
}

/* 使用此包进行数据发送之前必须将类型注册.否则接收放无法解包 */

var (
    typeMap   = make(map[string]reflect.Type)
    Errortype = errors.New("type not register")
)

func GetMsgType(name string) (reflect.Type,error) {
    typ,ok := typeMap[name]
    if ok {
        return typ,nil
    }
    return nil,Errortype
}

func GetMsgAllType() []string {
    list := make([]string, 0,len(typeMap))
    for name,_ := range typeMap {
        list = append(list,name)
    }
    return list
}

func RegisterType(typ reflect.Type) {
    typeMap[typ.String()] = typ
}

func DeleteType(name string) {
    delete(typeMap,name)
}

/* 清除固定长度的内存数据,使用方法是:指定内存开始地址和长度. 请勿随便使用.使用不当可能会清除有效数据 */

func ClearData(size uintptr,ptr unsafe.Pointer) {
    var temptr uintptr = uintptr(ptr)
    var step uintptr = 1
    for {
        if size <= 0 {
            break
        }
        switch {
        case 1 <= size && size < 8:
            step = 1
        case 8 <= size && size < 32:
            step = 8
        case 32 <= size && size < 64:
            step = 32
        case size >= 64:
            step = 64
        }
        clearData(step,unsafe.Pointer(temptr))
        temptr += step
        size -= step
    }
}

func clearData(size uintptr,ptr unsafe.Pointer) {
    switch size {
    case 1:
        *(*[1]byte)(ptr) = [1]byte{}
    case 8:
        *(*[8]byte)(ptr) = [8]byte{}
    case 32:
        *(*[32]byte)(ptr) = [32]byte{}
    case 64:
        *(*[64]byte)(ptr) = [64]byte{}
    }
}

下面是使用小例子:

package main

import (
    "fmt"
    "gobconn"
    "net"
    "reflect"
    "time"
)

type Info struct {
    Name string
    Age  int
    Job  string
    Hob  []string
}

type Test struct {
    Date    int
    Login   string
    Path    string
    Servers float64
    List    []string
    Dir     string
    Stream  bool
}
//初始化要发送的类型
func init() {
    go InitListen("tcp",":2789")
    time.Sleep(1e9)
    gobconn.RegisterType(reflect.TypeOf(Info{}))
    gobconn.RegisterType(reflect.TypeOf(Test{}))
}
func main() {
    Test_rw()
    now := time.Now().Unix()
    Benchmark_rw()
    fmt.Println(time.Now().Unix())
    fmt.Println(now)
}
func Test_rw() {
    Dail("tcp","127.0.0.1:2789", 1)
}

func Benchmark_rw() {
    Dail("tcp", 10000)
}
//创建tcp监听的端口
func InitListen(proto,addr string) {
    lis,err := net.Listen(proto,addr)
    if err != nil {
        fmt.Println("listen error,",err.Error())
        return
    }
    defer lis.Close()
    for {
        conn,err := lis.Accept()
        if err != nil {
            fmt.Println("接入错误:",err)
            continue
        }
        go handle(conn)
    }
}
//链接处理逻辑
func handle(conn net.Conn) {
    con := gobconn.NewGobConnection(conn)
    defer con.Close()
    for {
        msg,err := con.Read()
        if err != nil {
            fmt.Println(con.RemoteAddr())
            fmt.Println("服务端ReadError:",err)
            return
        }
        err = con.Write(msg.Interface())
        if err != nil {
            fmt.Println("服务端WriteError:",err)
            msg.Recovery()
            return
        }
        msg.Recovery()
    }
}
//创建连接.
func Dail(proto,addr string,count int) {
    con,err := net.Dial(proto,addr)
    if err != nil {
        fmt.Println("客户端连接错误:",err)
        return
    }
    conn := gobconn.NewGobConnection(con)
    defer conn.Close()

    for i := 0; i < count; i++ {
        err = conn.Write(Info{"testing", 25,"IT",[]string{"backetball","football"}})
        if err != nil {
            fmt.Println("客户端WriteError:",err)
            return
        }
        msg,err := conn.Read()
        if err != nil {
            fmt.Println("客户端ReadError:",err)
            return
        }
        fmt.Println(msg,msg.Interface())
        msg.Recovery()
    }
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读