加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

【GOLANG】第二章 RPC client源码分析

发布时间:2020-12-16 18:25:19 所属栏目:大数据 来源:网络整理
导读:第二章 RPCclient源码分析 rpc客户端的逻辑很简单,将一个个的 调用请求序列化后原子的发送给服务器 ,有一个专门的gorutine等待服务器应答,这goroutine会将收到的每个应答分发给对应的请求,完成了一次rpc调用。 client是基于单个socket连接来, 靠 channe



第二章RPCclient源码分析

rpc客户端的逻辑很简单,将一个个的调用请求序列化后原子的发送给服务器,有一个专门的gorutine等待服务器应答,这goroutine会将收到的每个应答分发给对应的请求,完成了一次rpc调用。

client是基于单个socket连接来,channel来实现复用连接以及并发的。而临时的调用对象Call都是保存在Client的map中的,每个call怎么查找,根据seq序列号在请求时候发过去,之后response的时候,client根据返回的seq再反查pending的map结果的。

rpc客户端,最重要的事情就是连接复用,也就是说一个连接上需要并发的同时跑多个请求

1.1结构体

typeCallstruct{
ServiceMethodstring//Thenameoftheserviceandmethodtocall.Argsinterface{}//Theargumenttothefunction(*struct).Reply//Thereplyfromthefunction(*struct).Errorerror//Aftercompletion,theerrorstatus.Donechan*Call//Strobeswhencalliscomplete.}
ClientcodecClientCodecreqMutexsync.Mutex//protectsfollowingrequestRequestmutexsync.Mutexsequint64pendingmap[]*Callclosingbool//userhascalledCloseshutdown//serverhastoldustostop}

ClientCodec//WriteRequestmustbesafeforconcurrentusebymultiplegoroutines.WriteRequest(*Request,{})ReadResponseHeader(*Response)ReadResponseBody(Close()}

定义了编码解码器的接口,要自定义一个编码解码器,只要实现这个接口就OK了。WriteRequest就是将rpc请求的远程方法名、参数等信息进行序列化打包,写入socket可以按照自己的意愿去打包请求进行发送。两个Read接口就是从socket读取响应数据包,然后采用相应的算法进行反序列化解包。

如不能自定义编码解码器,就只能用在客户端和服务器都是Go。这降低了其灵活性。自定义编码解码器后,就可以实现其他rpc的协议,比如:thrift。github的go-thrift项目

1.2调用入口NewClient

funcNewClient(connio.ReadWriteCloser)*Client

func(client*Client)Call(serviceMethodstring,argsinterface{},replyinterface{})error

funcNewClient(connio.ReadWriteCloser)*Client{encBuf:=bufio.NewWriter(conn)client:=&gobClientCodec{conngob.NewDecoderNewEncoder(encBuf)returnNewClientWithCodec(client)(codecClientCodec)*Client{client:=&Client{codec:codecpending:make]*Call)goclient.inputclient}

new一个客户端对象,通过这个对象的Call方法去执行远程服务端方法。同时建了一个inputgoroutine.inputgoroutine阻塞在连接上,waitfor服务端的响应NewClient内部使用的默认的gob编码,gobClientCodes实现了Codec的接口。

Call方法的主要是将一个rpc请求发送到服务端,同时放入一个等待队列,等待服务器的响应。

1.3input函数

func(client*Client(){varerrresponseResponse

for循环就是永久负责这个连接,只在连接上发生错误才退出。

forerr==nil{

读响应头,响应头有一个重要的信息是正文数据长度,知道这个长度信息知道读多少正文才是一个完整应答

response=Response{}err=client.codec.(&response)iferr!=break从响应头里取seq,这个seq是客户端生成的,send中发送给,服务器应答的时候,将这个seq响应给客户端。只有依靠seq客户端才知道这个应答是对应pending队列中的个请求。

seq:=response.Seqclient.mutex.Lockcall:=client.pending[seq]delete(client.pendingseq)Unlockswitchcasecall==:取出body里的内容并丢弃(err=errors.New"readingerrorbody:"+err.())response.Error!=""//We'vegotanerrorresponse.Givethistotherequest;//anysubsequentrequestswillgettheReadResponseBody//errorifthereisone.call.Error=ServerError(response.Error)call.donedefault(call.Reply)call.Error=errors."readingbody"下面代码在处理连接上出错,以及关闭连接等情况的清理工作.如果不清可能导致一些调用rpc的goroutine永久阻塞等待。

//Terminatependingcalls.client.reqMutex.client.shutdown=trueclosing:=client.closingerr==io.EOF{closing{err=ErrShutdown}elseerr=io.ErrUnexpectedEOF_call:=rangeclient.pending{call.Error=errdebugLog&&err!=io.EOF&&!closing{log.Println"rpc:clientprotocolerror:"err)等待一个读取应答的goroutine只有一个,这个goroutine负责读取有响应,将响应分发给对应的请求,就完成了一次rpc请求。

1.4send函数

send(call*Call){defer//Registerthiscall.client.shutdown||client.closing{call.Error=ErrShutdownreturn}

pending使用map实现的,rpc请求都会生存一个唯一递增的seq,seq就是用来标记请求的,这个很像tcp包的seq

seq:=client.seqclient.seq++client.pending[seq]=call//Encodeandsendtherequest.client.request.Seq=seqclient.request.ServiceMethod=call.ServiceMethoderr:=client.codec.(&client.requestcall.Args)call=client.pending[seq]call!=一个rpc请求发出去主要过程,第一是将请求放入等待队列,第二是序列化,最后是写入socket

1.5接口函数Call

同步死等:发送请求的时候,调用就是Call,传入方法名,参数,获取返回等

Call(serviceMethodargsreplycall:=<-client.Goargsreply1)).Donereturncall.Error}

1.6接口函数Go

异步

done*Call)*Call{call:=new(Call)call.ServiceMethod=serviceMethodcall.Args=argscall.Reply=replydone==done=10//buffered.//Ifcallerpassesdone!=nil,itmustarrangethat//donehasenoughbufferforthenumberofsimultaneous//RPCsthatwillbeusingthatchannel.Ifthechannel//istotallyunbuffered,it'sbestnottorunatall.cap(done)==0Panic"rpc:donechannelisunbuffered"call.Done=done(call)call}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读