加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

详解golang net之transport

发布时间:2020-12-16 09:18:30 所属栏目:大数据 来源:网络整理
导读:关于golang http transport的讲解,网上有很多文章进行了解读,但都比较粗,很多代码实现并没有讲清楚。故给出更加详细的实现说明。整体看下来细节实现层面还是比较难懂的。 本次使用golang版本1.12.9 transport实现了RoundTripper接口,该接口只有一个方法R

关于golang http transport的讲解,网上有很多文章进行了解读,但都比较粗,很多代码实现并没有讲清楚。故给出更加详细的实现说明。整体看下来细节实现层面还是比较难懂的。

本次使用golang版本1.12.9

transport实现了RoundTripper接口,该接口只有一个方法RoundTrip(),故transport的入口函数就是RoundTrip()。transport的主要功能其实就是缓存了长连接,用于大量http请求场景下的连接复用,减少发送请求时TCP(TLS)连接建立的时间损耗,同时transport还能对连接做一些限制,如连接超时时间,每个host的最大连接数等。transport对长连接的缓存和控制仅限于TCP+(TLS)+HTTP1,不对HTTP2做缓存和限制。

tranport包含如下几个主要概念:

  • 连接池:在idleConn中保存了不同类型(connectMethodKey)的请求连接(persistConn)。当发生请求时,首先会尝试从连接池中取一条符合其请求类型的连接使用
  • readLoop/writeLoop:连接之上的功能,循环处理该类型的请求(发送request,返回response)
  • roundTrip:请求的真正入口,接收到一个请求后会交给writeLoop和readLoop处理。

一对readLoop/writeLoop只能处理一条连接,如果这条连接上没有更多的请求,则关闭连接,退出循环,释放系统资源

下述代码都来自golang源码的src/net/httptransport.go文件

type RoundTripper interface {
    // RoundTrip executes a single HTTP transaction,returning
     a Response for the provided Request.
    //
     RoundTrip should not attempt to interpret the response. In
     particular,RoundTrip must return err == nil if it obtained
     a response,regardless of the response's HTTP status code.
     A non-nil err should be reserved for failure to obtain a
     response. Similarly,RoundTrip should not attempt to
     handle higher-level protocol details such as redirects, authentication,or cookies.
     RoundTrip should not modify the request,except for
     consuming and closing the Request's Body. RoundTrip may
     read fields of the request in a separate goroutine. Callers
     should not mutate or reuse the request until the Response's
     Body has been closed.
     RoundTrip must always close the body,including on errors,1)"> but depending on the implementation may do so in a separate
     goroutine even after RoundTrip returns. This means that
     callers wanting to reuse the body for subsequent requests
     must arrange to wait for the Close call before doing so.
     The Request's URL and Header fields must be initialized.
    RoundTrip(*Request) (*Response,error)
}

Transport结构体中的主要成员如下(没有列出所有成员):

wantIdle                                                要求关闭所有idle的persistConn
reqCanceler map[*Request]func(error)                    用于取消request
idleConn   map[connectMethodKey][]*persistConn          idle状态的persistConn连接池,最大值受maxIdleConnsPerHost限制
idleConnCh map[connectMethodKey]chan *persistConn       用于给调用者传递persistConn
connPerHostCount     map[connectMethodKey]int           表示一类连接上的host数目,最大值受MaxConnsPerHost限制
connPerHostAvailable map[connectMethodKey]chan struct{} 与connPerHostCount配合使用,判断该类型的连接数目是否已经达到上限
idleLRU    connLRU                                      长度受MaxIdleConns限制,队列方式保存所有idle的pconn
altProto   atomic.Value                                 nil or map[string]RoundTripper,key为URI scheme,表示处理该scheme的RoundTripper实现。注意与TLSNextProto的不同,前者表示URI的scheme,后者表示tls之上的协议。如前者不会体现http2,后者会体现http2
Proxy func(*Request) (*url.URL,error)                  为request返回一个代理的url
DisableKeepAlives bool                                  是否取消长连接
DisableCompression                                  是否取消HTTP压缩
MaxIdleConns                                         所有host的idle状态的最大连接数目,即idleConn中所有连接数
MaxIdleConnsPerHost                                  每个host的idle状态的最大连接数目,即idleConn中的key对应的连接数
MaxConnsPerHost                                         每个host上的最大连接数目,含dialing/active/idle状态的connections。http2时,每个host只允许有一条idle的conneciton
DialContext func(ctx context.Context,network,addr string) (net.Conn,error) 创建未加密的tcp连接,比Dial函数增加了context控制
Dial func(network,1)">) (net.Conn,error)       创建未加密的tcp连接,废弃,使用DialContext
DialTLS func(network,error)    为非代理模式的https创建连接的函数,如果该函数非空,则不会使用Dial函数,且忽略TLSClientConfig和TLSHandshakeTimeout;反之使用Dila和TLSClientConfig。即有限使用DialTLS进行tls协商
TLSClientConfig *tls.Config                             tls client用于tls协商的配置
IdleConnTimeout                                         连接保持idle状态的最大时间,超时关闭pconn
TLSHandshakeTimeout time.Duration                       tls协商的超时时间
ResponseHeaderTimeout time.Duration                     发送完request后等待serve response的时间
TLSNextProto map[string]func(authority string,c *tls.Conn) RoundTripper 在tls协商带NPN/ALPN的扩展后,transport如何切换到其他协议。指tls之上的协议(next指的就是tls之上的意思)
ProxyConnectHeader Header                               在CONNECT请求时,配置request的首部信息,可选
MaxResponseHeaderBytes                                  指定server响应首部的最大字节数

Transport.roundTrip是主入口,它通过传入一个request参数,由此选择一个合适的长连接来发送该request并返回response。整个流程主要分为两步:

使用getConn函数来获得底层TCP(TLS)连接;调用roundTrip函数进行上层协议(HTTP)处理。

func (t *Transport) roundTrip(req *Request) (* req.Context()
    trace := httptrace.ContextClientTrace(ctx)

    if req.URL == nil {
        req.closeBody()
        return nil,errors.New("http: nil Request.URL")
    }
    if req.Header ==http: nil Request.Header)
    }
    scheme := req.URL.Scheme
    isHTTP := scheme == http" || scheme == https
// 下面判断request首部的有效性 if isHTTP { for k,vv := range req.Header { if !httpguts.ValidHeaderFieldName(k) { net/http: invalid header field name %q,k) } for _,v := range vv { httpguts.ValidHeaderFieldValue(v) { net/http: invalid header field value %q for key %v // 判断是否使用注册的RoundTrip来处理对应的scheme。对于使用tcp+tls+http1(wss协议升级)的场景
// 不能使用注册的roundTrip。后续代码对tcp+tls+http1或tcp+http1进行了roundTrip处理
t.useRegisteredProtocol(req) { altProto,_ := t.altProto.Load().(map[]RoundTripper) if altRT := altProto[scheme]; altRT != nil { if resp,err := altRT.RoundTrip(req); err != ErrSkipAltProtocol { return resp,err } } }

// 后续仅处理URL scheme为http或https的连接
isHTTP { req.closeBody() unsupported protocol schemeif req.Method != "" && !validMethod(req.Method) { net/http: invalid method %qif req.URL.Host == "" { req.closeBody() http: no Host in request URL) }
// 下面for循环用于在request出现错误的时候进行请求重试。但不是所有的请求失败都会被尝试,如请求被取消(errRequestCanceled)
// 的情况是不会进行重试的。具体参见shouldRetryRequest函数
for { selectcase <-ctx.Done(): req.closeBody() nil,ctx.Err() default: } // treq gets modified by roundTrip,so we need to recreate for each retry. treq := &transportRequest{Request: req,trace: trace}
// connectMethodForRequest函数通过输入一个request返回一个connectMethod(简称cm),该类型通过
// {proxyURL,targetScheme,tartgetAddr,onlyH1},即{代理URL,server端的scheme,server的地址,是否HTTP1}
// 来表示一个请求。一个符合connectMethod描述的request将会在Transport.idleConn中匹配到一类长连接。
cm,err := t.connectMethodForRequest(treq) if err != nil { req.closeBody()
获取一条长连接,如果连接池中有现成的连接则直接返回,否则返回一条新建的连接。该连接可能是HTTP2格式的,存放在persistCnn.alt中,
//
使用其自注册的RoundTrip处理。该函数描述参见下面内容。
// 从getConn的实现中可以看到,一个请求只能在idle的连接上执行,反之一条连接只能同时处理一个请求。
        pconn,err := t.getConn(treq,cm)// 如果获取底层连接失败,无法继续上层协议的请求,直接返回错误
         nil {  // 每个request都会在getConn中设置reqCanceler,获取连接失败,清空设置
       t.setReqCanceler(req,nil)
            req.closeBody()
            var resp *Response
// pconn.alt就是从Transport.TLSNextProto中获取的,它表示TLS之上的协议,如HTTP2。从persistConn.alt的注释中可以看出
// 目前alt仅支持HTTP2协议,后续可能会支持更多协议。
if pconn.alt != nil { HTTP2处理,使用HTTP2时,由于不缓存HTTP2连接,不对其做限制 t.decHostConnCount(cm.key())
// 清除getConn中设置的标记。具体参见getConn
t.setReqCanceler(req,nil) resp,err = pconn.alt.RoundTrip(req) } else { // pconn.roundTrip中做了比较复杂的处理,该函数用于发送request并返回response。
// 通过writeLoop发送request,通过readLoop返回response
resp,err = pconn.roundTrip(treq) } // 如果成功返回response,则整个处理结束. if err == // 判断该request是否满足重试条件,大部分场景是不支持重试的,仅有少部分情况支持,如errServerClosedIdle
// err 非nil时实际并没有在原来的连接上重试,且pconn没有关闭,提了issue
pconn.shouldRetryRequest(req,err) { Issue 16465: return underlying net.Conn.Read error from peek,1)"> as we've historically done. if e,ok := err.(transportReadFromServerError); ok { err = e.err } Rewind the body if we're able to.
// 用于重定向场景
if req.GetBody != nil { newReq := *req var err error newReq.Body,1)"> req.GetBody() nil { newReq } } }

?getConn用于返回一条长连接。长连接的来源有2种路径:连接池中获取;当连接池中无法获取到时会新建一条连接

func (t *Transport) getConn(treq *transportRequest,cm connectMethod) (*persistConn,error) {
    req := treq.Request
    trace := treq.trace
    ctx := req.Context()
    if trace != nil && trace.GetConn != nil {
        trace.GetConn(cm.addr())
    }
// 从连接池中找一条合适的连接,如果找到则返回该连接,否则新建连接
if pc,idleSince := t.getIdleConn(cm); pc != nil { if trace != nil && trace.GotConn != nil { trace.GotConn(pc.gotIdleConnTrace(idleSince)) }
// 此处设置transport.reqCanceler比较难理解,主要功能是做一个标记,用于判断当前到执行pconn.roundTrip
// 期间,request有没有被(如Request.Cancel,Request.Context().Done())取消,被取消的request将无需继续roundTrip处理
t.setReqCanceler(req,func(error) {}) pc,nil } type dialRes { pc *persistConn err error }
// 该chan中用于存放通过dialConn函数新创建的长连接persistConn(后续简称pconn),表示一条TCP(TLS)的底层连接. dialc :
= make(chan dialRes)
// cmKey实际就是把connectMethod中的元素全部字符串化。cmKey作为一类连接的标识,如Transport.idleConn[cmKey]就表示一类特定的连接 cmKey :
= cm.key() Copy these hooks so we don't race on the postPendingDial in the goroutine we launch. Issue 11136. testHookPrePendingDial := testHookPrePendingDial testHookPostPendingDial := testHookPostPendingDial // 在尝试获取连接的时候,如果此时正在创建一条连接,但最后没有选择这条新建的连接(有其它调用者释放了一条连接),
// 此时,handlePendingDial负责将这条新创建的连接放到Transport.idleConn连接池中
handlePendingDial :
= func() { testHookPrePendingDial() go func() { if v := <-dialc; v.err == nil {
// 将一条连接放入连接池中,描述见下文--tryPutIdleConn t.putOrCloseIdleConn(v.pc) }
{ t.decHostConnCount(cmKey) } testHookPostPendingDial() }() } cancelc := make(chan error,1)
// 为request设置ReqCanceler。transport代码中不会主动调用该ReqCanceler函数(会在
// roundTrip中调用replaceReqCanceler将其覆盖),可能的原因是transport提供了一个对外API CancelRequest,
// 用户可以调用该函数取消连接,此时会调用该ReqCanceler。需要注意的是从CancelRequest的注释中可以看出,该API
// 已经被废弃,这段代码后面可能会被删除(如果有不同看法,请指出)
t.setReqCanceler(req,func(err error) { cancelc
<- err }) // 如果对host上建立的连接有限制 if t.MaxConnsPerHost > 0 {
// incHostConnCount会根据主机已经建立的连接是否达到t.MaxConnsPerHost来返回一个未关闭
// 的chan(连接数达到MaxConnsPerHost)或关闭的chan(连接数未达到MaxConnsPerHost),
// 返回未关闭的chan时会阻塞等待其他请求释放连接,不能新创建pconn;反之可以使用新创建的pconn
t.incHostConnCount(cmKey): 等待获取某一类连接对应的chan。tryPutIdleConn函数中会尝试将新建或释放的连接放入到该chan中 case pc := <-t.getIdleConnCh(cm): nil { trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn,Reused: pc.isReused()}) } // 下面2个case都表示request被取消,其中Cancel被废弃,建议使用Context来取消request req.Cancel: req.Context().Done(): case err := <-cancelc: errRequestCanceled { err = errRequestCanceledConn } // 新建连接,创建好后将其放入dialc chan中 pc,1)"> t.dialConn(ctx,cm) dialc <- dialRes{pc,err} }()
// 下面会通过两种途径来获得连接:从dialc中获得通过dialConn新建的连接;通过idleConnCh获得其他request释放的连接
// 如果首先获取到的是dialConn新建的连接,直接返回该连接即可;如果首先获取到的是其他request释放的连接,在返回该连接前
// 需要调用handlePendingDial来处理dialConn新建的连接。
idleConnCh :
= t.getIdleConnCh(cm) // 获取dialConn新建的连接 case v := <-dialc: Our dial finished. if v.pc !=if trace != nil && trace.GotConn != nil && v.pc.alt == nil { trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn}) } v.pc,nil } // 仅针对MaxConnsPerHost>0有效,对应上面的incHostConnCount() t.decHostConnCount(cmKey)
// 下面用于返回更易读的错误信息
It was an error due to cancelation,so prioritize that error value. (Issue 16049) : It wasn't an error due to cancelation,so return the original error message: // 获取其他request释放的连接 idleConnCh: Another request finished first and its net.Conn became available before our dial. Or somebody else's dial that they didn't use. But our dial is still going,so give it away when it finishes: handlePendingDial() nil { trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn,Reused: pc.isReused()}) } // 如果request取消,也需要调用handlePendingDial处理新建的连接 req.Cancel: handlePendingDial() req.Context().Done(): handlePendingDial() cancelc: handlePendingDial() errRequestCanceled { err = errRequestCanceledConn } tryPutIdleConn函数用来将一条新创建或回收的连接放回连接池中,以便后续使用。与getIdleConnCh配合使用,后者用于获取一类连接对应的chan。在如下场景会将一个连接放回idleConn中

    • 在readLoop成功之后(当然还有其他判断,如底层链路没有返回EOF错误);
    • 创建一个新连接且新连接没有被使用时;
    • roundTrip一开始发现request被取消时
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
// 当不使用长连接或该主机上的连接数小于0(即不允许缓存任何连接)时,返回错误并关闭创建的连接(此处没有做关闭处理,
// 但存在不适用的连接时必须关闭,如使用putOrCloseIdleConn)。
// 可以看出当不使用长连接时,Transport不能缓存连接
if t.DisableKeepAlives || t.MaxIdleConnsPerHost < errKeepAlivesDisabled } pconn.isBroken() { errConnBroken }
// 如果是HTTP2连接,则直接返回,不缓存该连接
errNotCachingH2Conn }
// 为新连接标记可重用状态,新创建的连接肯定是可以重用的,用于在Transport.roundTrip
// 中的shouldRetryRequest函数中判断连接是否可以重用
pconn.markReused()
// 该key对应Transport.idleConn中的key,标识特定的连接 key :
= pconn.cacheKey t.idleMu.Lock() defer t.idleMu.Unlock() // idleConnCh中的chan元素用于存放可用的连接pconn,每类连接都有一个chan waitingDialer := t.idleConnCh[key] // 如果此时有调用者等待一个连接,则直接将该连接传递出去,不进行保存,这种做法有利于提高效率 case waitingDialer <- pconn: We're done with this pconn and somebody else is currently waiting for a conn of this type (they're actively dialing,but this conn is ready first). Chrome calls this socket late binding. See // https://insouciant.org/tech/connection-management-in-chromium/ nil :
// 如果没有调用者等待连接,则清除该chan。删除map中的chan直接会关闭该chan
if waitingDialer != They had populated this,but their dial won first,so we can clean up this map entry. delete(t.idleConnCh,key) } }
// 与DisableKeepAlives有点像,当用户需要关闭所有idle的连接时,不会再缓存连接
t.wantIdle { errWantIdle } if t.idleConn == nil { t.idleConn = make(map[connectMethodKey][]*persistConn) } idles := t.idleConn[key]
// 当主机上该类连接数超过Transport.MaxIdleConnsPerHost时,不能再保存新的连接,返回错误并关闭连接
if len(idles) >= t.maxIdleConnsPerHost() { errTooManyIdleHost }
// 需要缓存的连接与连接池中已有的重复,系统退出(这种情况下系统已经发生了混乱,直接退出)
range idles { if exist == pconn { log.Fatalf(dup idle pconn %p in freelist// 添加待缓存的连接 t.idleConn[key] = append(idles,pconn) t.idleLRU.add(pconn)
// 受MaxIdleConns的限制,添加策略变为:添加新的连接,删除最老的连接。
// MaxIdleConns限制了所有类型的idle状态的最大连接数目,而MaxIdleConnsPerHost限制了host上单一类型的最大连接数目
// idleLRU中保存了所有的连接,此处的作用为,找出最老的连接并移除
if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns { oldest := t.idleLRU.removeOldest() oldest.close(errTooManyIdle) t.removeIdleConnLocked(oldest) }
// 为新添加的连接设置超时时间
if t.IdleConnTimeout > if pconn.idleTimer != nil {
// 如果该连接是被释放的,则重置超时时间 pconn.idleTimer.Reset(t.IdleConnTimeout) }
// 如果该连接时新建的,则设置超时时间并设置超时动作pconn.closeConnIfStillIdle
// closeConnIfStillIdle用于释放连接,从Transport.idleLRU和Transport.idleConn中移除并关闭该连接
pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout,pconn.closeConnIfStillIdle) } }
pconn.idleAt
= time.Now() nil }

?dialConn用于新创建一条连接,并为该连接启动readLoop和writeLoop

func (t *Transport) dialConn(ctx context.Context,cm connectMethod) (*persistConn,error) {
pconn :
= &persistConn{ t: t,cacheKey: cm.key(),reqch: make(chan requestAndChan,),writech: make(chan writeRequest,closech: make(chan {}),writeErrCh: make(chan error,writeLoopDone: make(chan httptrace.ContextClientTrace(ctx) wrapErr := func(err error) error { if cm.proxyURL != nil { Return a typed error,per Issue 16997 return &net.OpError{Op: proxyconnect",Net: tcp err }
// 调用注册的DialTLS处理tls。使用自注册的TLS处理函数时,transport的TLSClientConfig和TLSHandshakeTimeout
// 参数会被忽略
if cm.scheme() == " && t.DialTLS != err error // 调用注册的连接函数创建一条连接,注意cm.addr()的实现,如果该连接存在proxy,则此处是与proxy建立TLS连接;否则直接连server。
// 存在proxy时,与server建立连接分为2步:与proxy建立TLP(TLS)连接;与server建立HTTP(HTTPS)连接
// func (cm *connectMethod) addr() string { // if cm.proxyURL != nil { // return canonicalAddr(cm.proxyURL) // } // return cm.targetAddr // }
pconn.conn,err = t.DialTLS(if pconn.conn ==net/http: Transport.DialTLS returned (nil,nil))) }// 如果连接类型是TLS的,则需要处理TLS协商 if tc,ok := pconn.conn.(*tls.Conn); ok { Handshake here,in case DialTLS didn't. TLSNextProto below depends on it for knowing the connection state. if trace != nil && trace.TLSHandshakeStart != nil { trace.TLSHandshakeStart() }// 启动TLS协商,如果协商失败需要关闭连接 if err := tc.Handshake(); err != nil { go pconn.conn.Close() if trace != nil && trace.TLSHandshakeDone != nil { trace.TLSHandshakeDone(tls.ConnectionState{},err) } tc.ConnectionState() nil { trace.TLSHandshakeDone(cs,nil) }// 保存TLS协商结果 pconn.tlsState = &cs } } {// 使用默认方式创建连接,此时会用到transport的TLSClientConfig和TLSHandshakeTimeout参数。同样注意cm.addr() conn,err := t.dial(ctx,wrapErr(err) } pconn.conn = conn // 如果scheme是需要TLS协商的,则处理TLS协商,否则为普通的HTTP连接 if cm.scheme() == { var firstTLSHost string if firstTLSHost,_,err = net.SplitHostPort(cm.addr()); err !=// 进行TLS协商,具体参见下文addTLS if err = pconn.addTLS(firstTLSHost,trace); err != 处理proxy的情况 switch // 不存在proxy 直接跳过 case cm.proxyURL == nil: case cm.proxyURL.Scheme == socks5: conn := pconn.conn d := socksNewDialer(if u := cm.proxyURL.User; u != nil { auth := &socksUsernamePassword{ Username: u.Username(),} auth.Password,_ = u.Password() d.AuthMethods = []socksAuthMethod{ socksAuthMethodNotRequired,socksAuthMethodUsernamePassword,} d.Authenticate = auth.Authenticate } if _,err := d.DialWithConn(ctx,conn,cm.targetAddr); err != nil { conn.Close() // 如果存在proxy,且server的scheme为"http",如果需要代理认证,则设置认证信息 case cm.targetScheme == : pconn.isProxy = true if pa := cm.proxyAuth(); pa != { pconn.mutateHeaderFunc = func(h Header) { h.Set(Proxy-Authorization // 如果存在proxy,且server的scheme为"https"。与"http"不同,在与server进行tls协商前,会给proxy
// 发送一个method为"CONNECT"的HTTP请求,如果请求通过(返回200),则可以继续与server进行TLS协商
:
// 该conn表示与proxy建立的连接 conn :
= pconn.conn hdr := t.ProxyConnectHeader if hdr == nil { hdr = make(Header) } connectReq := &Request{ Method: CONNECTurl.URL{Opaque: cm.targetAddr},Host: cm.targetAddr,Header: hdr,} { connectReq.Header.Set( // 发送"CONNECT" http请求 connectReq.Write(conn) Read response. Okay to use and discard buffered reader here,because TLS server will not speak until spoken to. br := bufio.NewReader(conn) resp,1)"> ReadResponse(br,connectReq) // proxy返回非200,表示无法建立连接,可能情况如proxy认证失败 if resp.StatusCode != 200 { f := strings.SplitN(resp.Status,1)">" 2) conn.Close() if len(f) < { unknown status code) } ]) } } // 与proxy建立连接后,再与server进行TLS协商 if cm.proxyURL != nil && cm.targetScheme == if err := pconn.addTLS(cm.tlsHost(),err } } // 后续进行TLS之上的协议处理,如果TLS之上的协议为注册协议,则使用注册的roundTrip进行处理
// TLS之上的协议为TLS协商过程中使用NPN/ALPN扩展协商出的协议,如HTTP2(参见golang.org/x/net/http2)
if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != if next,1)"> t.TLSNextProto[s.NegotiatedProtocol]; ok { return &persistConn{alt: next(cm.targetAddr,pconn.conn.(*tls.Conn))},nil } } { pconn.conn = &connCloseListener{Conn: pconn.conn,t: t,cmKey: pconn.cacheKey} }
// 创建读写通道,writeLoop用于发送request,readLoop用于接收响应。roundTrip函数中会通过chan给writeLoop发送
// request,通过chan从readLoop接口response。每个连接都有一个readLoop和writeLoop,连接关闭后,这2个Loop也会退出。
// pconn.br给readLoop使用,pconn.bw给writeLoop使用,注意此时已经建立了tcp连接。
pconn.br
= bufio.NewReader(pconn) pconn.bw = bufio.NewWriter(persistConnWriter{pconn}) go pconn.readLoop() go pconn.writeLoop() pconn,nil }

?addTLS用于进行非注册协议下的TLS协商

func (pconn *persistConn) addTLS(name httptrace.ClientTrace) error {
     Initiate TLS and check remote host name against certificate.
    cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
    if cfg.ServerName ==  {
        cfg.ServerName = name
    }
     pconn.cacheKey.onlyH1 {
        cfg.NextProtos = nil
    }
    plainConn := pconn.conn
// 配置TLS client,包含一个TCP连接和TLC配置 tlsConn :
= tls.Client(plainConn,cfg) errc := make(chan error,1)">) var timer *time.Timer
// 设置TLS超时时间,并在超时后往errc中写入一个tlsHandshakeTimeoutError{} if d := pconn.t.TLSHandshakeTimeout; d != { timer = time.AfterFunc(d,func() { errc <- tlsHandshakeTimeoutError{} }) } go func() { nil { trace.TLSHandshakeStart() }
// 执行TLS协商,如果协商没有超时,则将协商结果err放入errc中 err :
= tlsConn.Handshake() if timer != nil { timer.Stop() } errc <- err }()
// 阻塞等待TLS协商结果,如果协商失败或协商超时,关闭底层连接
if err := <-errc; err != nil { plainConn.Close() nil { trace.TLSHandshakeDone(tls.ConnectionState{},err) } err }
// 获取协商结果并设置到pconn.tlsState cs :
= tlsConn.ConnectionState() nil { trace.TLSHandshakeDone(cs,nil) } pconn.tlsState = &cs pconn.conn = tlsConn nil }

?在获取到底层TCP(TLS)连接后在roundTrip中处理上层协议:即发送HTTP request,返回HTTP response。roundTrip给writeLoop提供request,从readLoop获取response。

一个roundTrip用于处理一类request。

func (pc *persistConn) roundTrip(req *transportRequest) (resp *// 此处与getConn中的"t.setReqCanceler(req,func(error) {})"相对应,用于判断request是否被取消
// 返回false表示request被取消,不必继续后续请求,关闭连接并返回错误
pc.t.replaceReqCanceler(req.Request,pc.cancelRequest) { pc.t.putOrCloseIdleConn(pc) // 与readLoop配合使用,表示期望的响应的个数 pc.numExpectedResponses++
// dialConn中定义的函数,设置了proxy的认证信息 headerFn := pc.mutateHeaderFunc pc.mu.Unlock() if headerFn != nil { headerFn(req.extraHeaders()) } Ask for a compressed version if the caller didn't set their own value for Accept-Encoding. We only attempt to uncompress the gzip stream if we were the layer that requested it. requestedGzip := false
// 如果需要在request中设置可接受的解码方法,则在request中添加对应的首部。仅支持gzip方式且
// 仅在调用者没有设置这些首部时设置
if !pc.t.DisableCompression && req.Header.Get(Accept-Encoding") == "" &&Range req.Method != HEAD Request gzip only,not deflate. Deflate is ambiguous and not as universally supported anyway. See: https://zlib.net/zlib_faq.html#faq39 // Note that we don't request this for HEAD requests,1)"> due to a bug in nginx: // https://trac.nginx.org/nginx/ticket/358 https://golang.org/issue/5522 We don't request gzip if the request is for a range,since auto-decoding a portion of a gzipped document will just fail anyway. See https://golang.org/issue/8923 requestedGzip = true req.extraHeaders().Set(gzip) }
// 用于处理首部含"Expect: 100-continue"的request,客户端使用该首部探测服务器是否能够
// 处理request首部中的规格要求(如长度过大的request)。
var continueCh chan {} if req.ProtoAtLeast(1,1)">1) && req.Body != nil && req.expectsContinue() { continueCh = make(chan struct{},1)">) } // HTTP1.1默认使用长连接,当transport设置DisableKeepAlives时会导致处理每个request时都会
// 新建一个连接。此处的处理逻辑是:如果transport设置了DisableKeepAlives,而request没有设置
// "Connection: close",则为request设置该首部。将底层表现与上层协议保持一致。
if pc.t.DisableKeepAlives && !req.wantsClose() { req.extraHeaders().Set(Connectionclose // 用于在异常场景(如request取消)下通知readLoop,roundTrip是否已经退出,防止ReadLoop发送response阻塞 gone := make(chan {}) defer close(gone) defer func() { nil { pc.t.setReqCanceler(req.Request,nil) } }() const debugRoundTrip = false Write the request concurrently with waiting for a response,1)"> in case the server decides to reply before reading our full request body.
// 表示发送了多少个字节的request,debug使用 startBytesWritten := pc.nwrite
// 给writeLoop封装并发送信息,注意此处的先后顺序。首先给writeLoop发送数据,阻塞等待writeLoop
// 接收,待writeLoop接收后才能发送数据给readLoop,因此发送request总会优先接收response writeErrCh :
= make(chan error,1)">) pc.writech <- writeRequest{req,writeErrCh,continueCh}
// 给readLoop封装并发送信息 resc :
= make(chan responseAndError) pc.reqch <- requestAndChan{ req: req.Request,ch: resc,addedGzip: requestedGzip,continueCh: continueCh,callerGone: gone,} var respHeaderTimer <-chan time.Time cancelChan := req.Request.Cancel ctxDoneChan := req.Context().Done()
// 该循环主要用于处理获取response超时和request取消时的条件跳转。正常情况下收到reponse
// 退出roundtrip函数
{ testHookWaitResLoop() // writeLoop返回发送request后的结果 writeErrCh: debugRoundTrip { req.logf(writeErrCh resv: %T/%#v nil { pc.close(fmt.Errorf(write error: %v // 设置一个接收response的定时器,如果在这段时间内没有接收到response(即没有进入下面代码
// 的"case re := <-resc:"分支),超时后进入""case <-respHeaderTimer:分支,关闭连接,
// 防止readLoop一直等待读取response,导致处理阻塞;没有超时则关闭定时器
if d := pc.t.ResponseHeaderTimeout; d > debugRoundTrip { req.logf(starting timer for %v time.NewTimer(d) defer timer.Stop() prevent leaks respHeaderTimer = timer.C }
// 处理底层连接关闭。"case <-cancelChan:"和”case <-ctxDoneChan:“为request关闭,request
// 关闭也会导致底层连接关闭,但必须处理非上层协议导致底层连接关闭的情况。
pc.closech: closech recv: %T %#v// 等待获取response超时,关闭连接 respHeaderTimer: timeout waiting for response headers.) } pc.close(errTimeout) // 接收到readLoop返回的response结果 case re := <-resc:
// 极异常情况,直接程序panic
if (re.res == nil) == (re.err == nil) { panic(fmt.Sprintf(internal error: exactly one of res or err should be set; nil=%v nil)) } resc recv: %p,%T/%#vif re.err != re.res,nil
// request取消
cancelChan: pc.t.CancelRequest(req.Request)
// 将关闭之后的chan置为nil,用来防止select一直进入该case(close的chan不会阻塞读,读取的数据为0) cancelChan
= nil ctxDoneChan: pc.t.cancelRequest(req.Request,req.Context().Err()) cancelChan = nil ctxDoneChan = nil } } }

?writeLoop用于发送request请求

func (pc *persistConn) writeLoop() {
    defer close(pc.writeLoopDone)
    // writeLoop会阻塞等待两个IO case:
// 循环等待并处理roundTrip发来的writeRequest数据,此时需要发送request;
// 如果底层连接关闭,则退出writeLoop
case wr := <-pc.writech: startBytesWritten := pc.nwrite
// 构造request并发送request请求。waitForContinue用于处理首部含"Expect: 100-continue"的request err :
= wr.req.Request.write(pc.bw,pc.isProxy,wr.req.extra,pc.waitForContinue(wr.continueCh)) if bre,1)"> err.(requestBodyReadError); ok { err = bre.error Errors reading from the user's Request.Body are high priority. Set it here before sending on the channels below or calling pc.close() which tears town connections and causes other errors. wr.req.setError(err) }
nil { err = pc.bw.Flush() }
// 请求失败时,需要关闭request和底层连接
nil { wr.req.Request.closeBody() if pc.nwrite == startBytesWritten { err = nothingWrittenError{err} } }
// 将结果发送给readLoop的
pc.wroteRequest()函数处理
            pc.writeErrCh <- err  to the body reader,which might recycle us
// 将结果返回给roundTrip处理,防止响应超时 wr.ch <- err
// 如果发送request失败,需要关闭连接。writeLoop退出时会关闭pc.conn和pc.closech,
// 同时会导致readLoop退出
nil { pc.close(err) } } } }

readLoop循环接收response响应,成功获得response后会将连接返回连接池,便于后续复用。当readLoop正常处理完一个response之后,会将连接重新放入到连接池中;

当readloop退出后,该连接会被关闭移除。

func (pc *persistConn) readLoop() {
    closeErr := errReadLoopExiting  default value,if not changed below // 当writeLoop或readLoop(异常)跳出循环后,都需要关闭底层连接。即一条连接包含writeLoop和readLoop两个
// 处理,任何一个loop退出(协议升级除外)则该连接不可用
// readLoo跳出循环的正常原因是连接上没有待处理的请求,此时关闭连接,释放资源 defer func() { pc.close(closeErr) pc.t.removeIdleConn(pc) }()
// 尝试将连接放回连接池 tryPutIdleConn :
= func(trace *httptrace.ClientTrace) if err := pc.t.tryPutIdleConn(pc); err != nil { closeErr = err if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled { trace.PutIdleConn(err) } return false } if trace != nil && trace.PutIdleConn != nil { trace.PutIdleConn(nil) } } eofc is used to block caller goroutines reading from Response.Body at EOF until this goroutines has (potentially) added the connection back to the idle pool.
// 从上面注释可以看出该变量主要用于阻塞调用者协程读取EOF的resp.body,
// 直到该连接重新放入连接池中。处理逻辑与上面先尝试放入连接池,然后返回response一样,
// 便于连接快速重用 eofc := make(chan {})
// 出现错误时也需要释放读取resp.Body的协程,防止调用者协程挂死 defer close(eofc)
unblock reader on errors Read this once,before loop starts. (to avoid races in tests) testHookMu.Lock() testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead testHookMu.Unlock() alive := true alive {
// 获取允许的response首部的最大字节数 pc.readLimit
= pc.maxHeaderResponseSize()
// 从接收buffer中peek一个字节来判断底层是否接收到response。roundTrip保证了request先于response发送。
// 此处peek会阻塞等待response(这也是roundtrip中设置response超时定时器的原因)。goroutine中的read/write
// 操作都是阻塞模式。 _,err :
= pc.br.Peek() pc.mu.Lock()
// 如果期望的response为0,则直接退出readLoop并关闭连接此时连接上没有需要处理的数据,
// 关闭连接,释放系统资源。
if pc.numExpectedResponses == { pc.readLoopPeekFailLocked(err) pc.mu.Unlock() } pc.mu.Unlock() // 阻塞等待roundTrip发来的数据 rc := <-pc.reqch trace := httptrace.ContextClientTrace(rc.req.Context()) var resp *Response
// 如果有response数据,则读取并解析为Response格式
nil { resp,1)"> pc.readResponse(rc,trace) } {
// 可能的错误如server端关闭,发送EOF err
= transportReadFromServerError{err} closeErr = err }
// 底层没有接收到server的任何数据,断开该连接,可能原因是在client发出request的同时,server关闭
// 了连接。参见transportReadFromServerError的注释。
if pc.readLimit <= { err = fmt.Errorf(net/http: server response headers exceeded %d bytes; aborted // 传递错误信息给roundTrip并退出loop case rc.ch <- responseAndError{err: err}: rc.callerGone: } } pc.readLimit = maxInt64 effictively no limit for response bodies pc.mu.Lock() pc.numExpectedResponses-- pc.mu.Unlock()
// 判断response是否可写,在使用101 Switching Protocol进行协议升级时需要返回一个可写的resp.body
// 如果使用了101 Switching Protocol,升级完成后就与transport没有关系了(后续使用http2或websocket等)
bodyWritable :
= resp.bodyIsWritable()
// 判断response的body是否为空,如果body为空,则不必读取body内容(HEAD的resp.body没有数据) hasBody :
= rc.req.Method != " && resp.ContentLength != 0 // 如果server关闭连接或client关闭连接或非预期的响应码或使用了协议升级,这几种情况下不能在该连接上继续
// 接收响应,退出readLoop if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable { Don't do keep-alive on error if either party requested a close or we get an unexpected informational (1xx) response. StatusCode 100 is already handled above. alive = }
// 此处用于处理body为空或协议升级场景,会尝试将连接放回连接池,对于后者,连接由调用者管理,退出readLoop
if !hasBody || bodyWritable { pc.t.setReqCanceler(rc.req,nil) // 在返回response前将连接放回连接池,快速回收利用。回收连接需要按顺序满足:
// 1.alive 为true
// 2.接收到EOF错误,此时底层连接关闭,该连接不可用
// 3.成功发送request;

// 此处的执行顺序很重要,将连接返回连接池的操作放到最后,即在协议升级的场景,服务端不再

// 发送数据的场景,以及request发送失败的场景下都不会将连接放回连接池,这些情况会导致
// alive为false,readLoop退出并关闭该连接(协议升级后的连接不能关闭)
alive = alive && !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(trace) bodyWritable {
// 协议升级之后还是会使用同一条连接,设置closeErr为errCallerOwnsConn,这样在readLoop
// return后不会被pc.close(closeErr)关闭连接 closeErr
= errCallerOwnsConn } // 1:将response成功返回后继续等待下一个response;
// 2:如果roundTrip退出,(此时无法返回给roundTrip response)则退出readLoop。
// 即roundTrip接收完response后退出不会影响readLoop继续运行
responseAndError{res: resp}: } Now that they've read from the unbuffered channel,they're safely out of the select that also waits on this goroutine to die,1)"> we're allowed to exit now if needed (if alive is false) testHookReadLoopBeforeNextRead() continue }
// 下面处理response body存在数据的场景,逻辑与body不存在数据的场景类似 waitForBodyRead :
= make(chan bool,1)">// 初始化body的处理函数,读取完response会返回EOF,这类连接是可重用的 body := &bodyEOFSignal{ body: resp.Body,earlyCloseFn: func() error { waitForBodyRead <- false <-eofc will be closed by deferred call at the end of the function nil },
fn: func(err error) error { iSEOF :
= err == io.EOF waitForBodyRead <- iSEOF iSEOF { <-eofc see comment above eofc declaration } else nil { if cerr := pc.canceled(); cerr != nil { cerr } } err },}
//返回的resp.Body类型变为了bodyEOFSignal,如果调用者在读取resp.Body后没有关闭,会导致
// readLoop阻塞在下面"case bodyEOF := <-waitForBodyRead:"中
resp.Body
= body if rc.addedGzip && strings.EqualFold(resp.Header.Get(Content-Encoding"),1)">) { resp.Body = &gzipReader{body: body} resp.Header.Del() resp.Header.Del(Content-Length) resp.ContentLength = - resp.Uncompressed = } // 此处与处理不带resp.body的场景相同 responseAndError{res: resp}: rc.callerGone: } Before looping back to the top of this function and peeking on the bufio.Reader,wait for the caller goroutine to finish reading the response body. (or for cancelation or death) case bodyEOF := <-waitForBodyRead: pc.t.setReqCanceler(rc.req,nil) before pc might return to idle pool alive = alive &&
// 如果读取完response的数据,则该连接可以被重用,否则直接释放。释放一个未读取完数据的连接会导致数据丢失。
//
注意区分bodyEOF和pc.sawEOF的区别,一个是上层通道(http response.Body)关闭,一个是底层通道(TCP)关闭。 bodyEOF && !pc.sawEOF && tryPutIdleConn(trace)
// 释放阻塞的读操作
bodyEOF { eofc <- {}{} } rc.req.Cancel: alive = pc.t.CancelRequest(rc.req) rc.req.Context().Done(): alive = pc.t.cancelRequest(rc.req,rc.req.Context().Err()) pc.closech: alive = } testHookReadLoopBeforeNextRead() } }

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读