
golang net: the http server

Published: 2020-12-16 09:18:25   Category: Big Data   Source: compiled from the web

Go version: 1.12.9

A simple HTTP server:

package main

import (
    "net/http"
)

type TestHandler struct {
    str string
}

func (th *TestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    w.Write([]byte(string(th.str + ",welcome")))
}

func main() {
    http.Handle("/", &TestHandler{"Hi,Stranger"})
    http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte("Hi,Tester"))
    })
    http.ListenAndServe(":8000", nil)
}

Open "http://127.0.0.1:8000" in a browser and the output is "Hi,Stranger,welcome"; open "http://127.0.0.1:8000/test" and the output is "Hi,Tester".
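The routing described above can be checked without opening port 8000: register the same two handlers on a private ServeMux and drive it in memory with net/http/httptest. This is a minimal sketch; newMux and get are helper names introduced here, not part of the original program.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// TestHandler is the same type as in the example above.
type TestHandler struct {
	str string
}

func (th *TestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	w.Write([]byte(th.str + ",welcome"))
}

// newMux (hypothetical helper) registers both handlers on a private ServeMux
// instead of http.DefaultServeMux, so there are no global side effects.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle("/", &TestHandler{"Hi,Stranger"})
	mux.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("Hi,Tester"))
	})
	return mux
}

// get (hypothetical helper) runs one GET request through h in memory
// and returns the response body.
func get(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest("GET", path, nil))
	return rec.Body.String()
}

func main() {
	mux := newMux()
	fmt.Println(get(mux, "/"))      // Hi,Stranger,welcome
	fmt.Println(get(mux, "/test"))  // Hi,Tester
	fmt.Println(get(mux, "/other")) // Hi,Stranger,welcome ("/" matches every path)
}
```

Note that "/" acts as a catch-all: any path without a more specific pattern falls back to it.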

Registering handlers

The handler-related methods are:

func NewServeMux() *ServeMux
func (mux *ServeMux) Handle(pattern string, handler Handler)                             // registers a handler
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request))  // registers a handler function
func (mux *ServeMux) Handler(r *Request) (h Handler, pattern string)                     // looks up the handler for r in mux.m by pattern
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request)                             // ServeMux's own ServeHTTP: dispatches r to the matching handler

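net/http uses handlers to define request paths and how each request is processed. Every handler must implement the ServeHTTP method. A ServeMux is itself a Handler: its ServeHTTP dispatches each request to the handler registered for the matching path, and each registered handler serves one path pattern. There are two ways to register a handler, http.Handle and http.HandleFunc, and they are essentially the same: with the former you write the ServeHTTP method yourself, while with the latter the function you pass is turned into a Handler by the built-in HandlerFunc type (see below).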

The Handler interface is defined as:

// net/http/server.go
type Handler interface { ServeHTTP(ResponseWriter,*Request) }

The function passed as the second argument of http.HandleFunc is converted to the HandlerFunc type, which implements the Handler interface:

// net/http/server.go
type HandlerFunc func(ResponseWriter, *Request)

// ServeHTTP calls f(w, r).
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
    f(w, r)
}

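When the second argument of http.ListenAndServe is nil, the server uses the handlers registered with http.Handle and http.HandleFunc, which are stored by default in http.DefaultServeMux.m (registered through ServeMux.Handle / ServeMux.HandleFunc). When the HTTP server receives a request, serverHandler.ServeHTTP calls DefaultServeMux.ServeHTTP to process it, in two steps: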

    • Call ServeMux.Handler, which searches ServeMux.m (and the sorted ServeMux.es) for the handler whose pattern matches the request path
    • Call that handler's ServeHTTP method

The source of serverHandler.ServeHTTP:

// net/http/server.go
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
    // Use the user-supplied handler if there is one; otherwise fall back to DefaultServeMux
    handler := sh.srv.Handler
    if handler == nil {
        handler = DefaultServeMux
    }
    if req.RequestURI == "*" && req.Method == "OPTIONS" {
        handler = globalOptionsHandler{}
    }
    handler.ServeHTTP(rw, req)
}

DefaultServeMux and the ServeMux struct are defined as follows:


// net/http/server.go
var DefaultServeMux = &defaultServeMux

var defaultServeMux ServeMux

type ServeMux struct {
    mu    sync.RWMutex
    m     map[string]muxEntry
    es    []muxEntry // slice of entries sorted from longest to shortest.
    hosts bool       // whether any patterns contain hostnames
}
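The split between m and es is what gives ServeMux its longest-prefix rule. The sketch below is a hypothetical, stripped-down model of those two tables (no host patterns, no locking, no redirects): m serves exact matches, and es holds only the patterns ending in "/", kept sorted longest first, so the first prefix hit is the longest one. net/http keeps es sorted with its own insertion routine; sort.Slice is used here only for brevity.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// miniMux is a hypothetical model of ServeMux's lookup tables.
type miniMux struct {
	m  map[string]string // pattern -> handler name (exact matches)
	es []string          // subtree patterns ending in "/", longest first
}

func newMiniMux() *miniMux { return &miniMux{m: make(map[string]string)} }

// handle records the pattern in m and, if it ends in "/", also in es,
// keeping es sorted from longest to shortest.
func (mux *miniMux) handle(pattern, name string) {
	mux.m[pattern] = name
	if strings.HasSuffix(pattern, "/") {
		mux.es = append(mux.es, pattern)
		sort.Slice(mux.es, func(i, j int) bool { return len(mux.es[i]) > len(mux.es[j]) })
	}
}

// match tries an exact hit first, then the first (i.e. longest) prefix pattern.
func (mux *miniMux) match(path string) (name, pattern string) {
	if h, ok := mux.m[path]; ok {
		return h, path
	}
	for _, p := range mux.es {
		if strings.HasPrefix(path, p) {
			return mux.m[p], p
		}
	}
	return "", ""
}

func main() {
	mux := newMiniMux()
	mux.handle("/", "root")
	mux.handle("/images/", "images")
	mux.handle("/images/thumbs/", "thumbs")
	mux.handle("/test", "tester")

	for _, p := range []string{"/test", "/images/a.png", "/images/thumbs/a.png", "/test/"} {
		name, pattern := mux.match(p)
		fmt.Printf("%s -> %s (%s)\n", p, name, pattern)
	}
	// /test -> tester (/test)
	// /images/a.png -> images (/images/)
	// /images/thumbs/a.png -> thumbs (/images/thumbs/)
	// /test/ -> root (/)
}
```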

The default handler's ServeHTTP method is shown below; it looks up the handler for the request and lets that handler process it:

func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
    if r.RequestURI == "*" {
        if r.ProtoAtLeast(1, 1) {
            w.Header().Set("Connection", "close")
        }
        w.WriteHeader(StatusBadRequest)
        return
    }
    // Look up the registered handler for the request path
    h, _ := mux.Handler(r)
    // Let the registered handler process the request; in the example above this is
    // http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("Hi,Tester")) })
    h.ServeHTTP(w, r)
}

// Handler finds the appropriate handler for the request path, or returns a
// redirect handler when the request path is not in canonical form.
func (mux *ServeMux) Handler(r *Request) (h Handler, pattern string) {
    // CONNECT requests are not canonicalized.
    // (CONNECT requests come from the proxy scenario.)
    if r.Method == "CONNECT" {
        // If r.URL.Path is /tree and its handler is not registered,
        // the /tree -> /tree/ redirect applies to CONNECT requests
        // but the path canonicalization does not.
        // redirectToPathSlash decides whether to redirect and builds the target URL:
        // if /tree/ is registered, the request path is /tree, and mux.m has no handler
        // for /tree, the request is redirected to /tree/.
        if u, ok := mux.redirectToPathSlash(r.URL.Host, r.URL.Path, r.URL); ok {
            return RedirectHandler(u.String(), StatusMovedPermanently), u.Path
        }
        return mux.handler(r.Host, r.URL.Path)
    }

    // All other requests have any port stripped and path cleaned
    // before passing to mux.handler.
    host := stripHostPort(r.Host)
    path := cleanPath(r.URL.Path)

    // Same /tree -> /tree/ redirect as in the CONNECT case, for the non-proxy case
    if u, ok := mux.redirectToPathSlash(host, path, r.URL); ok {
        return RedirectHandler(u.String(), StatusMovedPermanently), u.Path
    }

    // If the request path differs from the cleaned path (e.g. request "//test/",
    // cleaned "/test/"), redirect to the cleaned path. The redirect is served by
    // redirectHandler.ServeHTTP and looks like this:
    /*
        < HTTP/1.1 301 Moved Permanently
        < Content-Type: text/html; charset=utf-8
        < Location: /test/
        < Date: Fri, 06 Dec 2019 03:35:59 GMT
        < Content-Length: 41
        <
        <a href="/test/">Moved Permanently</a>.
    */
    if path != r.URL.Path {
        _, pattern = mux.handler(host, path)
        url := *r.URL
        url.Path = path
        return RedirectHandler(url.String(), StatusMovedPermanently), pattern
    }
    // Find the handler for host and URL path in mux.m and mux.es
    return mux.handler(host, r.URL.Path)
}

func (rh *redirectHandler) ServeHTTP(w ResponseWriter, r *Request) {
    Redirect(w, r, rh.url, rh.code)
}

Usually you register handlers with http.HandleFunc and let DefaultServeMux dispatch the requests. You can also create your own multiplexer with http.NewServeMux(); a ServeMux already implements ServeHTTP, so it can be passed directly to the server as its handler:

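package main

import (
    "net/http"
)

type TestHandler struct {
    str string
}

func (th *TestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    w.Write([]byte(string(th.str + ",welcome")))
}

func main() {
    serverHandler := http.NewServeMux()
    serverHandler.Handle("/", &TestHandler{"Hi,Stranger"})
    serverHandler.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte("Hi,Tester"))
    })

    http.ListenAndServe(":8000", serverHandler)
}

The server listens by calling the function below, which creates the listening socket and accepts connections on it. Usually calling this interface is all you need: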

func ListenAndServe(addr string, handler Handler) error {
    server := &Server{Addr: addr, Handler: handler}
    return server.ListenAndServe()
}

A Server struct represents a real server listening on a port:

type Server struct {
    Addr    string  // TCP address to listen on, ":http" if empty
    Handler Handler // handler to invoke, http.DefaultServeMux if nil

    // TLSConfig optionally provides a TLS configuration for use
    // by ServeTLS and ListenAndServeTLS. Note that this value is
    // cloned by ServeTLS and ListenAndServeTLS, so it's not
    // possible to modify the configuration with methods like
    // tls.Config.SetSessionTicketKeys. To use
    // SetSessionTicketKeys, use Server.Serve with a TLS Listener
    // instead.
    TLSConfig *tls.Config

    // ReadTimeout is the maximum duration for reading the entire
    // request, including the body.
    //
    // Because ReadTimeout does not let Handlers make per-request
    // decisions on each request body's acceptable deadline or
    // upload rate, most users will prefer to use
    // ReadHeaderTimeout. It is valid to use them both.
    ReadTimeout time.Duration

    // ReadHeaderTimeout is the amount of time allowed to read
    // request headers. The connection's read deadline is reset
    // after reading the headers and the Handler can decide what
    // is considered too slow for the body. If ReadHeaderTimeout
    // is zero, the value of ReadTimeout is used. If both are
    // zero, there is no timeout.
    ReadHeaderTimeout time.Duration

    // WriteTimeout is the maximum duration before timing out
    // writes of the response. It is reset whenever a new
    // request's header is read. Like ReadTimeout, it does not
    // let Handlers make decisions on a per-request basis.
    WriteTimeout time.Duration

    // IdleTimeout is the maximum amount of time to wait for the
    // next request when keep-alives are enabled. If IdleTimeout
    // is zero, the value of ReadTimeout is used. If both are
    // zero, there is no timeout.
    IdleTimeout time.Duration

    // MaxHeaderBytes controls the maximum number of bytes the
    // server will read parsing the request header's keys and
    // values, including the request line. It does not limit the
    // size of the request body.
    // If zero, DefaultMaxHeaderBytes is used.
    MaxHeaderBytes int

    // TLSNextProto optionally specifies a function to take over
    // ownership of the provided TLS connection when an NPN/ALPN
    // protocol upgrade has occurred. The map key is the protocol
    // name negotiated. The Handler argument should be used to
    // handle HTTP requests and will initialize the Request's TLS
    // and RemoteAddr if not already set. The connection is
    // automatically closed when the function returns.
    // If TLSNextProto is not nil, HTTP/2 support is not enabled
    // automatically.
    TLSNextProto map[string]func(*Server, *tls.Conn, Handler)

    // ConnState specifies an optional callback function that is
    // called when a client connection changes state. See the
    // ConnState type and associated constants for details.
    ConnState func(net.Conn, ConnState)

    // ErrorLog specifies an optional logger for errors accepting
    // connections, unexpected behavior from handlers, and
    // underlying FileSystem errors.
    // If nil, logging is done via the log package's standard logger.
    ErrorLog *log.Logger

    // BaseContext optionally specifies a function that returns
    // the base context for incoming requests on this server.
    // The provided Listener is the specific Listener that's
    // about to start accepting requests.
    // If BaseContext is nil, the default is context.Background().
    // If non-nil, it must return a non-nil context.
    BaseContext func(net.Listener) context.Context

    // ConnContext optionally specifies a function that modifies
    // the context used for a new connection c. The provided ctx
    // is derived from the base context and has a ServerContextKey
    // value.
    ConnContext func(ctx context.Context, c net.Conn) context.Context

    disableKeepAlives int32     // accessed atomically.
    inShutdown        int32     // accessed atomically (non-zero means we're in Shutdown)
    nextProtoOnce     sync.Once // guards setupHTTP2_* init
    nextProtoErr      error     // result of http2.ConfigureServer if used

    mu         sync.Mutex
    listeners  map[*net.Listener]struct{}
    activeConn map[*conn]struct{}
    doneChan   chan struct{}
    onShutdown []func()
}

ListenAndServe creates the listening socket and then calls Serve to wait for connections:

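func (srv *Server) ListenAndServe() error {
    // Server.Close and Server.Shutdown set srv.inShutdown to 1 (checked by shuttingDown()),
    // meaning the server is stopping and can no longer serve.
    // Close closes the underlying TCP connections immediately, while Shutdown shuts down
    // gracefully: it stops listening, runs the functions registered in Server.onShutdown,
    // and waits for active connections to become idle before closing them. Prefer Shutdown.
    if srv.shuttingDown() {
        return ErrServerClosed
    }
    addr := srv.Addr
    if addr == "" {
        addr = ":http"
    }
    ln, err := net.Listen("tcp", addr)
    if err != nil {
        return err
    }
    return srv.Serve(ln)
}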

ListenAndServeTLS is similar to ListenAndServe; it just takes the certificate and key files as extra arguments:

func (srv *Server) ListenAndServeTLS(certFile,keyFile string) error {
    if srv.shuttingDown() {
        return ErrServerClosed
    }
    addr := srv.Addr
    if addr == "" {
        addr = ":https"
    }

    ln,err := net.Listen("tcp",addr)
    if err != nil {
        return err
    }

    defer ln.Close()

    return srv.ServeTLS(ln,certFile,keyFile)
}

ServeTLS calls tls.NewListener to create a TLS listener; client connections are then accepted through the TLS listener's Accept method:

func (srv *Server) ServeTLS(l net.Listener, certFile, keyFile string) error {
    // Setup HTTP/2 before srv.Serve,to initialize srv.TLSConfig
    // before we clone it and create the TLS Listener.
    if err := srv.setupHTTP2_ServeTLS(); err != nil {
        return err
    }

    config := cloneTLSConfig(srv.TLSConfig)
    if !strSliceContains(config.NextProtos,"http/1.1") {
        config.NextProtos = append(config.NextProtos,"http/1.1")
    }

    configHasCert := len(config.Certificates) > 0 || config.GetCertificate != nil
    if !configHasCert || certFile != "" || keyFile != "" {
        var err error
        config.Certificates = make([]tls.Certificate,1)
        config.Certificates[0],err = tls.LoadX509KeyPair(certFile,keyFile)
        if err != nil {
            return err
        }
    }

    tlsListener := tls.NewListener(l,config)
    return srv.Serve(tlsListener)
}
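// src/crypto/tls/tls.go
// The only thing tls's Accept adds is wrapping the accepted connection with tls.Server,
// which attaches the TLS configuration (certificates etc.)
func (l *listener) Accept() (net.Conn, error) {
    c, err := l.Listener.Accept()
    if err != nil {
        return nil, err
    }
    return Server(c, l.config), nil
}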

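The core of Serve is shown below. After Accept establishes a connection with the client, newConn initializes an HTTP connection that holds the HTTP-level description (the owning server, addresses, etc.) together with the underlying TCP connection; the client's HTTP requests are then processed on it.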

func (srv *Server) Serve(l net.Listener) error {
    ...
    ctx := context.WithValue(baseCtx, ServerContextKey, srv)
    for {
        // Accept returns the underlying TCP connection
        rw, e := l.Accept()
        if e != nil {
            select {
            case <-srv.getDoneChan():
                return ErrServerClosed
            default:
            }
            if ne, ok := e.(net.Error); ok && ne.Temporary() {
                // After a temporary network error in Accept, back off before retrying
                if tempDelay == 0 {
                    tempDelay = 5 * time.Millisecond
                } else {
                    tempDelay *= 2
                }
                if max := 1 * time.Second; tempDelay > max {
                    tempDelay = max
                }
                srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
                time.Sleep(tempDelay)
                continue
            }
            return e
        }
        if cc := srv.ConnContext; cc != nil {
            ctx = cc(ctx, rw)
            if ctx == nil {
                panic("ConnContext returned nil")
            }
        }
        tempDelay = 0
        // Build the HTTP connection
        c := srv.newConn(rw)
        c.setState(c.rwc, StateNew) // before Serve can return
        // Serve the HTTP requests on this TCP connection in another goroutine,
        // so this goroutine can go on accepting TCP connections
        go c.serve(ctx)
    }
}

The connection returned by Accept implements the net.Conn interface:

type Conn interface {
    // Read reads data from the connection.
    // Read can be made to time out and return an Error with Timeout() == true
    // after a fixed time limit; see SetDeadline and SetReadDeadline.
    Read(b []byte) (n int, err error)

    // Write writes data to the connection.
    // Write can be made to time out and return an Error with Timeout() == true
    // after a fixed time limit; see SetDeadline and SetWriteDeadline.
    Write(b []byte) (n int, err error)

    // Close closes the connection.
    // Any blocked Read or Write operations will be unblocked and return errors.
    Close() error

    // LocalAddr returns the local network address.
    LocalAddr() Addr

    // RemoteAddr returns the remote network address.
    RemoteAddr() Addr

    // SetDeadline sets the read and write deadlines associated
    // with the connection. It is equivalent to calling both
    // SetReadDeadline and SetWriteDeadline.
    // A deadline is an absolute time after which I/O operations
    // fail with a timeout (see type Error) instead of
    // blocking. The deadline applies to all future and pending
    // I/O, not just the immediately following call to Read or
    // Write. After a deadline has been exceeded, the connection
    // can be refreshed by setting a deadline in the future.
    // An idle timeout can be implemented by repeatedly extending
    // the deadline after successful Read or Write calls.
    // A zero value for t means I/O operations will not time out.
    // Note that if a TCP connection has keep-alive turned on,
    // which is the default unless overridden by Dialer.KeepAlive
    // or ListenConfig.KeepAlive, then a keep-alive failure may
    // also return a timeout error. On Unix systems a keep-alive
    // failure on I/O can be detected using
    // errors.Is(err, syscall.ETIMEDOUT).
    SetDeadline(t time.Time) error

    // SetReadDeadline sets the deadline for future Read calls
    // and any currently-blocked Read call.
    // A zero value for t means Read will not time out.
    SetReadDeadline(t time.Time) error

    // SetWriteDeadline sets the deadline for future Write calls
    // and any currently-blocked Write call.
    // Even if write times out, it may return n > 0, indicating that
    // some of the data was successfully written.
    // A zero value for t means Write will not time out.
    SetWriteDeadline(t time.Time) error
}

The interface is implemented by, among others, TCPConn (tcpsock.go) and UnixConn (unixsock.go); an HTTP server normally gets a TCPConn:

type TCPConn struct {
    conn
}
type UnixConn struct {
    conn
}

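newConn creates the struct below, which represents one HTTP connection on top of TCP. It wraps three important fields: server, the HTTP Server the connection arrived on; rwc, the underlying net.Conn; and r, the connReader used to read HTTP data (it reads from rwc). The subsequent request and response are both built on this struct.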

type conn struct {
    // server is the server on which the connection arrived.
    // Immutable; never nil.
    server *Server

    // cancelCtx cancels the connection-level context.
    cancelCtx context.CancelFunc

    // rwc is the underlying network connection.
    // This is never wrapped by other types and is the value given out
    // to CloseNotifier callers. It is usually of type *net.TCPConn or
    // *tls.Conn.
    rwc net.Conn

    // remoteAddr is rwc.RemoteAddr().String(). It is not populated synchronously
    // inside the Listener's Accept goroutine, as some implementations block.
    // It is populated immediately inside the (*conn).serve goroutine.
    // This is the value of a Handler's (*Request).RemoteAddr.
    remoteAddr string

    // tlsState is the TLS connection state when using TLS.
    // nil means not TLS.
    tlsState *tls.ConnectionState

    // werr is set to the first write error to rwc.
    // It is set via checkConnErrorWriter{w}, where bufw writes.
    werr error

    // r is bufr's read source. It's a wrapper around rwc that provides
    // io.LimitedReader-style limiting (while reading request headers)
    // and functionality to support CloseNotifier. See *connReader docs.
    r *connReader

    // bufr reads from r.
    bufr *bufio.Reader

    // bufw writes to checkConnErrorWriter{c}, which populates werr on error.
    bufw *bufio.Writer

    // lastMethod is the method of the most recent request
    // on this connection, if any.
    lastMethod string

    curReq atomic.Value // of *response (which has a Request in it)

    curState struct{ atomic uint64 } // packed (unixtime<<8|uint8(ConnState))

    // mu guards hijackedv
    mu sync.Mutex

    // hijackedv is whether this connection has been hijacked
    // by a Handler with the Hijacker interface.
    // It is guarded by mu.
    hijackedv bool
}

The conn field of connReader is the HTTP connection struct shown above:

type connReader struct {
    conn *conn

    mu      sync.Mutex // guards following
    hasByte bool
    byteBuf [1]byte
    cond    *sync.Cond
    inRead  bool
    aborted bool  // set true before conn.rwc deadline is set to past
    remain  int64 // bytes remaining
}

The serve method below processes the requests and sends the responses:

func (c *conn) serve(ctx context.Context) {
    c.remoteAddr = c.rwc.RemoteAddr().String()
    ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
    defer func() {
        if err := recover(); err != nil && err != ErrAbortHandler {
            const size = 64 << 10
            buf := make([]byte, size)
            buf = buf[:runtime.Stack(buf, false)]
            c.server.logf("http: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
        }
        if !c.hijacked() {
            c.close()
            c.setState(c.rwc, StateClosed)
        }
    }()

    // Handle connections accepted through ServeTLS
    if tlsConn, ok := c.rwc.(*tls.Conn); ok {
        if d := c.server.ReadTimeout; d != 0 {
            // Set the TCP read deadline
            c.rwc.SetReadDeadline(time.Now().Add(d))
        }
        if d := c.server.WriteTimeout; d != 0 {
            // Set the TCP write deadline
            c.rwc.SetWriteDeadline(time.Now().Add(d))
        }
        // Run the TLS handshake and check the result
        if err := tlsConn.Handshake(); err != nil {
            // If the handshake failed due to the client not speaking
            // TLS, assume they're speaking plaintext HTTP and write a
            // 400 response on the TLS conn's underlying net.Conn.
            if re, ok := err.(tls.RecordHeaderError); ok && re.Conn != nil && tlsRecordHeaderLooksLikeHTTP(re.RecordHeader) {
                io.WriteString(re.Conn, "HTTP/1.0 400 Bad Request\r\n\r\nClient sent an HTTP request to an HTTPS server.\n")
                re.Conn.Close()
                return
            }
            c.server.logf("http: TLS handshake error from %s: %v", c.rwc.RemoteAddr(), err)
            return
        }
        c.tlsState = new(tls.ConnectionState)
        *c.tlsState = tlsConn.ConnectionState()
        // If the TLS NPN/ALPN extension negotiated an application protocol other than
        // http/1.1 or http/1.0 (e.g. h2), hand the connection to server.TLSNextProto
        if proto := c.tlsState.NegotiatedProtocol; validNPN(proto) {
            if fn := c.server.TLSNextProto[proto]; fn != nil {
                h := initNPNRequest{ctx, tlsConn, serverHandler{c.server}}
                fn(c.server, tlsConn, h)
            }
            return
        }
    }

    // HTTP/1.x from here on.

    ctx, cancelCtx := context.WithCancel(ctx)
    c.cancelCtx = cancelCtx
    defer cancelCtx()

    // Read source for c.bufr
    c.r = &connReader{conn: c}
    // bufio.Reader on top of c.r, used to read HTTP requests; the buffer defaults to
    // 4096 bytes and is taken from a sync.Pool for efficiency
    c.bufr = newBufioReader(c.r)
    // c.bufw sends responses; its size is 4<<10 = 4096 bytes
    c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)

    // Loop over the HTTP requests on this connection
    for {
        // Read a request and return the response wrapper for it
        w, err := c.readRequest(ctx)
        // If any bytes were read, mark the connection as active
        if c.r.remain != c.server.initialReadLimitSize() {
            // If we read any bytes off the wire, we're active.
            c.setState(c.rwc, StateActive)
        }
        // Handle errors in reading the request
        if err != nil {
            const errorHeaders = "\r\nContent-Type: text/plain; charset=utf-8\r\nConnection: close\r\n\r\n"

            switch {
            case err == errTooLarge:
                // Their HTTP client may or may not be
                // able to read this if we're
                // responding to them and hanging up
                // while they're still writing their
                // request. Undefined behavior.
                const publicErr = "431 Request Header Fields Too Large"
                fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
                c.closeWriteAndWait()
                return // returning runs the deferred c.close(), which closes the TCP connection

            case isUnsupportedTEError(err):
                // Respond as per RFC 7230 Section 3.3.1 which says,
                //      A server that receives a request message with a
                //      transfer coding it does not understand SHOULD
                //      respond with 501 (Unimplemented).
                code := StatusNotImplemented

                // We purposefully aren't echoing back the transfer-encoding's value,
                // so as to mitigate the risk of cross side scripting by an attacker.
                fmt.Fprintf(c.rwc, "HTTP/1.1 %d %s%sUnsupported transfer encoding", code, StatusText(code), errorHeaders)
                return

            case isCommonNetReadError(err):
                return // don't reply

            default:
                publicErr := "400 Bad Request"
                if v, ok := err.(badRequestError); ok {
                    publicErr = publicErr + ": " + string(v)
                }

                fmt.Fprintf(c.rwc, "HTTP/1.1 "+publicErr+errorHeaders+publicErr)
                return
            }
        }

        // Expect 100 Continue support
        req := w.req
        // The request carries "Expect: 100-continue"
        if req.expectsContinue() {
            // 100-continue requires HTTP/1.1 or later and a non-empty body
            if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 {
                // Wrap the Body reader with one that replies on the connection
                req.Body = &expectContinueReader{readCloser: req.Body, resp: w}
            }
        } else if req.Header.get("Expect") != "" {
            // Any other Expect value is rejected with 417 Expectation Failed
            w.sendExpectationFailed()
            return
        }

        // curReq holds the current response; it is mainly used to signal the response's
        // closeNotifyCh when a background read fails, i.e. the connection was dropped
        c.curReq.Store(w)

        // If the body still has unread data (readTransfer sets req.Body to an *http.body),
        // registerOnHitEOF registers the function to run when the body hits EOF
        // (stored in http.body.onHitEOF)
        if requestBodyRemains(req.Body) {
            registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
        } else {
            // No body left: start a background read right away. It blocks in a new
            // goroutine waiting for data (to notice a closed connection) until
            // finishRequest aborts it
            w.conn.r.startBackgroundRead()
        }

        // HTTP cannot have multiple simultaneous active requests.[*]
        // Until the server replies to this request, it can't read another,
        // so we might as well run the handler in this goroutine.
        // [*] Not strictly true: HTTP pipelining. We could let them all process
        // in parallel even if their responses need to be serialized.
        // But we're not going to implement HTTP pipelining because it
        // was never deployed in the wild and the answer is HTTP/2.

        // Find the handler for the request, process it and send the response
        serverHandler{c.server}.ServeHTTP(w, w.req)
        w.cancelCtx()
        if c.hijacked() {
            return
        }
        // Finish the HTTP request and flush the response
        w.finishRequest()
        // Decide whether the TCP connection can be reused, i.e. whether to leave this
        // for loop; leaving it closes the connection
        if !w.shouldReuseConnection() {
            // If the request body was too large or was closed early, close the TCP
            // connection gracefully with closeWriteAndWait
            if w.requestBodyLimitHit || w.closedRequestBodyEarly() {
                c.closeWriteAndWait()
            }
            return
        }
        // Reuse the connection: mark it idle
        c.setState(c.rwc, StateIdle)
        c.curReq.Store((*response)(nil))

        // Return if keep-alives were disabled via SetKeepAlivesEnabled(false) or the
        // server is shutting down (e.g. Server.Shutdown or Server.Close)
        if !w.conn.server.doKeepAlives() {
            // We're in shutdown mode. We might've replied
            // to the user without "Connection: close" and
            // they might think they can send another
            // request, but such is life with HTTP/1.1.
            return
        }

        // With an idle timeout, set the read deadline and wait for the next request with bufr.Peek
        if d := c.server.idleTimeout(); d != 0 {
            c.rwc.SetReadDeadline(time.Now().Add(d))
            if _, err := c.bufr.Peek(4); err != nil {
                return
            }
        }
        c.rwc.SetReadDeadline(time.Time{})
    }
}

readRequest reads the next HTTP request from the connection:

func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
    if c.hijacked() {
        return nil, ErrHijacked
    }

    var (
        wholeReqDeadline time.Time // or zero if none
        hdrDeadline      time.Time // or zero if none
    )
    t0 := time.Now()
    // Deadline for reading the request headers
    if d := c.server.readHeaderTimeout(); d != 0 {
        hdrDeadline = t0.Add(d)
    }
    // Deadline for reading the whole request
    if d := c.server.ReadTimeout; d != 0 {
        wholeReqDeadline = t0.Add(d)
    }
    // Set the TCP read deadline with SetReadDeadline
    c.rwc.SetReadDeadline(hdrDeadline)
    if d := c.server.WriteTimeout; d != 0 {
        // The write deadline is set in a defer: this function only reads, so the
        // write deadline starts once the request has been read
        defer func() {
            c.rwc.SetWriteDeadline(time.Now().Add(d))
        }()
    }

    // Limit how many bytes the request headers may take: by default
    // DefaultMaxHeaderBytes+4096 = 1052672, to guard against oversized-header attacks
    c.r.setReadLimit(c.server.initialReadLimitSize())
    // Tolerate old, buggy clients
    if c.lastMethod == "POST" {
        // RFC 7230 section 3.5 Message Parsing Robustness tolerance for old buggy clients.
        peek, _ := c.bufr.Peek(4) // ReadRequest will get err below
        c.bufr.Discard(numLeadingCRorLF(peek))
    }
    // Read the request from bufr and return it as a structured Request
    req, err := readRequest(c.bufr, keepHostHeader)
    if err != nil {
        // The request exceeded the read limit
        if c.r.hitReadLimit() {
            return nil, errTooLarge
        }
        return nil, err
    }

    // Check that this is an HTTP/1.x request the Go server supports
    if !http1ServerSupportsRequest(req) {
        return nil, badRequestError("unsupported protocol version")
    }

    c.lastMethod = req.Method
    c.r.setInfiniteReadLimit()

    hosts, haveHost := req.Header["Host"]
    isH2Upgrade := req.isH2Upgrade()
    // HTTP/1.1 requires a Host header (except for an h2 upgrade or CONNECT)
    if req.ProtoAtLeast(1, 1) && (!haveHost || len(hosts) == 0) && !isH2Upgrade && req.Method != "CONNECT" {
        return nil, badRequestError("missing required Host header")
    }
    // More than one Host header
    if len(hosts) > 1 {
        return nil, badRequestError("too many Host headers")
    }
    // Invalid Host header value
    if len(hosts) == 1 && !httpguts.ValidHostHeader(hosts[0]) {
        return nil, badRequestError("malformed Host header")
    }
    // Reject invalid characters in header names and values
    for k, vv := range req.Header {
        if !httpguts.ValidHeaderFieldName(k) {
            return nil, badRequestError("invalid header name")
        }
        for _, v := range vv {
            if !httpguts.ValidHeaderFieldValue(v) {
                return nil, badRequestError("invalid header value")
            }
        }
    }
    // Host is removed from req.Header; its value lives in req.Host
    delete(req.Header, "Host")

    ctx, cancelCtx := context.WithCancel(ctx)
    req.ctx = ctx
    req.RemoteAddr = c.remoteAddr
    req.TLS = c.tlsState
    if body, ok := req.Body.(*body); ok {
        body.doEarlyClose = true
    }

    // Switch from the header deadline to the whole-request deadline if they differ
    if !hdrDeadline.Equal(wholeReqDeadline) {
        c.rwc.SetReadDeadline(wholeReqDeadline)
    }

    w = &response{
        conn:          c,
        cancelCtx:     cancelCtx,
        req:           req,
        reqBody:       req.Body,
        handlerHeader: make(Header),
        contentLength: -1,
        closeNotifyCh: make(chan bool, 1),

        // We populate these ahead of time so we're not
        // reading from req.Header after their Handler starts
        // and maybe mutates it (Issue 14940)
        wants10KeepAlive: req.wantsHttp10KeepAlive(),
        wantsClose:       req.wantsClose(),
    }
    if isH2Upgrade {
        w.closeAfterReply = true
    }
    // w.cw.res points back to the response, which in turn holds the underlying conn;
    // data is later written through w.cw.res.conn
    w.cw.res = w
    // 2048-byte (bufferBeforeChunkingSize) bufio.Writer used to send the response
    w.w = newBufioWriterSize(&w.cw, bufferBeforeChunkingSize)
    return w, nil
}

The package-level readRequest reads an HTTP request and parses it into an http.Request:

func readRequest(b *bufio.Reader, deleteHostHeader bool) (req *Request, err error) {
    // Wrap b in a textproto.Reader, which implements the methods for reading HTTP-style text
    tp := newTextprotoReader(b)
    // Allocate the Request; the rest of the function fills it in and returns it
    req = new(Request)

    // First line: GET /index.html HTTP/1.0
    var s string
    // ReadLine goes through textproto.(*Reader).ReadLine -> textproto.(*Reader).readLineSlice ->
    // bufio.(*Reader).ReadLine -> bufio.(*Reader).ReadSlice -> bufio.(*Reader).fill ->
    // http.(*connReader).Read, which reads the request into b.buf, and returns the
    // first line (delimited by "\n")
    if s, err = tp.ReadLine(); err != nil {
        return nil, err
    }
    // putTextprotoReader returns the textproto.Reader to a sync.Pool; reusing memory this
    // way speeds things up under many HTTP requests. It pairs with newTextprotoReader above.
    defer func() {
        putTextprotoReader(tp)
        if err == io.EOF {
            err = io.ErrUnexpectedEOF
        }
    }()

    var ok bool
    // Parse the request method, request URI and protocol
    req.Method, req.RequestURI, req.Proto, ok = parseRequestLine(s)
    if !ok {
        return nil, &badStringError{"malformed HTTP request", s}
    }
    // Reject methods containing invalid characters
    if !validMethod(req.Method) {
        return nil, &badStringError{"invalid method", req.Method}
    }
    // The request target: for "http://127.0.0.1:8000/test" in the browser, rawurl is "/test"
    rawurl := req.RequestURI
    // Check the HTTP version; normally HTTP/1.x
    if req.ProtoMajor, req.ProtoMinor, ok = ParseHTTPVersion(req.Proto); !ok {
        return nil, &badStringError{"malformed HTTP version", req.Proto}
    }

    // CONNECT requests are used two different ways, and neither uses a full URL:
    // The standard use is to tunnel HTTPS through an HTTP proxy.
    // It looks like "CONNECT www.google.com:443 HTTP/1.1", and the parameter is
    // just the authority section of a URL. This information should go in req.URL.Host.
    //
    // The net/rpc package also uses CONNECT, but there the parameter is a path
    // that starts with a slash. It can be parsed with the regular URL parser,
    // and the path will end up in req.URL.Path, where it needs to be in order for
    // RPC to work.

    // Proxy case: a CONNECT to a proxy carries only host:port, so prepend "http://"
    // to let the URL parser put it in req.URL.Host
    justAuthority := req.Method == "CONNECT" && !strings.HasPrefix(rawurl, "/")
    if justAuthority {
        rawurl = "http://" + rawurl
    }

    if req.URL, err = url.ParseRequestURI(rawurl); err != nil {
        return nil, err
    }

    if justAuthority {
        // Strip the bogus "http://" back off.
        req.URL.Scheme = ""
    }

    // Parse the request header's key: value lines
    mimeHeader, err := tp.ReadMIMEHeader()
    if err != nil {
        return nil, err
    }
    req.Header = Header(mimeHeader)

    // RFC 7230, section 5.3: Must treat
    //	GET /index.html HTTP/1.1
    //	Host: www.google.com
    // and
    //	GET http://www.google.com/index.html HTTP/1.1
    //	Host: doesntmatter
    // the same. In the second case, any Host line is ignored.
    req.Host = req.URL.Host
    // In the first case of the comment above, Host comes from req.Header
    if req.Host == "" {
        req.Host = req.Header.get("Host")
    }
    // "Host" belongs only to requests; once received it is removed from the header
    // (see the comment on that variable for details)
    if deleteHostHeader {
        delete(req.Header, "Host")
    }

    // Handle "Pragma: no-cache" / "Cache-Control"
    fixPragmaCacheControl(req.Header)

    // Decide whether the connection should be closed after this request
    // (removeCloseHeader is false here, so the Connection header is kept)
    req.Close = shouldClose(req.ProtoMajor, req.ProtoMinor, req.Header, false)

    // Parse the transfer-related headers and set up req.Body
    err = readTransfer(req, b)
    if err != nil {
        return nil, err
    }

    // An HTTP/1.1 server parsing an HTTP/2 connection preface sees the "PRI" method
    if req.isH2Upgrade() {
        // Because it's neither chunked, nor declared:
        req.ContentLength = -1

        // We want to give handlers a chance to hijack the
        // connection, but we need to prevent the Server from
        // dealing with the connection further if it's not
        // hijacked. Set Close to ensure that:
        req.Close = true
    }
    return req, nil
}
func shouldClose(major, minor int, header Header, removeCloseHeader bool) bool {
    // Versions below HTTP/1.x cannot keep the connection open via "Connection": always close
    if major < 1 {
        return true
    }

    conv := header["Connection"]
    // Close the connection if the header contains "Connection: close"
    hasClose := httpguts.HeaderValuesContainsToken(conv, "close")
    // With HTTP/1.0 the connection is closed if it has "Connection: close" or lacks
    // "Connection: keep-alive"; with HTTP/1.1 and no "Connection" header, the
    // connection is persistent by default
    if major == 1 && minor == 0 {
        return hasClose || !httpguts.HeaderValuesContainsToken(conv, "keep-alive")
    }

    // When closing and asked to, remove the Connection header; it must be removed
    // when passing through a proxy or gateway
    if hasClose && removeCloseHeader {
        header.Del("Connection")
    }

    return hasClose
}
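Both readRequest and shouldClose can be exercised through the exported http.ReadRequest, which runs the same package-level readRequest with Host-header deletion enabled. The sketch below parses raw requests from strings; parse is a hypothetical helper, and the requests are made-up examples.

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

// parse (hypothetical helper) feeds raw bytes to http.ReadRequest.
func parse(raw string) *http.Request {
	req, err := http.ReadRequest(bufio.NewReader(strings.NewReader(raw)))
	if err != nil {
		panic(err)
	}
	return req
}

func main() {
	r1 := parse("GET /test HTTP/1.1\r\nHost: 127.0.0.1:8000\r\n\r\n")
	// Host moves from the header map into r1.Host; HTTP/1.1 defaults to keep-alive.
	fmt.Println(r1.Method, r1.RequestURI, r1.Proto, r1.Host, r1.Header.Get("Host") == "", r1.Close)
	// GET /test HTTP/1.1 127.0.0.1:8000 true false

	r2 := parse("GET /test HTTP/1.0\r\n\r\n")                           // HTTP/1.0, no keep-alive
	r3 := parse("GET /test HTTP/1.0\r\nConnection: keep-alive\r\n\r\n") // HTTP/1.0 with keep-alive
	r4 := parse("GET /test HTTP/1.1\r\nHost: a\r\nConnection: close\r\n\r\n")
	fmt.Println(r2.Close, r3.Close, r4.Close) // true false true

	// CONNECT with just an authority: it ends up in URL.Host, with no scheme.
	r5 := parse("CONNECT www.google.com:443 HTTP/1.1\r\nHost: www.google.com:443\r\n\r\n")
	fmt.Println(r5.URL.Host, r5.URL.Scheme == "") // www.google.com:443 true
}
```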
func readTransfer(msg interface{}, r *bufio.Reader) (err error) {
    t := &transferReader{RequestMethod: "GET"}

    // Unify input
    isResponse := false
    switch rr := msg.(type) {
    // The message is a response
    case *Response:
        t.Header = rr.Header
        t.StatusCode = rr.StatusCode
        t.ProtoMajor = rr.ProtoMajor
        t.ProtoMinor = rr.ProtoMinor
        // Responses do not keep the Connection header: the last argument is true, so it is deleted
        t.Close = shouldClose(t.ProtoMajor, t.ProtoMinor, t.Header, true)
        isResponse = true
        if rr.Request != nil {
            t.RequestMethod = rr.Request.Method
        }
    // The message is a request
    case *Request:
        t.Header = rr.Header
        t.RequestMethod = rr.Method
        t.ProtoMajor = rr.ProtoMajor
        t.ProtoMinor = rr.ProtoMinor
        // Transfer semantics for Requests are exactly like those for
        // Responses with status code 200, responding to a GET method
        t.StatusCode = 200
        t.Close = rr.Close
    default:
        panic("unexpected type")
    }

    // Default to HTTP/1.1
    if t.ProtoMajor == 0 && t.ProtoMinor == 0 {
        t.ProtoMajor, t.ProtoMinor = 1, 1
    }

    // Handle the "Transfer-Encoding" header
    err = t.fixTransferEncoding()
    if err != nil {
        return err
    }

    // Handle the "Content-Length" header; note that realLength is the actual body length
    realLength, err := fixLength(isResponse, t.StatusCode, t.RequestMethod, t.Header, t.TransferEncoding)
    if err != nil {
        return err
    }
    // For a response to a HEAD request, a Content-Length header (if any) becomes the
    // response's ContentLength: it gives the length the body would have had for a GET.
    // The response carries no body, so the real length 0 from fixLength cannot be used.
    if isResponse && t.RequestMethod == "HEAD" {
        if n, err := parseContentLength(t.Header.get("Content-Length")); err != nil {
            return err
        } else {
            t.ContentLength = n
        }
    } else {
        t.ContentLength = realLength
    }

    // Parse the Trailer header, mainly validating it
    t.Trailer, err = fixTrailer(t.Header, t.TransferEncoding)
    if err != nil {
        return err
    }

    // If there is no Content-Length or chunked Transfer-Encoding on a *Response
    // and the status is not 1xx, 204 or 304, then the body is unbounded.
    // See RFC 7230, section 3.3.
    // Such a body only ends when the connection closes, so the connection is closed afterwards.
    switch msg.(type) {
    case *Response:
        if realLength == -1 && !chunked(t.TransferEncoding) && bodyAllowedForStatus(t.StatusCode) {
            // Unbounded body.
            t.Close = true
        }
    }

    // Prepare body reader. ContentLength < 0 means chunked encoding
    // or close connection when finished, since multipart is not supported yet
    switch {
    // chunked
    case chunked(t.TransferEncoding):
        // HEAD requests and 1xx, 204 or 304 responses carry no body
        if noResponseBodyExpected(t.RequestMethod) || !bodyAllowedForStatus(t.StatusCode) {
            t.Body = NoBody
        } else {
            // Otherwise decode the body with a chunked reader
            t.Body = &body{src: internal.NewChunkedReader(r), hdr: msg, r: r, closing: t.Close}
        }
    case realLength == 0:
        t.Body = NoBody
    // Not chunked, with a body of known length (Content-Length): wrap r in a LimitReader
    case realLength > 0:
        t.Body = &body{src: io.LimitReader(r, realLength), closing: t.Close}
    default:
        // realLength < 0, i.e. "Content-Length" not mentioned in header
        // Unbounded body: read until the underlying connection is closed
        if t.Close {
            // Close semantics (i.e. HTTP/1.0)
            t.Body = &body{src: r, closing: t.Close}
        } else {
            // Persistent connection (i.e. HTTP/1.1); this branch seems unreachable in practice
            t.Body = NoBody
        }
    }

    // Copy the results back into the Request/Response through the pointer
    switch rr := msg.(type) {
    case *Request:
        rr.Body = t.Body
        rr.ContentLength = t.ContentLength
        rr.TransferEncoding = t.TransferEncoding
        rr.Close = t.Close
        rr.Trailer = t.Trailer
    case *Response:
        rr.Body = t.Body
        rr.ContentLength = t.ContentLength
        rr.TransferEncoding = t.TransferEncoding
        rr.Close = t.Close
        rr.Trailer = t.Trailer
    }
    return nil
}
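The body-reader choice made by readTransfer is visible through http.ReadRequest as well: a Content-Length body is read through a length-limited reader, a chunked body reports ContentLength -1 and is decoded frame by frame, and a request with neither header gets an empty body. bodyOf is a hypothetical helper and the raw requests are made-up examples.

```go
package main

import (
	"bufio"
	"fmt"
	"io/ioutil"
	"net/http"
	"strings"
)

// bodyOf (hypothetical helper) parses a raw request and returns what
// readTransfer decided: ContentLength, TransferEncoding and the body bytes.
func bodyOf(raw string) (contentLength int64, te []string, body string) {
	req, err := http.ReadRequest(bufio.NewReader(strings.NewReader(raw)))
	if err != nil {
		panic(err)
	}
	b, err := ioutil.ReadAll(req.Body)
	if err != nil {
		panic(err)
	}
	return req.ContentLength, req.TransferEncoding, string(b)
}

func main() {
	// Content-Length: exactly 5 bytes are read; "EXTRA" would belong to the next message.
	fmt.Println(bodyOf("POST /up HTTP/1.1\r\nHost: a\r\nContent-Length: 5\r\n\r\nhelloEXTRA"))
	// 5 [] hello

	// chunked: ContentLength is -1 and the chunked reader strips the framing.
	fmt.Println(bodyOf("POST /up HTTP/1.1\r\nHost: a\r\nTransfer-Encoding: chunked\r\n\r\n" +
		"5\r\nhello\r\n6\r\n,world\r\n0\r\n\r\n"))
	// -1 [chunked] hello,world

	// Neither header on a request: the body is empty (NoBody).
	fmt.Println(bodyOf("GET / HTTP/1.1\r\nHost: a\r\n\r\n"))
}
```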
// The 1.13.3 version of this function is problematic; the code below is taken from the latest master branch.
func (t *transferReader) fixTransferEncoding() error {
    // This function handles the "Transfer-Encoding" header; if the header is absent, return immediately
    raw, present := t.Header["Transfer-Encoding"]
    if !present {
        return nil
    }
    delete(t.Header, "Transfer-Encoding")

    // Issue 12785; ignore Transfer-Encoding on HTTP/1.0 requests.
    // HTTP/1.0 does not process this header
    if !t.protoAtLeast(1, 1) {
        return nil
    }

    // The "Transfer-Encoding" value is a comma-separated list
    encodings := strings.Split(raw[0], ",")
    te := make([]string, 0, len(encodings))

    // When adding new encodings, please maintain the invariant:
    //   if chunked encoding is present, it must always
    //   come last and it must be applied only once.
    // See RFC 7230 Section 3.3.1 Transfer-Encoding.
    // Process each transfer coding in turn; so far only "chunked" is implemented
    for i, encoding := range encodings {
        encoding = strings.ToLower(strings.TrimSpace(encoding))

        if encoding == "identity" {
            // "identity" should not be mixed with other transfer-encodings/compressions
            // because it means "no compression, no transformation".
            if len(encodings) != 1 {
                return &badStringError{`"identity" when present must be the only transfer encoding`, strings.Join(encodings, ",")}
            }
            // "identity" is not recorded.
            break
        }

        switch {
        case encoding == "chunked":
            // "chunked" MUST ALWAYS be the last
            // encoding as per the loop invariant.
            // That is:
            //     Invalid: [chunked, gzip]
            //     Valid:   [gzip, chunked]
            if i+1 != len(encodings) {
                return &badStringError{"chunked must be applied only once, as the last encoding", strings.Join(encodings, ",")}
            }
            // Supported otherwise.

        case isGzipTransferEncoding(encoding):
            // Supported

        default:
            return &unsupportedTEError{fmt.Sprintf("unsupported transfer encoding: %q", encoding)}
        }

        te = te[0 : len(te)+1]
        te[len(te)-1] = encoding
    }

    if len(te) > 0 {
        // RFC 7230 3.3.2 says "A sender MUST NOT send a
        // Content-Length header field in any message that
        // contains a Transfer-Encoding header field."
        //
        // but also:
        // "If a message is received with both a
        // Transfer-Encoding and a Content-Length header
        // field, the Transfer-Encoding overrides the
        // Content-Length. Such a message might indicate an
        // attempt to perform request smuggling (Section 9.5)
        // or response splitting (Section 9.4) and ought to be
        // handled as an error. A sender MUST remove the
        // received Content-Length field prior to forwarding
        // such a message downstream."
        //
        // Reportedly, these appear in the wild.
        // Transfer-Encoding exists precisely for messages that have no Content-Length, so when it is
        // present Content-Length is ignored: the header is deleted here instead of being handled in fixLength
        delete(t.Header, "Content-Length")
        t.TransferEncoding = te
        return nil
    }

    return nil
}
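The ordering rules enforced by fixTransferEncoding are easy to lose among the comments. Below is a standalone restatement. normalizeTE is a hypothetical helper, not part of net/http. It assumes, as the code above does, that only "chunked" and the gzip names are accepted; "gzip" and "x-gzip" are assumed here as the names isGzipTransferEncoding recognizes. It returns plain errors instead of badStringError/unsupportedTEError.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// normalizeTE is a hypothetical, simplified restatement of the
// fixTransferEncoding rules; it works on a raw header value only.
func normalizeTE(raw string) ([]string, error) {
	encodings := strings.Split(raw, ",")
	te := make([]string, 0, len(encodings))
	for i, enc := range encodings {
		enc = strings.ToLower(strings.TrimSpace(enc))
		switch {
		case enc == "identity":
			// "identity" must stand alone and is not recorded.
			if len(encodings) != 1 {
				return nil, errors.New(`"identity" must be the only transfer encoding`)
			}
			return nil, nil
		case enc == "chunked":
			// chunked must be applied exactly once, as the last coding;
			// "chunked, chunked" also fails here because the first one is not last.
			if i+1 != len(encodings) {
				return nil, errors.New("chunked must be applied only once, as the last encoding")
			}
		case enc == "gzip" || enc == "x-gzip":
			// Accepted (assumed to mirror isGzipTransferEncoding).
		default:
			return nil, fmt.Errorf("unsupported transfer encoding: %q", enc)
		}
		te = append(te, enc)
	}
	return te, nil
}

func main() {
	for _, raw := range []string{"chunked", "gzip, chunked", "chunked, gzip", "chunked, chunked", "identity", "identity, chunked", "br"} {
		te, err := normalizeTE(raw)
		fmt.Printf("%q -> %q, %v\n", raw, te, err)
	}
}
```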
// This function handles the Content-Length header and returns the actual payload length
func fixLength(isResponse bool, status int, requestMethod string, header Header, te []string) (int64, error) {
    isRequest := !isResponse
    contentLens := header["Content-Length"]

    // Hardening against HTTP request smuggling
    if len(contentLens) > 1 {
        // Per RFC 7230 Section 3.3.2, prevent multiple
        // Content-Length headers if they differ in value.
        // If there are dups of the value, remove the dups.
        // See Issue 16490.
        // Following RFC 7230: if the Content-Length values differ, the message is invalid
        first := strings.TrimSpace(contentLens[0])
        for _, ct := range contentLens[1:] {
            if first != strings.TrimSpace(ct) {
                return 0, fmt.Errorf("http: message cannot contain multiple Content-Length headers; got %q", contentLens)
            }
        }

        // The values are all identical: keep just one
        header.Del("Content-Length")
        header.Add("Content-Length", first)

        contentLens = header["Content-Length"]
    }

    // Handle HEAD requests
    if noResponseBodyExpected(requestMethod) {
        // For HTTP requests, as part of hardening against request
        // smuggling (RFC 7230), don't allow a Content-Length header for
        // methods which don't permit bodies. As an exception, allow
        // exactly one Content-Length header if its value is "0".
        // So a HEAD request may carry the field only as a single Content-Length of 0
        if isRequest && len(contentLens) > 0 && !(len(contentLens) == 1 && contentLens[0] == "0") {
            return 0, fmt.Errorf("http: method cannot contain a Content-Length; got %q", contentLens)
        }
        return 0, nil
    }
    // Responses with a 1xx status carry no body
    if status/100 == 1 {
        return 0, nil
    }
    // Neither do 204 and 304 responses
    switch status {
    case 204, 304:
        return 0, nil
    }

    // With chunked Transfer-Encoding the length is not known in advance; Transfer-Encoding takes precedence, so return -1
    if chunked(te) {
        return -1, nil
    }

    // Read the Content-Length value
    var cl string
    if len(contentLens) == 1 {
        cl = strings.TrimSpace(contentLens[0])
    }
    // Validate it: a valid value is returned as an integer, an invalid one yields an error
    if cl != "" {
        n, err := parseContentLength(cl)
        if err != nil {
            return -1, err
        }
        return n, nil
    }
    // No usable value: delete the header field
    header.Del("Content-Length")

    // A request with neither Content-Length nor Transfer-Encoding is treated as having no payload
    if isRequest {
        // RFC 7230 neither explicitly permits nor forbids an
        // entity-body on a GET request so we permit one if
        // declared, but we default to 0 here (not -1 below)
        // if there's no mention of a body.
        // Likewise, all other request methods are assumed to have
        // no body if neither Transfer-Encoding chunked nor a
        // Content-Length are set.
        return 0, nil
    }

    // Body-EOF logic based on other methods (like closing, or chunked coding)
    // The message is a response; readTransfer later handles this case by closing the connection
    return -1, nil
}
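You can check fixLength's effect on requests with the public http.ReadRequest, which calls readTransfer and therefore fixLength. This is a minimal sketch that assumes Go 1.16 or later; parseReq is a hypothetical helper. Only the duplicate-Content-Length and no-body cases are exercised. The HEAD-specific branch is left out so the sketch does not depend on version-specific details.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// parseReq is a hypothetical helper: it runs a raw request through
// http.ReadRequest and returns the resulting ContentLength and body.
func parseReq(raw string) (contentLength int64, body string, err error) {
	req, err := http.ReadRequest(bufio.NewReader(strings.NewReader(raw)))
	if err != nil {
		return 0, "", err
	}
	b, err := io.ReadAll(req.Body)
	return req.ContentLength, string(b), err
}

func main() {
	cases := []string{
		// Duplicate but identical Content-Length values are collapsed into one.
		"POST / HTTP/1.1\r\nHost: x\r\nContent-Length: 5\r\nContent-Length: 5\r\n\r\nhello",
		// Differing values are rejected (request smuggling hardening).
		"POST / HTTP/1.1\r\nHost: x\r\nContent-Length: 5\r\nContent-Length: 6\r\n\r\nhello!",
		// No Content-Length and no Transfer-Encoding: the request has no body.
		"GET / HTTP/1.1\r\nHost: x\r\n\r\n",
	}
	for _, raw := range cases {
		fmt.Println(parseReq(raw))
	}
}
```

The first call should report length 5 and body "hello". The second should return an error about multiple Content-Length headers. The third should report length 0 with an empty body.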
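func (cr *connReader) startBackgroundRead() {
    cr.lock()
    defer cr.unlock()
    // inRead: the connection is already being read
    if cr.inRead {
        panic("invalid concurrent Body.Read call")
    }
    // hasByte: a byte has already been read ahead from this connection
    if cr.hasByte {
        return
    }
    cr.inRead = true
    // A zero time.Time clears the read deadline on the underlying connection, so the background read never times out
    cr.conn.rwc.SetReadDeadline(time.Time{})
    // Wait for data in a new goroutine
    go cr.backgroundRead()
}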
func (cr *connReader) backgroundRead() {
    // Block until a single byte can be read
    n, err := cr.conn.rwc.Read(cr.byteBuf[:])
    cr.lock()
    // If a byte arrived, set cr.hasByte to true (byteBuf holds exactly one byte)
    if n == 1 {
        cr.hasByte = true
        // We were past the end of the previous request's body already
        // (since we wouldn't be in a background read otherwise), so
        // this is a pipelined HTTP request. Prior to Go 1.11 we used to
        // send on the CloseNotify channel and cancel the context here,
        // but the behavior was documented as only "may", and we only
        // did that because that's how CloseNotify accidentally behaved
        // in very early Go releases prior to context support. Once we
        // added context support, people used a Handler's
        // Request.Context() and passed it along. Having that context
        // cancel on pipelined HTTP requests caused problems.
        // Fortunately, almost nothing uses HTTP/1.x pipelining.
        // Unfortunately, apt-get does, or sometimes does.
        // New Go 1.11 behavior: don't fire CloseNotify or cancel
        // contexts on pipelined requests. Shouldn't affect people, but
        // fixes cases like Issue 23921. This does mean that a client
        // closing their TCP connection after sending a pipelined
        // request won't cancel the context, but we'll catch that on any
        // write failure (in checkConnErrorWriter.Write).
        // If the server never writes, yes, there are still contrived
        // server & client behaviors where this fails to ever cancel the
        // context, but that's kinda why HTTP/1.x pipelining died
        // anyway.
    }
    if ne, ok := err.(net.Error); ok && cr.aborted && ne.Timeout() {
        // Ignore this error. It's the expected error from
        // another goroutine calling abortPendingRead.
    } else if err != nil {
        cr.handleReadError(err)
    }
    cr.aborted = false
    cr.inRead = false
    cr.unlock()
    // Wake every goroutine blocked in cr.cond.Wait (abortPendingRead waits there), whether or not data arrived
    cr.cond.Broadcast()
}
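The startBackgroundRead/backgroundRead/abortPendingRead handshake is a mutex plus sync.Cond wrapped around a one-byte Read. The sketch below reproduces it on a net.Pipe. bgReader and its methods are hypothetical stand-ins for connReader. abort assumes, as net/http does, that moving the read deadline into the past makes a blocked Read return a timeout error. The real code also hands genuine errors to handleReadError; here they are only stored.

```go
package main

import (
	"fmt"
	"net"
	"sync"
	"time"
)

// bgReader is a hypothetical, stripped-down connReader.
type bgReader struct {
	conn    net.Conn
	mu      sync.Mutex
	cond    *sync.Cond
	inRead  bool    // a background Read is in flight
	aborted bool    // abort() forced the Read to time out
	hasByte bool    // one byte was read ahead (e.g. a pipelined request)
	readErr error   // an unexpected read error, if any
	buf     [1]byte // room for exactly one byte, like connReader.byteBuf
}

func newBgReader(c net.Conn) *bgReader {
	r := &bgReader{conn: c}
	r.cond = sync.NewCond(&r.mu)
	return r
}

// start mirrors startBackgroundRead.
func (r *bgReader) start() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.inRead {
		panic("invalid concurrent read")
	}
	if r.hasByte {
		return
	}
	r.inRead = true
	r.conn.SetReadDeadline(time.Time{}) // zero value: no deadline
	go r.background()
}

// background mirrors backgroundRead.
func (r *bgReader) background() {
	n, err := r.conn.Read(r.buf[:])
	r.mu.Lock()
	if n == 1 {
		r.hasByte = true
	}
	expected := false
	if ne, ok := err.(net.Error); ok && r.aborted && ne.Timeout() {
		expected = true // abort() pushed the deadline into the past
	}
	if err != nil && !expected {
		r.readErr = err
	}
	r.aborted = false
	r.inRead = false
	r.mu.Unlock()
	r.cond.Broadcast() // wake abort(), which may be waiting in cond.Wait
}

// abort mirrors abortPendingRead: force the parked Read to return, then wait for it.
func (r *bgReader) abort() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if !r.inRead {
		return
	}
	r.aborted = true
	r.conn.SetReadDeadline(time.Unix(1, 0)) // a time long ago unblocks Read
	for r.inRead {
		r.cond.Wait()
	}
	r.conn.SetReadDeadline(time.Time{})
}

// backgroundReadDemo runs both scenarios and reports whether a byte was seen.
func backgroundReadDemo() (pipelined, idle bool) {
	// Case 1: the client sends one more byte, like the start of a pipelined request.
	srv, cli := net.Pipe()
	defer srv.Close()
	defer cli.Close()
	r := newBgReader(srv)
	r.start()
	cli.Write([]byte("G")) // net.Pipe: returns once the background Read took the byte
	r.abort()
	pipelined = r.hasByte

	// Case 2: nothing arrives; abort() must still unblock the parked Read.
	srv2, cli2 := net.Pipe()
	defer srv2.Close()
	defer cli2.Close()
	r2 := newBgReader(srv2)
	r2.start()
	r2.abort()
	idle = r2.hasByte
	return pipelined, idle
}

func main() {
	fmt.Println(backgroundReadDemo())
}
```

The cond.Wait loop in abort is what makes the Broadcast in background safe. abort checks inRead and starts waiting while holding the lock, so the wake-up cannot be lost.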
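func (w *response) finishRequest() {
    w.handlerDone.setTrue()
    // wroteHeader reports whether the response header has been written; if not, write it now
    if !w.wroteHeader {
        w.WriteHeader(StatusOK)
    }
    // Flush w.w: its buffered data goes to w.cw (chunkWriter), which writes into w.conn.bufw;
    // that buffer may in turn write through checkConnErrorWriter to c.rwc, the underlying connection
    w.w.Flush()
    // Reset w.w and put it back into a sync.Pool for later reuse
    putBufioWriter(w.w)

    // Mainly writes the chunked terminator: "0\r\n", any trailers, then "\r\n"; cw.chunking says whether chunked encoding is in use
    w.cw.close()
    // Send the data buffered in bufw
    w.conn.bufw.Flush()

    // Abort the pending background read and wait for it to finish; this pairs with cr.cond.Broadcast() in connReader.backgroundRead
    w.conn.r.abortPendingRead()

    // Close the body (regardless of w.closeAfterReply) so we can
    // re-use its bufio.Reader later safely.
    w.reqBody.Close()

    if w.req.MultipartForm != nil {
        w.req.MultipartForm.RemoveAll()
    }
}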
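finishRequest flushes a stack of writers in a fixed order. The sketch below models that stack with standard types. bufio.Writer stands in for w.w and c.bufw, httputil.NewChunkedWriter for the chunk framing done by w.cw, and a bytes.Buffer for the connection. writeChunked is hypothetical. httputil's chunked writer writes only "0\r\n" on Close, so the final CRLF (the end of an empty trailer section) is written by hand.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"net/http/httputil"
)

// writeChunked is a hypothetical model of the writer chain of a chunked
// response: handler -> w.w (bufio) -> w.cw (chunk framing) -> c.bufw (bufio) -> c.rwc.
func writeChunked(parts []string) (string, error) {
	var wire bytes.Buffer                         // plays c.rwc, the TCP connection
	connBuf := bufio.NewWriter(&wire)             // plays c.bufw
	chunker := httputil.NewChunkedWriter(connBuf) // plays w.cw in chunked mode
	handlerBuf := bufio.NewWriter(chunker)        // plays w.w

	for _, p := range parts {
		if _, err := handlerBuf.WriteString(p); err != nil {
			return "", err
		}
	}
	// Step 1, like w.w.Flush(): the buffered handler output becomes one chunk.
	if err := handlerBuf.Flush(); err != nil {
		return "", err
	}
	// Step 2, like w.cw.close(): the last-chunk marker, then the blank line
	// that ends the (empty) trailer section.
	if err := chunker.Close(); err != nil {
		return "", err
	}
	if _, err := connBuf.WriteString("\r\n"); err != nil {
		return "", err
	}
	// Step 3, like w.conn.bufw.Flush(): push everything onto the "wire".
	if err := connBuf.Flush(); err != nil {
		return "", err
	}
	return wire.String(), nil
}

func main() {
	out, err := writeChunked([]string{"hel", "lo"})
	fmt.Printf("%q %v\n", out, err)
}
```

Because the handler's two small writes are buffered first, they leave as a single 5-byte chunk: "5\r\nhello\r\n0\r\n\r\n".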
func (w *response) shouldReuseConnection() bool {
    // closeAfterReply says whether the connection must be closed after the response;
    // it is set for requestTooLarge, isH2Upgrade, or a "Connection: close" header
    if w.closeAfterReply {
        // The request or something set while executing the
        // handler indicated we shouldn't reuse this
        // connection.
        return false
    }

    // The bytes written do not match "Content-Length"; do not reuse the connection, to avoid getting out of sync
    if w.req.Method != "HEAD" && w.contentLength != -1 && w.bodyAllowed() && w.contentLength != w.written {
        // Did not write enough. Avoid getting out of sync.
        return false
    }

    // There was some error writing to the underlying connection
    // during the request, so don't re-use this conn.
    // (The underlying connection failed, so it cannot be reused.)
    if w.conn.werr != nil {
        return false
    }

    // Was the request body closed before it was fully read?
    if w.closedRequestBodyEarly() {
        return false
    }

    return true
}
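The reuse decision is a chain of early returns, so it can be tested in isolation. respState, shouldReuse and errBrokenPipe below are hypothetical. They copy only the conditions named in shouldReuseConnection, with bodyAllowed taken as a precomputed flag instead of a method call.

```go
package main

import (
	"errors"
	"fmt"
)

// errBrokenPipe is a hypothetical write error used for illustration.
var errBrokenPipe = errors.New("broken pipe")

// respState holds hypothetical copies of the fields shouldReuseConnection consults.
type respState struct {
	closeAfterReply bool  // Connection: close, request too large, h2 upgrade...
	method          string
	contentLength   int64 // declared Content-Length, -1 if none
	written         int64 // bytes the handler actually wrote
	bodyAllowed     bool  // status code permits a body
	writeErr        error // error seen while writing to the connection
	closedBodyEarly bool  // request body closed before it was fully read
}

func shouldReuse(s respState) bool {
	if s.closeAfterReply {
		return false
	}
	// Declared and actual body sizes disagree: the byte stream would be out of sync.
	if s.method != "HEAD" && s.contentLength != -1 && s.bodyAllowed && s.contentLength != s.written {
		return false
	}
	if s.writeErr != nil {
		return false
	}
	if s.closedBodyEarly {
		return false
	}
	return true
}

func main() {
	ok := respState{method: "GET", contentLength: 5, written: 5, bodyAllowed: true}
	short := ok
	short.written = 3
	head := respState{method: "HEAD", contentLength: 5, bodyAllowed: true}
	broken := ok
	broken.writeErr = errBrokenPipe
	fmt.Println(shouldReuse(ok), shouldReuse(short), shouldReuse(head), shouldReuse(broken))
}
```

A HEAD response passes the length check even though nothing was written, because the declared length describes a body that is never sent.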
// closeWrite flushes any outstanding data and sends a FIN packet (if
// client is connected via TCP),signalling that we're done. We then
// pause for a bit,hoping the client processes it before any
// subsequent RST.
//
// See https://golang.org/issue/3595
func (c *conn) closeWriteAndWait() {
    // Flush buffered data before shutting down the write side
    c.finalFlush()
    if tcp, ok := c.rwc.(closeWriter); ok {
        // Calls TCPConn.CloseWrite (tcpsock.go), which shuts down the write side with SHUT_WR
        tcp.CloseWrite()
    }
    time.Sleep(rstAvoidanceDelay)
}
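closeWriteAndWait relies on TCP half-close. After CloseWrite the peer reads EOF, but the server's read side stays open. The sketch demonstrates this over loopback TCP. halfClose is hypothetical, and it assumes the environment may listen on 127.0.0.1. It also omits the rstAvoidanceDelay sleep that the real function performs.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// halfClose is a hypothetical demo: the "server" writes a reply, shuts down only
// its write side, and can still read what the client sends afterwards.
func halfClose() (clientGot, serverGot string, err error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", "", err
	}
	defer ln.Close()

	type result struct {
		data string
		err  error
	}
	done := make(chan result, 1)
	go func() {
		c, err := ln.Accept()
		if err != nil {
			done <- result{err: err}
			return
		}
		defer c.Close()
		if _, err := c.Write([]byte("bye")); err != nil {
			done <- result{err: err}
			return
		}
		// Like closeWriteAndWait: send FIN (SHUT_WR) but keep the read side open.
		if err := c.(*net.TCPConn).CloseWrite(); err != nil {
			done <- result{err: err}
			return
		}
		b, err := io.ReadAll(c)
		done <- result{data: string(b), err: err}
	}()

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return "", "", err
	}
	defer conn.Close()
	got, err := io.ReadAll(conn) // returns once the server's FIN arrives
	if err != nil {
		return "", "", err
	}
	// The connection is only half closed, so the client can still send.
	if _, err := conn.Write([]byte("ack")); err != nil {
		return "", "", err
	}
	if err := conn.(*net.TCPConn).CloseWrite(); err != nil {
		return "", "", err
	}
	r := <-done
	return string(got), r.data, r.err
}

func main() {
	fmt.Println(halfClose())
}
```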
func (c *conn) finalFlush() {
    // If c.bufr or c.bufw is non-nil, it is reset and returned to a pool so its memory can be reused
    if c.bufr != nil {
        // Steal the bufio.Reader (~4KB worth of memory) and its associated
        // reader for a future connection.
        putBufioReader(c.bufr)
        c.bufr = nil
    }

    if c.bufw != nil {
        // Send everything left in the buffer through the underlying connection.
        // Response writes go c.bufw.wr.Write -> checkConnErrorWriter.Write -> c.rwc.Write,
        // so the data ultimately leaves via the underlying connection's Write
        c.bufw.Flush()
        // Steal the bufio.Writer (~4KB worth of memory) and its associated
        // writer for a future connection.
        putBufioWriter(c.bufw)
        c.bufw = nil
    }
}
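finalFlush returns the 4 KB bufio buffers to a pool instead of leaving them to the garbage collector. Below is a minimal sketch of that pattern with a single sync.Pool. net/http keeps a separate pool per buffer size. The getBufioWriter, putBufioWriter, finalFlush and poolDemo shown here are simplified hypothetical stand-ins, not the library's code.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"sync"
)

// writerPool is a hypothetical single pool of 4 KB bufio.Writers.
var writerPool sync.Pool

func getBufioWriter(w io.Writer) *bufio.Writer {
	if v := writerPool.Get(); v != nil {
		bw := v.(*bufio.Writer)
		bw.Reset(w) // reuse the buffer, now pointing at the new connection
		return bw
	}
	return bufio.NewWriterSize(w, 4<<10)
}

func putBufioWriter(bw *bufio.Writer) {
	bw.Reset(nil) // drop the reference to the old connection
	writerPool.Put(bw)
}

// finalFlush sends whatever is still buffered, then recycles the writer.
func finalFlush(bw *bufio.Writer) error {
	err := bw.Flush()
	putBufioWriter(bw)
	return err
}

// poolDemo writes two "responses" to two stand-in connections.
func poolDemo() (string, string, error) {
	var conn1, conn2 bytes.Buffer
	bw := getBufioWriter(&conn1)
	if _, err := bw.WriteString("first response"); err != nil {
		return "", "", err
	}
	if err := finalFlush(bw); err != nil {
		return "", "", err
	}
	bw2 := getBufioWriter(&conn2) // may be the recycled writer; sync.Pool gives no guarantee
	if _, err := bw2.WriteString("second response"); err != nil {
		return "", "", err
	}
	if err := finalFlush(bw2); err != nil {
		return "", "", err
	}
	return conn1.String(), conn2.String(), nil
}

func main() {
	fmt.Println(poolDemo())
}
```

Resetting to nil before Put matters. A pooled writer that still referenced the old connection would keep that connection reachable and could write to it by mistake.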

http.Transport

See the detailed write-up on transport.

NetPoll

See the detailed write-up "golang net: netpoll".


(Editor: 李大同)

