原文:Uber RPC 框架TChannel源碼分析——多路複用的實現git
UBER的RPC框架TChannel有一個閃亮點————多路複用。對於多路複用是如何實現一直都很好奇,因此抽了點時間看了TChannel多路複用的實現源碼,並整理成這篇文章。文章主要從客戶端【發起請求】到服務端【響應請求】一條完整請求來看多路複用整個生命週期的實現。
客戶端調用咱們把這個過程分紅4個步驟:
1. 出站握手 2. 複用連接 3. 消息交換 4. 有序寫入——發起請求
github.com/uber/tchannel-go/preinit_connection.go #35 func (ch *Channel) outboundHandshake(ctx context.Context, c net.Conn, outboundHP string, events connectionEvents) (_ *Connection, err error) { ...... msg := &initReq{initMessage: ch.getInitMessage(ctx, 1)} if err := ch.writeMessage(c, msg); err != nil { return nil, err } ...... res := &initRes{} id, err := ch.readMessage(c, res) if err != nil { return nil, err } ...... return ch.newConnection(c, 1 /* initialID */, outboundHP, remotePeer, remotePeerAddress, events), nil }
在開始請求前,TChannel有一次握手,此次握手不是TCP/IP的三次握手,是爲了確認服務端可以正常響應。 若是服務端可以正常響應,則這條TCP連接將會被複用。
func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP string, remotePeer PeerInfo, remotePeerAddress peerAddressComponents, events connectionEvents) *Connection { ...... connID := _nextConnID.Inc() ...... c := &Connection{ channelConnectionCommon: ch.channelConnectionCommon, connID: connID, conn: conn, opts: opts, state: connectionActive, sendCh: make(chan *Frame, opts.SendBufferSize), ...... inbound: newMessageExchangeSet(log, messageExchangeSetInbound), outbound: newMessageExchangeSet(log, messageExchangeSetOutbound), ...... } ...... // Connections are activated as soon as they are created. c.callOnActive() go c.readFrames(connID) go c.writeFrames(connID) return c }
當握手成功,這條連接隨後會被放入Peer,以備其餘請求使用。同時會啓動2個協程,「readFrames」 用於讀取服務端的響應,「writeFrames」把數據寫入TCP連接裏面,關於這2個協程的做用下面會詳細介紹。
github.com/uber/tchannel-go/peer.go #361 func (p *Peer) getActiveConnLocked() (*Connection, bool) { allConns := len(p.inboundConnections) + len(p.outboundConnections) if allConns == 0 { return nil, false } // We cycle through the connection list, starting at a random point // to avoid always choosing the same connection. startOffset := peerRng.Intn(allConns) for i := 0; i < allConns; i++ { connIndex := (i + startOffset) % allConns if conn := p.getConn(connIndex); conn.IsActive() { return conn, true } } return nil, false }
複用連接是多路複用很關鍵的一步,和HTTP的複用不一樣,HTTP連接須要響應成功後才能被複用,而多路複用連接只要被建立了就能被複用。
github.com/uber/tchannel-go/mex.go #306 func (mexset *messageExchangeSet) newExchange(ctx context.Context, framePool FramePool, msgType messageType, msgID uint32, bufferSize int) (*messageExchange, error) { ...... mex := &messageExchange{ msgType: msgType, msgID: msgID, ctx: ctx, //請求會等待Frame的寫入 recvCh: make(chan *Frame, bufferSize), errCh: newErrNotifier(), mexset: mexset, framePool: framePool, } mexset.Lock() //保存messageExchange addErr := mexset.addExchange(mex) mexset.Unlock() ...... mexset.onAdded() ...... return mex, nil }
在客戶端發起多個請求的時候,因爲只有一個TCP連接,如何知道哪一個響應是對應哪一個請求?爲了可以正確響應,TChannel使用了MessageExchange,一個請求對應一個MessageExchange。客戶端會以stream id 爲下標索引,保存全部的MessageExchange。當有一個請求時,它會阻塞在MessageExchange.recvCh, 響應回來會根據響應的stream id獲取對應的MessageExchange, 並把幀放到 MessageExchange.recvCh 從而實現無序響應。
github.com/uber/tchannel-go/reqres.go #139 func (w *reqResWriter) flushFragment(fragment *writableFragment) error { ...... frame := fragment.frame.(*Frame) ...... select { ...... case w.conn.sendCh <- frame: return nil } }
github.com/uber/tchannel-go/connection.go #706 func (c *Connection) writeFrames(_ uint32) { for { select { case f := <-c.sendCh: ...... err := f.WriteOut(c.conn) ...... } } }
在多路複用中,只有一條TCP連接,爲了不客戶端同時寫入連接裏,TChannel先把幀寫入隊列「sendCh」,再使用一個消費者獲取隊列數據,而後有序寫入連接裏面。
github.com/uber/tchannel-go/frame.go #107 // A Frame is a header and payload type Frame struct { buffer []byte // full buffer, including payload and header headerBuffer []byte // slice referencing just the header // The header for the frame Header FrameHeader // The payload for the frame Payload []byte } // FrameHeader is the header for a frame, containing the MessageType and size type FrameHeader struct { // The size of the frame including the header size uint16 // The type of message represented by the frame messageType messageType // Left empty reserved1 byte // The id of the message represented by the frame ID uint32 //指Stream ID // Left empty reserved [8]byte }
幀被分爲2部分,一部分是Header Frame(只有16字節);另外一部分是Data Frame。這2部分數據按照必定格式標準轉成二進制數據進行傳輸。
服務端響應咱們把這個過程分紅3個步驟:
1. 入站握手 2. 讀取請求數據 3. 有序寫入——響應結果
github.com/uber/tchannel-go/preinit_connection.go #69 func (ch *Channel) inboundHandshake(ctx context.Context, c net.Conn, events connectionEvents) (_ *Connection, err error) { id := uint32(math.MaxUint32) ...... req := &initReq{} id, err = ch.readMessage(c, req) if err != nil { return nil, err } ...... res := &initRes{initMessage: ch.getInitMessage(ctx, id)} if err := ch.writeMessage(c, res); err != nil { return nil, err } return ch.newConnection(c, 0 /* initialID */, "" /* outboundHP */, remotePeer, remotePeerAddress, events), nil }
入站握手是對客戶端出站握手的響應,當握手成功,服務端這邊也會調用newConnection,啓動「readFrames」 和 「writeFrames」協程,等待客戶端請求。
github.com/uber/tchannel-go/connection.go #615 func (c *Connection) readFrames(_ uint32) { headerBuf := make([]byte, FrameHeaderSize) ...... for { ...... //先讀頭部 if _, err := io.ReadFull(c.conn, headerBuf); err != nil { handleErr(err) return } frame := c.opts.FramePool.Get() if err := frame.ReadBody(headerBuf, c.conn); err != nil { handleErr(err) c.opts.FramePool.Release(frame) return } //handle frame ...... } }
在服務端會監聽握手成功的連接,若是客戶端發送了請求,就會讀取連接裏面的數據。讀取分2步:
Header Frame 的長度固定爲16字節,這裏面有stream Id 和 Data Frame的長度
從Header Frame獲取到 Data Frame的長度後,根據長度從連接讀取指定的字節長度,就獲取到正確的Data Frame。
服務端的有序寫入和客戶端的有序寫入是同樣的功能,只是所處的角色不同,這裏再也不重複。
客戶端獲取響應結果咱們把這個過程分紅2個步驟:
1. 讀取響應結果 2. 找到MessageExchange響應
客戶端獲取響應結果和服務端的讀取請求數據也是相同的功能,這裏再也不重複。
github.com/uber/tchannel-go/mex.go #429 func (mexset *messageExchangeSet) forwardPeerFrame(frame *Frame) error { ...... mexset.RLock() mex := mexset.exchanges[frame.Header.ID] mexset.RUnlock() ...... //把幀交給MessageExchange.recvCh if err := mex.forwardPeerFrame(frame); err != nil { ...... return err } return nil }
在客戶端發起調用時介紹過,它會阻塞在MessageExchange.recvCh,當響應回來時會根據stream Id(上面的frame.Header.ID) 找到對應的MessageExchange,並把frame放入recvCh,完成響應。這一步就體如今上面的代碼。
至此UBER的RPC框架TChannel————多路複用介紹完,感謝UBER團隊的貢獻,讓我收益不少。