数据帧
HTTP2通信的最小单位是数据帧,每一个帧都包含两部分:帧头和Payload。不同数据流的帧可以交错发送(同一个数据流的帧必须顺序发送),然后再根据每个帧头的数据流标识符重新组装。
由于Payload中为有效数据,故仅对帧头进行分析描述。
帧头
帧头总长度为9个字节,并包含四个部分,分别是:
payload的长度,占用三个字节。
数据帧类型,占用一个字节。
数据帧标识符,占用一个字节。
数据流ID,占用四个字节。
用图表示如下:
数据帧的格式和各部分的含义已经清楚了, 那么我们看看代码中怎么读取一个帧头:
func http2readFrameHeader(buf []byte, r io.Reader) (http2FrameHeader, error) {_, err := io.ReadFull(r, buf[:http2frameHeaderLen])if err != nil {return http2FrameHeader{}, err}return http2FrameHeader{Length: (uint32(buf[0])在上面的代码中http2frameHeaderLen是一个常量,其值为9。
从io.Reader中读取9个字节后,将前三个字节和后四个字节均转为uint32的类型,从而得到Payload长度和数据流ID。另外需要理解的是帧头的前三个字节和后四个字节存储格式为大端(大小端笔者就不在这里解释了,请尚不了解的读者自行百度)。
数据帧类型
根据http://http2.github.io/http2-spec/#rfc.section.11.2描述,数据帧类型总共有10个。在go源码中均有体现:
const (http2FrameData http2FrameType = 0x0http2FrameHeaders http2FrameType = 0x1http2FramePriority http2FrameType = 0x2http2FrameRSTStream http2FrameType = 0x3http2FrameSettings http2FrameType = 0x4http2FramePushPromise http2FrameType = 0x5http2FramePing http2FrameType = 0x6http2FrameGoAway http2FrameType = 0x7http2FrameWindowUpdate http2FrameType = 0x8http2FrameContinuation http2FrameType = 0x9)登录后复制
http2FrameData:主要用于发送请求body和接收响应的数据帧。
http2FrameHeaders:主要用于发送请求header和接收响应header的数据帧。
http2FrameSettings:主要用于client和server交流设置相关的数据帧。
http2FrameWindowUpdate:主要用于流控制的数据帧。
其他数据帧类型因为本文不涉及,故不做描述。
数据帧标识符
由于数据帧标识符种类较多,笔者在这里仅介绍其中部分标识符,先看源码:
const (// Data Framehttp2FlagDataEndStream http2Flags = 0x1// Headers Framehttp2FlagHeadersEndStream http2Flags = 0x1// Settings Framehttp2FlagSettingsAck http2Flags = 0x1// 此处省略定义其他数据帧标识符的代码)登录后复制
http2FlagDataEndStream:在前篇中提到,调用(*http2ClientConn).newStream方法会创建一个数据流,那这个数据流什么时候结束呢,这就是http2FlagDataEndStream的作用。
当client收到有响应body的响应时(HEAD请求无响应body,301,302等响应也无响应body),一直读到http2FrameData数据帧的标识符为http2FlagDataEndStream则意味着本次请求结束可以关闭当前数据流。
http2FlagHeadersEndStream:如果读到的http2FrameHeaders数据帧有此标识符也意味着本次请求结束。
http2FlagSettingsAck:该标示符意味着对方确认收到http2FrameSettings数据帧。
流控制器
流控制是一种阻止发送方向接收方发送大量数据的机制,以免超出后者的需求或处理能力。Go中HTTP2通过http2flow结构体进行流控制:
type http2flow struct {// n is the number of DATA bytes we're allowed to send.// A flow is kept both on a conn and a per-stream.n int32// conn points to the shared connection-level flow that is// shared by all streams on that conn. It is nil for the flow// that's on the conn directly.conn *http2flow}登录后复制
字段含义英文注释已经描述的很清楚了,所以笔者不再翻译。下面看一下和流控制有关的方法。
(*http2flow).available
此方法返回当前流控制可发送的最大字节数:
func (f *http2flow) available() int32 {n := f.nif f.conn != nil && f.conn.n登录后复制
如果f.conn为nil则意味着此控制器的控制级别为连接,那么可发送的最大字节数就是f.n。
如果f.conn不为nil则意味着此控制器的控制级别为数据流,且当前数据流可发送的最大字节数不能超过当前连接可发送的最大字节数。
(*http2flow).take
此方法用于消耗当前流控制器的可发送字节数:
func (f *http2flow) take(n int32) {if n > f.available() {panic("internal error: took too much")}f.n -= nif f.conn != nil {f.conn.n -= n}}登录后复制
通过实际需要传递一个参数,告知当前流控制器想要发送的数据大小。如果发送的大小超过流控制器允许的大小,则panic,如果未超过流控制器允许的大小,则将当前数据流和当前连接的可发送字节数-n。
(*http2flow).add
有消耗就有新增,此方法用于增加流控制器可发送的最大字节数:
func (f *http2flow) add(n int32) bool {sum := f.n + nif (sum > n) == (f.n > 0) {f.n = sumreturn true}return false}登录后复制
上面的代码唯一需要注意的地方是,当sum超过int32正数最大值(2^31-1)时会返回false。
回顾:在前篇中提到的(*http2Transport).NewClientConn方法和(*http2ClientConn).newStream方法均通过(*http2flow).add初始化可发送数据窗口大小。
有了帧和流控制器的基本概念,下面我们结合源码来分析总结流控制的具体实现。
(*http2ClientConn).readLoop
前篇分析(*http2Transport).newClientConn时止步于读循环,那么今天我们就从(*http2ClientConn).readLoop开始。
func (cc *http2ClientConn) readLoop() {rl := &http2clientConnReadLoop{cc: cc}defer rl.cleanup()cc.readerErr = rl.run()if ce, ok := cc.readerErr.(http2ConnectionError); ok {cc.wmu.Lock()cc.fr.WriteGoAway(0, http2ErrCode(ce), nil)cc.wmu.Unlock()}}登录后复制
由上可知,readLoop的逻辑比较简单,其核心逻辑在(*http2clientConnReadLoop).run方法里。
func (rl *http2clientConnReadLoop) run() error {cc := rl.ccrl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUsegotReply := false // ever saw a HEADERS replygotSettings := falsefor {f, err := cc.fr.ReadFrame()// 此处省略代码maybeIdle := false // whether frame might transition us to idleswitch f := f.(type) {case *http2MetaHeadersFrame:err = rl.processHeaders(f)maybeIdle = truegotReply = truecase *http2DataFrame:err = rl.processData(f)maybeIdle = truecase *http2GoAwayFrame:err = rl.processGoAway(f)maybeIdle = truecase *http2RSTStreamFrame:err = rl.processResetStream(f)maybeIdle = truecase *http2SettingsFrame:err = rl.processSettings(f)case *http2PushPromiseFrame:err = rl.processPushPromise(f)case *http2WindowUpdateFrame:err = rl.processWindowUpdate(f)case *http2PingFrame:err = rl.processPing(f)default:cc.logf("Transport: unhandled response frame type %T", f)}if err != nil {if http2VerboseLogs {cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, http2summarizeFrame(f), err)}return err}if rl.closeWhenIdle && gotReply && maybeIdle {cc.closeIfIdle()}}}登录后复制
由上可知,(*http2clientConnReadLoop).run的核心逻辑是读取数据帧然后对不同的数据帧进行不同的处理。
cc.fr.ReadFrame()会根据前面介绍的数据帧格式读出数据帧。
前篇中提到使用了一个支持h2协议的图片进行分析,本篇继续复用该图片对(*http2clientConnReadLoop).run方法进行debug。
收到http2FrameSettings数据帧
读循环会最先读到http2FrameSettings数据帧。读到该数据帧后会调用(*http2clientConnReadLoop).processSettings方法。(*http2clientConnReadLoop).processSettings主要包含3个逻辑。
1、判断是否是http2FrameSettings的ack信息,如果是直接返回,否则继续后面的步骤。
if f.IsAck() {if cc.wantSettingsAck { cc.wantSettingsAck = falsereturn nil }return http2ConnectionError(http2ErrCodeProtocol)}登录后复制
2、处理不同http2FrameSettings的数据帧,并根据server传递的信息,修改maxConcurrentStreams等的值。
err := f.ForeachSetting(func(s http2Setting) error {switch s.ID {case http2SettingMaxFrameSize: cc.maxFrameSize = s.Valcase http2SettingMaxConcurrentStreams: cc.maxConcurrentStreams = s.Valcase http2SettingMaxHeaderListSize: cc.peerMaxHeaderListSize = uint64(s.Val)case http2SettingInitialWindowSize:if s.Val > math.MaxInt32 {return http2ConnectionError(http2ErrCodeFlowControl) } delta := int32(s.Val) - int32(cc.initialWindowSize)for _, cs := range cc.streams { cs.flow.add(delta) } cc.cond.Broadcast() cc.initialWindowSize = s.Valdefault:// TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably. cc.vlogf("Unhandled Setting: %v", s) }return nil})登录后复制
当收到ID为http2SettingInitialWindowSize的帧时,会调整当前连接中所有数据流的可发送数据窗口大小,并修改当前连接的initialWindowSize(每个新创建的数据流均会使用该值初始化可发送数据窗口大小)为s.Val。
3、发送http2FrameSettings的ack信息给server。
cc.wmu.Lock()defer cc.wmu.Unlock()cc.fr.WriteSettingsAck()cc.bw.Flush()return cc.werr登录后复制
收到http2WindowUpdateFrame数据帧
在笔者debug的过程中,处理完http2FrameSettings数据帧后,紧接着就收到了http2WindowUpdateFrame数据帧。收到该数据帧后会调用(*http2clientConnReadLoop).processWindowUpdate方法:
func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame) error {cc := rl.cccs := cc.streamByID(f.StreamID, false)if f.StreamID != 0 && cs == nil {return nil}cc.mu.Lock()defer cc.mu.Unlock()fl := &cc.flowif cs != nil {fl = &cs.flow}if !fl.add(int32(f.Increment)) {return http2ConnectionError(http2ErrCodeFlowControl)}cc.cond.Broadcast()return nil}登录后复制
上面的逻辑主要用于更新当前连接和数据流的可发送数据窗口大小。如果http2WindowUpdateFrame帧中的StreamID为0,则更新当前连接的可发送数据窗口大小,否则更新对应数据流可发送数据窗口大小。
注意:在debug的过程,收到http2WindowUpdateFrame数据帧后,又收到一次http2FrameSettings,且该数据帧标识符为http2FlagSettingsAck。
笔者在这里特意提醒,这是因为前篇中提到的(*http2Transport).NewClientConn方法,也向server发送了http2FrameSettings数据帧和http2WindowUpdateFrame数据帧。
另外,在处理http2FrameSettings和http2WindowUpdateFrame过程中,均出现了cc.cond.Broadcast()调用,该调用主要用于唤醒因为以下两种情况而Wait的请求:
因当前连接处理的数据流已经达到maxConcurrentStreams的上限(详见前篇中(*http2ClientConn).awaitOpenSlotForRequest方法分析)。
因发送数据流已达可发送数据窗口上限而等待可发送数据窗口更新的请求(后续会介绍)。
收到http2MetaHeadersFrame数据帧
收到此数据帧意味着某一个请求已经开始接收响应数据。此数据帧对应的处理函数为(*http2clientConnReadLoop).processHeaders:
func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error {cc := rl.cccs := cc.streamByID(f.StreamID, false)// 此处省略代码res, err := rl.handleResponse(cs, f)if err != nil {// 此处省略代码cs.resc首先我们先看cs.resc
回顾:前篇(*http2ClientConn).roundTrip方法的第7点和本部分关联起来就可以形成一个完整的请求链。
接下来我们对rl.handleResponse方法展开分析。
(*http2clientConnReadLoop).handleResponse
(*http2clientConnReadLoop).handleResponse的主要作用是构建一个Response变量,下面对该函数的关键步骤进行描述。
1、构建一个Response变量。
header := make(Header)res := &Response{ Proto: "HTTP/2.0", ProtoMajor: 2, Header: header, StatusCode: statusCode, Status: status + " " + StatusText(statusCode),}登录后复制
2、构建header(本篇不对header进行展开分析)。
for _, hf := range f.RegularFields() { key := CanonicalHeaderKey(hf.Name)if key == "Trailer" { t := res.Trailerif t == nil { t = make(Header) res.Trailer = t } http2foreachHeaderElement(hf.Value, func(v string) { t[CanonicalHeaderKey(v)] = nil }) } else { header[key] = append(header[key], hf.Value) }}登录后复制
3、处理响应body的ContentLength。
streamEnded := f.StreamEnded()isHead := cs.req.Method == "HEAD"if !streamEnded || isHead { res.ContentLength = -1if clens := res.Header["Content-Length"]; len(clens) == 1 {if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil { res.ContentLength = clen64 } else {// TODO: care? unlike http/1, it won't mess up our framing, so it's// more safe smuggling-wise to ignore. } } else if len(clens) > 1 {// TODO: care? unlike http/1, it won't mess up our framing, so it's// more safe smuggling-wise to ignore. }}登录后复制
由上可知,当前数据流没有结束或者是HEAD请求才读取ContentLength。如果header中的ContentLength不合法则res.ContentLength的值为 -1。
4、构建res.Body。
cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}}cs.bytesRemain = res.ContentLengthres.Body = http2transportResponseBody{cs}go cs.awaitRequestCancel(cs.req)if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" { res.Header.Del("Content-Encoding") res.Header.Del("Content-Length") res.ContentLength = -1 res.Body = &http2gzipReader{body: res.Body} res.Uncompressed = true}登录后复制
根据Content-Encoding的编码方式,会构建两种不同的Body:
非gzip编码时,构造的res.Body类型为http2transportResponseBody。
gzip编码时,构造的res.Body类型为http2gzipReader。
收到http2DataFrame数据帧
收到此数据帧意味着我们开始接收真实的响应,即平常开发中需要处理的业务数据。此数据帧对应的处理函数为(*http2clientConnReadLoop).processData。
因为server无法及时知道数据流在client端的状态,所以server可能会向client中一个已经不存在的数据流发送数据:
cc := rl.cccs := cc.streamByID(f.StreamID, f.StreamEnded())data := f.Data()if cs == nil { cc.mu.Lock() neverSent := cc.nextStreamID cc.mu.Unlock()// 此处省略代码if f.Length > 0 { cc.mu.Lock() cc.inflow.add(int32(f.Length)) cc.mu.Unlock() cc.wmu.Lock() cc.fr.WriteWindowUpdate(0, uint32(f.Length)) cc.bw.Flush() cc.wmu.Unlock() }return nil}登录后复制
接收到的数据帧在client没有对应的数据流处理时,通过流控制器为当前连接可读窗口大小增加f.Length,并且通过http2FrameWindowUpdate数据帧告知server将当前连接的可写窗口大小增加f.Length。
如果client有对应的数据流且f.Length大于0:
1、如果是head请求结束当前数据流并返回。
if cs.req.Method == "HEAD" && len(data) > 0 { cc.logf("protocol error: received DATA on a HEAD request") rl.endStreamError(cs, http2StreamError{ StreamID: f.StreamID, Code: http2ErrCodeProtocol, })return nil}登录后复制
2、检查当前数据流能否处理f.Length长度的数据。
cc.mu.Lock()if cs.inflow.available() >= int32(f.Length) { cs.inflow.take(int32(f.Length))} else { cc.mu.Unlock()return http2ConnectionError(http2ErrCodeFlowControl)}登录后复制
由上可知当前数据流如果能够处理该数据,通过流控制器调用cs.inflow.take减小当前数据流可接受窗口大小。
3、当前数据流被重置或者被关闭即cs.didReset为true时又或者数据帧有填充数据时需要调整流控制窗口。
var refund intif pad := int(f.Length) - len(data); pad > 0 { refund += pad}// Return len(data) now if the stream is already closed,// since data will never be read.didReset := cs.didResetif didReset { refund += len(data)}if refund > 0 { cc.inflow.add(int32(refund)) cc.wmu.Lock() cc.fr.WriteWindowUpdate(0, uint32(refund))if !didReset { cs.inflow.add(int32(refund)) cc.fr.WriteWindowUpdate(cs.ID, uint32(refund)) } cc.bw.Flush() cc.wmu.Unlock()}cc.mu.Unlock()登录后复制
如果数据帧有填充数据则计算需要返还的填充数据长度。
如果数据流无效该数据帧的长度需要全部返还。
最后,根据计算的refund增加当前连接或者当前数据流的可接受窗口大小,并且同时告知server增加当前连接或者当前数据流的可写窗口大小。
4、数据长度大于0且数据流正常则将数据写入数据流缓冲区。
if len(data) > 0 && !didReset {if _, err := cs.bufPipe.Write(data); err != nil { rl.endStreamError(cs, err)return err }}登录后复制
回顾:前面的(*http2clientConnReadLoop).handleResponse方法中有这样一行代码res.Body = http2transportResponseBody{cs},所以在业务开发时能够通过Response读取到数据流中的缓冲数据。
(http2transportResponseBody).Read
在前面的内容里,如果数据流状态正常且数据帧没有填充数据则数据流和连接的可接收窗口会一直变小,而这部分内容就是增加数据流的可接受窗口大小。
因为篇幅和主旨的问题笔者仅分析描述该方法内和流控制有关的部分。
1、读取响应数据后计算当前连接需要增加的可接受窗口大小。
cc.mu.Lock()defer cc.mu.Unlock()var connAdd, streamAdd int32// Check the conn-level first, before the stream-level.if v := cc.inflow.available(); v如果当前连接可接受窗口的大小已经小于http2transportDefaultConnFlow(1G)的一半,则当前连接可接收窗口大小需要增加http2transportDefaultConnFlow - cc.inflow.available()。
回顾:http2transportDefaultConnFlow在前篇(*http2Transport).NewClientConn方法部分有提到,且连接刚建立时会通过http2WindowUpdateFrame数据帧告知server当前连接可发送窗口大小增加http2transportDefaultConnFlow。
2、读取响应数据后计算当前数据流需要增加的可接受窗口大小。
if err == nil { // No need to refresh if the stream is over or failed.// Consider any buffered body data (read from the conn but not// consumed by the client) when computing flow control for this// stream. v := int(cs.inflow.available()) + cs.bufPipe.Len()if v如果当前数据流可接受窗口大小加上当前数据流缓冲区剩余未读数据的长度小于http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh(4M-4KB),则当前数据流可接受窗口大小需要增加http2transportDefaultStreamFlow - v。
回顾:http2transportDefaultStreamFlow在前篇(*http2Transport).NewClientConn方法和(*http2ClientConn).newStream方法中均有提到。
连接刚建立时,发送http2FrameSettings数据帧,告知server每个数据流的可发送窗口大小为http2transportDefaultStreamFlow。
在newStream时,数据流默认的可接收窗口大小为http2transportDefaultStreamFlow。
3、将连接和数据流分别需要增加的窗口大小通过http2WindowUpdateFrame数据帧告知server。
if connAdd != 0 || streamAdd != 0 { cc.wmu.Lock()defer cc.wmu.Unlock()if connAdd != 0 { cc.fr.WriteWindowUpdate(0, http2mustUint31(connAdd)) }if streamAdd != 0 { cc.fr.WriteWindowUpdate(cs.ID, http2mustUint31(streamAdd)) } cc.bw.Flush()}登录后复制
以上就是server向client发送数据的流控制逻辑。
(*http2clientStream).writeRequestBody
前篇中(*http2ClientConn).roundTrip未对(*http2clientStream).writeRequestBody进行分析,下面我们看看该方法的源码:
func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) {cc := cs.ccsentEnd := false // whether we sent the final DATA frame w/ END_STREAM// 此处省略代码req := cs.reqhasTrailers := req.Trailer != nilremainLen := http2actualContentLength(req)hasContentLen := remainLen != -1var sawEOF boolfor !sawEOF {n, err := body.Read(buf[:len(buf)-1])// 此处省略代码remain := buf[:n]for len(remain) > 0 && err == nil {var allowed int32allowed, err = cs.awaitFlowControl(len(remain))switch {case err == http2errStopReqBodyWrite:return errcase err == http2errStopReqBodyWriteAndCancel:cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)return errcase err != nil:return err}cc.wmu.Lock()data := remain[:allowed]remain = remain[allowed:]sentEnd = sawEOF && len(remain) == 0 && !hasTrailerserr = cc.fr.WriteData(cs.ID, sentEnd, data)if err == nil {err = cc.bw.Flush()}cc.wmu.Unlock()}if err != nil {return err}}// 此处省略代码return err}登录后复制
上面的逻辑可简单总结为:不停的读取请求body然后将读取的内容通过cc.fr.WriteData转为http2FrameData数据帧发送给server,直到请求body读完为止。其中和流控制有关的方法是awaitFlowControl,下面我们对该方法进行分析。
(*http2clientStream).awaitFlowControl
此方法的主要作用是等待当前数据流可写窗口有容量能够写入数据。
func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {cc := cs.cccc.mu.Lock()defer cc.mu.Unlock()for {if cc.closed {return 0, http2errClientConnClosed}if cs.stopReqBody != nil {return 0, cs.stopReqBody}if err := cs.checkResetOrDone(); err != nil {return 0, err}if a := cs.flow.available(); a > 0 {take := aif int(take) > maxBytes {take = int32(maxBytes) // can't truncate int; take is int32}if take > int32(cc.maxFrameSize) {take = int32(cc.maxFrameSize)}cs.flow.take(take)return take, nil}cc.cond.Wait()}}登录后复制
根据源码可以知道,数据流被关闭或者停止发送请求body,则当前数据流无法写入数据。当数据流状态正常时,又分为两种情况:
当前数据流可写窗口剩余可写数据大于0,则计算可写字节数,并将当前数据流可写窗口大小消耗take。
当前数据流可写窗口剩余可写数据小于等于0,则会一直等待直到被唤醒并进入下一次检查。
上面的第二种情况在收到http2WindowUpdateFrame数据帧这一节中提到过。
server读取当前数据流的数据后会向client对应数据流发送http2WindowUpdateFrame数据帧,client收到该数据帧后会增大对应数据流可写窗口,并执行cc.cond.Broadcast()唤醒因发送数据已达流控制上限而等待的数据流继续发送数据。
以上就是client向server发送数据的流控制逻辑。
总结
帧头长度为9个字节,并包含四个部分:Payload的长度、帧类型、帧标识符和数据流ID。
流控制可分为两个步骤:
初始时,通过http2FrameSettings数据帧和http2WindowUpdateFrame数据帧告知对方当前连接读写窗口大小以及连接中数据流读写窗口大小。
在读写数据过程中,通过发送http2WindowUpdateFrame数据帧控制另一端的写窗口大小。
以上就是Go发起HTTP2.0请求流程分析(中篇)—数据帧&流控制的详细内容,更多请关注【创想鸟】其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至253000106@qq.com举报,一经查实,本站将立刻删除。
发布者:PHP中文网,转转请注明出处:https://www.chuangxiangniao.com/p/2375613.html