【GOLANG】第二章 RPC client源码分析
第二章RPCclient源码分析
rpc客户端的逻辑很简单,将一个个的调用请求序列化后原子的发送给服务器,有一个专门的gorutine等待服务器应答,这goroutine会将收到的每个应答分发给对应的请求,完成了一次rpc调用。 client是基于单个socket连接来,靠channel来实现复用连接以及并发的。而临时的调用对象Call都是保存在Client的map中的,对每个call怎么查找,根据seq序列号在请求时候发过去,之后response的时候,client根据返回的seq再反查pending的map结果的。 rpc客户端,最重要的事情就是连接复用,也就是说一个连接上需要并发的同时跑多个请求。 1.1结构体typeCallstruct{ ClientCodec//WriteRequestmustbesafeforconcurrentusebymultiplegoroutines.WriteRequest(*Request,{})ReadResponseHeader(*Response)ReadResponseBody(Close()} 定义了编码解码器的接口,如要自定义一个编码解码器,只要实现这个接口就OK了。WriteRequest就是将rpc请求的远程方法名、参数等信息进行序列化打包,写入socket中。也可以按照自己的意愿去打包请求进行发送。两个Read接口就是从socket读取响应数据包,然后采用相应的算法进行反序列化解包。 如不能自定义编码解码器,就只能用在客户端和服务器都是Go。这降低了其灵活性。自定义编码解码器后,就可以实现其他rpc的协议,比如:thrift。github的go-thrift项目。 1.2调用入口NewClientfuncNewClient(connio.ReadWriteCloser)*Client func(client*Client)Call(serviceMethodstring,argsinterface{},replyinterface{})error funcNewClient(connio.ReadWriteCloser)*Client{encBuf:=bufio.NewWriter(conn)client:=&gobClientCodec{conngob.NewDecoderNewEncoder(encBuf)returnNewClientWithCodec(client)(codecClientCodec)*Client{client:=&Client{codec:codecpending:make]*Call)goclient.inputclient} new一个客户端对象,通过这个对象的Call方法去执行远程服务端方法。同时建了一个inputgoroutine.inputgoroutine阻塞在连接上,waitfor服务端的响应。NewClient内部使用的默认的gob编码,gobClientCodes实现了Codec的接口。 Call方法的主要是将一个rpc请求发送到服务端,同时放入一个等待队列,等待服务器的响应。 1.3input函数func(client*Client(){varerrresponseResponse for循环就是永久负责这个连接,只在连接上发生错误才退出。 forerr==nil{ 读响应头,响应头有一个重要的信息是正文数据长度,知道这个长度信息才知道读多少正文才是一个完整应答。 response=Response{}err=client.codec.(&response)iferr!=break从响应头里取seq,这个seq是客户端生成的,send中发送给,服务器应答的时候,将这个seq响应给客户端。只有依靠seq客户端才知道这个应答是对应pending队列中的哪个请求。 seq:=response.Seqclient.mutex.Lockcall:=client.pending[seq]delete(client.pendingseq)Unlockswitchcasecall==:取出body里的内容并丢弃(err=errors.New"readingerrorbody:"+err.())response.Error!=""//We'vegotanerrorresponse.Givethistotherequest;//anysubsequentrequestswillgettheReadResponseBody//errorifthereisone.call.Error=ServerError(response.Error)call.donedefault(call.Reply)call.Error=errors."readingbody"下面代码在处理连接上出错,以及关闭连接等情况的清理工作.如果不清可能导致一些调用rpc的goroutine永久阻塞等待。 //Terminatependingcalls.client.reqMutex.client.shutdown=trueclosing:=client.closingerr==io.EOF{closing{err=ErrShutdown}elseerr=io.ErrUnexpectedEOF_call:=rangeclient.pending{call.Error=errdebugLog&&err!=io.EOF&&!closing{log.Println"rpc:clientprotocolerror:"err)等待一个读取应答的goroutine只有一个,这个goroutine负责读取有响应,将响应分发给对应的请求,就完成了一次rpc请求。 1.4send函数send(call*Call){defer//Registerthiscall.client.shutdown||client.closing{call.Error=ErrShutdownreturn} pending使用map实现的,rpc请求都会生存一个唯一递增的seq,seq就是用来标记请求的,这个很像tcp包的seq seq:=client.seqclient.seq++client.pending[seq]=call//Encodeandsendtherequest.client.request.Seq=seqclient.request.ServiceMethod=call.ServiceMethoderr:=client.codec.(&client.requestcall.Args)call=client.pending[seq]call!=一个rpc请求发出去主要过程,第一是将请求放入等待队列,第二是序列化,最后是写入socket。 1.5接口函数Call同步死等:发送请求的时候,调用就是Call,传入方法名,参数,获取返回等 Call(serviceMethodargsreplycall:=<-client.Goargsreply1)).Donereturncall.Error} 1.6接口函数Go异步 done*Call)*Call{call:=new(Call)call.ServiceMethod=serviceMethodcall.Args=argscall.Reply=replydone==done=10//buffered.//Ifcallerpassesdone!=nil,itmustarrangethat//donehasenoughbufferforthenumberofsimultaneous//RPCsthatwillbeusingthatchannel.Ifthechannel//istotallyunbuffered,it'sbestnottorunatall.cap(done)==0Panic"rpc:donechannelisunbuffered"call.Done=done(call)call} (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |