加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

golang -- TCP服务器(2)

发布时间:2020-12-16 18:57:03 所属栏目:大数据 来源:网络整理
导读:tcp服务器 包括日志,定时处理,广播,超时 map写添加了锁(读不用锁) 添加了解码器 删除了addr-buf映射,添加删除锁 mark:今天听大神所要处理系统中断EINTR,以后做简单处理EINTR--retry mark:用struct封装addr, net . Listener,exit(是否断开)等信息..最重要的是

tcp服务器
包括日志,定时处理,广播,超时
map写添加了锁(读不用锁)
添加了解码器
删除了addr-buf映射,添加删除锁
mark:今天听大神所要处理系统中断EINTR,以后做简单处理EINTR--retry

mark:用struct封装addr,net.Listener,exit(是否断开)等信息..最重要的是使用:

br := bufioNewReader(conn),bw :=bufio.NewWriter(conn)来取代读循环,这样就可以需要的时候再读/写

https://github.com/zhangpeihao/gortmp/blob/master/server.go

packagemain

import(
"bytes"
"encoding/binary"
"fmt"
"log"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
)

funcmain(){
tcpStart(8889)
}

/*
定义相关锁
*/
var(
connMkMutexsync.Mutex
connDelMutexsync.Mutex
)

/*
定义logger
*/
varlogger*log.Logger

/*
初始化log
*/
funcinitLog(logfile*os.File){
//logger=log.New(logfile,"log:",log.Ldate|log.Ltime)
logger=log.New(logfile,"prefix",0)
}

/*
处理log
*/
funcdoLog(args...interface{}){
str:=time.Now().Format("2006-01-0215:04:05")
varlogDatastring
vartempstring

for_,arg:=rangeargs{
switchval:=arg.(type){
caseint:
temp=strconv.Itoa(val)
casestring:
temp=val
}
iflen(temp)>64{//限制只打印前64个字符
logData=temp[:64]
}else{
logData=temp
}
str=str+""+logData
}
logger.Println(str)
}

/*
定义socketconn映射
*/
varclisConnMapmap[string]*net.TCPConn

/*
初始化socketconn映射
*/
funcinitClisConnMap(){
clisConnMap=make(map[string]*net.TCPConn)
}

/*
建立socketconn映射
*/
funcmkClisConn(keystring,conn*net.TCPConn){
connMkMutex.Lock()
deferconnMkMutex.Unlock()
clisConnMap[key]=conn
}

/*
删除socketconn映射
*/
funcdelClisConn(keystring){
connDelMutex.Lock()
deferconnDelMutex.Unlock()
delete(clisConnMap,key)
}

/*
定义解码器
*/
typeUnpackerstruct{
//头(xy)2bytes+标识1byte+包长度2bytes+data
//当然了,头不可能是xy,这里举例子,而且一般还需要转义
_buf[]byte
}

func(unpacker*Unpacker)feed(data[]byte){
unpacker._buf=append(unpacker._buf,data...)
}

func(unpacker*Unpacker)unpack()(flagbyte,msg[]byte){
str:=string(unpacker._buf)
for{
iflen(str)<5{
break
}else{
_,head,data:=Partition(str,"xy")
iflen(head)==0{//没有头
ifstr[len(str)-1]==byte(120){//120=>'x'
unpacker._buf=[]byte{byte(120)}
}else{
unpacker._buf=[]byte{}
}
break
}

buf:=bytes.NewReader([]byte(data))
msg=make([]byte,buf.Len())
vardataLenuint16
binary.Read(buf,binary.LittleEndian,&flag)
binary.Read(buf,&dataLen)

fmt.Println("DEC:",flag,dataLen)
ifbuf.Len()<int(dataLen){
break
}
binary.Read(buf,&msg)
unpacker._buf=unpacker._buf[2+1+2+dataLen:]
}
}
return
}

/*
启动服务
*/
functcpStart(portint){
initLog(os.Stderr)
initClisConnMap()
doLog("tcpStart:")
host:=":"+strconv.Itoa(port)
tcpAddr,err:=net.ResolveTCPAddr("tcp4",host)
checkError(err)

listener,err:=net.ListenTCP("tcp",tcpAddr)
checkError(err)

for{
conn,err:=listener.AcceptTCP()
iferr!=nil{
continue
}

gohandleClient(conn)
}
}

/*
socketconn
*/
funchandleClient(conn*net.TCPConn){
//****这里是初始化连接处理
addr:=conn.RemoteAddr().String()
doLog("handleClient:",addr)
connectionMade(conn)
request:=make([]byte,128)
deferconn.Close()
buf:=make([]byte,0)
for{
//****这里是读循环处理
readLoopHandled(conn)
read_len,err:=conn.Read(request)
iferr!=nil{
//这里没使用checkError因为不退出,只是break出去
doLog("ERR:","readerr",err.Error())
break
}

ifread_len==0{//在gprs时数据不能通过这个判断是否断开连接,要通过心跳包
doLog("ERR:","connectionalreadyclosedbyclient")
break
}else{
//request[:read_len]处理
buf=append(buf,request[:read_len]...)
doLog("<=",addr,string(request[:read_len]))
dataReceived(conn,&buf)
request=make([]byte,128)//clearlastreadcontent
}
}
//****这里是连接断开处理
connectionLost(conn)
}

/*
连接初始处理(ed)
*/
funcconnectionMade(conn*net.TCPConn){
//初始化连接这个函数被调用

//****建立conn映射
addr:=conn.RemoteAddr().String()
ip:=strings.Split(addr,":")[0]
mkClisConn(ip,conn)

doLog("connectionMade:",addr)

//****定时处理(心跳等)
goloopingCall(conn)
}

/*
读循环处理(ed)
*/
funcreadLoopHandled(conn*net.TCPConn){
//当进入循环读数据这个函数被调用,主要用于设置超时(好刷新设置超时)

//*****设置超时(要写在for循环里)
setReadTimeout(conn,10*time.Minute)
}

/*
客户端连接发送来的消息处理(ed)
*/
funcdataReceived(conn*net.TCPConn,pBuf*[]byte){
//一般情况可以用pBuf参数,但是如果有分包粘包的情况就必须使用clisBufMap的buf
//clisBufMap的buf不断增大,不管是否使用都应该处理
//addr:=conn.RemoteAddr().String()
doLog("*pBuf:",string(*pBuf))
//sendData(clisConnMap["192.168.6.234"],[]byte("xxx"))
sendData(conn,[]byte("echo"))
}

/*
连接断开(ed)
*/
funcconnectionLost(conn*net.TCPConn){
//连接断开这个函数被调用
addr:=conn.RemoteAddr().String()
ip:=strings.Split(addr,":")[0]

delClisConn(ip)//删除关闭的连接对应的clisMap项
doLog("connectionLost:",addr)
}

/*
发送数据
*/
funcsendData(conn*net.TCPConn,data[]byte)(nint,errerror){
addr:=conn.RemoteAddr().String()
n,err=conn.Write(data)
iferr==nil{
doLog("=>",string(data))
}
return
}

/*
广播数据
*/
funcbroadcast(tclisMapmap[string]*net.TCPConn,data[]byte){
for_,conn:=rangetclisMap{
sendData(conn,data)
}
}

/*
定时处理&延时处理
*/
funcloopingCall(conn*net.TCPConn){
pingTicker:=time.NewTicker(30*time.Second)//定时
testAfter:=time.After(5*time.Second)//延时

for{
select{
case<-pingTicker.C:
//发送心跳
_,err:=sendData(conn,[]byte("PING"))
iferr!=nil{
pingTicker.Stop()
return
}
case<-testAfter:
doLog("testAfter:")
}
}
}

/*
设置读数据超时
*/
funcsetReadTimeout(conn*net.TCPConn,ttime.Duration){
conn.SetReadDeadline(time.Now().Add(t))
}

/*
错误处理
*/
funccheckError(errerror){
iferr!=nil{
doLog("ERR:",err.Error())
os.Exit(1)
}
}

funcPartition(sstring,sepstring)(headstring,retSepstring,tailstring){
//Partition(s,sep)->(head,sep,tail)
index:=strings.Index(s,sep)
ifindex==-1{
head=s
retSep=""
tail=""
}else{
head=s[:index]
retSep=sep
tail=s[len(head)+len(sep):]
}
return
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读