剝開比原看代碼07:比原節點收到「請求區塊數據」的信息後如何應答?

做者:freewindreact

比原項目倉庫:git

Github地址:https://github.com/Bytom/bytomgithub

Gitee地址:https://gitee.com/BytomBlockc...數組

在上一篇,咱們知道了比原是如何把「請求區塊數據」的信息BlockRequestMessage發送給peer節點的,那麼本文研究的重點就是,當peer節點收到了這個信息,它將如何應答?app

那麼這個問題若是細分的話,也能夠分爲三個小問題:函數

  1. 比原節點是如何收到對方發過來的信息的?
  2. 收到BlockRequestMessage後,將會給對方發送什麼樣的信息?
  3. 這個信息是如何發送出去的?

咱們先從第一個小問題開始。性能

比原節點是如何接收對方發過來的信息的?

若是咱們在代碼中搜索BlockRequestMessage,會發現只有在ProtocolReactor.Receive方法中針對該信息進行了應答。那麼問題的關鍵就是,比原是如何接收對方發過來的信息,而且把它轉交給ProtocolReactor.Receive的。區塊鏈

若是咱們對前一篇《比原是如何把請求區塊數據的信息發出去的》有印象的話,會記得比原在發送信息時,最後會把信息寫入到MConnection.bufWriter中;與之相應的,MConnection還有一個bufReader,用於讀取數據,它也是與net.Conn綁定在一塊兒的:atom

p2p/connection.go#L114-L118code

func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
    mconn := &MConnection{
        conn:        conn,
        bufReader:   bufio.NewReaderSize(conn, minReadBufferSize),
        bufWriter:   bufio.NewWriterSize(conn, minWriteBufferSize),

(其中minReadBufferSize的值爲常量1024

因此,要讀取對方發來的信息,必定會讀取bufReader。通過簡單的搜索,咱們發現,它也是在MConnection.Start中啓動的:

p2p/connection.go#L152-L159

func (c *MConnection) OnStart() error {
    // ...
    go c.sendRoutine()
    go c.recvRoutine()
    // ...
}

其中的c.recvRoutine()就是咱們本次所關注的。它上面的c.sendRoutine是用來發送的,是前一篇文章中咱們關注的重點。

繼續c.recvRoutine()

p2p/connection.go#L403-L502

func (c *MConnection) recvRoutine() {
    // ...
    for {
        c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)

        // ...

        pktType := wire.ReadByte(c.bufReader, &n, &err)
        c.recvMonitor.Update(int(n))
        // ...

        switch pktType {
        // ...
        case packetTypeMsg:
            pkt, n, err := msgPacket{}, int(0), error(nil)
            wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
            c.recvMonitor.Update(int(n))
            // ...
            channel, ok := c.channelsIdx[pkt.ChannelID]
            // ...
            msgBytes, err := channel.recvMsgPacket(pkt)
            // ...
            if msgBytes != nil {
                // ...
                c.onReceive(pkt.ChannelID, msgBytes)
            }
            // ...
        }
    }
    // ...
}

通過簡化之後,這個方法分紅了三塊內容:

  1. 第一塊就限制接收速率,以防止惡意結點忽然發送大量數據把節點撐死。跟發送同樣,它的限制是500K/s
  2. 第二塊是從c.bufReader中讀取出下一個數據包的類型。它的值目前有三個,兩個跟心跳有關:packetTypePingpacketTypePong,另外一個表示是正常的信息數據類型packetTypeMsg,也是咱們須要關注的
  3. 第三塊就是繼續從c.bufReader中讀取出完整的數據包,而後根據它的ChannelID找到相應的channel去處理它。ChannelID有兩個值,分別是BlockchainChannelPexChannel,咱們目前只須要關注前者便可,它對應的reactor是ProtocolReactor。當最後調用c.onReceive(pkt.ChannelID, msgBytes)時,讀取的二進制數據msgBytes就會被ProtocolReactor.Receive處理

咱們的重點是看第三塊內容。首先是channel.recvMsgPacket(pkt),即通道是怎麼從packet包裏讀取到相應的二進制數據的呢?

p2p/connection.go#L667-L682

func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
    // ...
    ch.recving = append(ch.recving, packet.Bytes...)
    if packet.EOF == byte(0x01) {
        msgBytes := ch.recving
        // ...
        ch.recving = ch.recving[:0]
        return msgBytes, nil
    }
    return nil, nil
}

這個方法我去掉了一些錯誤檢查和關於性能方面的註釋,有興趣的同窗能夠點接上方的源代碼查看,這裏就忽略了。

這段代碼主要是利用了一個叫recving的通道,把packet中持有的字節數組加到它後面,而後再判斷該packet是否表明整個信息結束了,若是是的話,則把ch.recving的內容完整返回,供調用者處理;不然的話,返回一個nil,表示還沒拿完,暫時處理不了。在前一篇文章中關於發送數據的地方能夠與這裏對應,只不過發送方要麻煩的多,須要三個通道sendQueuesendingsend才能實現,這邊接收方就簡單了。

而後回到前面的方法MConnection.recvRoutine,咱們繼續看最後的c.onReceive調用。這個onReceive其實是一個由別人賦值給該channel的一個函數,它位於MConnection建立的地方:

p2p/peer.go#L292-L310

func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
    onReceive := func(chID byte, msgBytes []byte) {
        reactor := reactorsByCh[chID]
        if reactor == nil {
            if chID == PexChannel {
                return
            } else {
                cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
            }
        }
        reactor.Receive(chID, p, msgBytes)
    }

    onError := func(r interface{}) {
        onPeerError(p, r)
    }

    return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
}

邏輯也比較簡單,就是當前面的c.onReceive(pkt.ChannelID, msgBytes)調用時,它會根據傳入的chID找到相應的Reactor,而後執行其Receive方法。對於本文來講,就會進入到ProtocolReactor.Receive

那咱們繼續看ProtocolReactor.Receive:

netsync/protocol_reactor.go#L179-L247

func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
    _, msg, err := DecodeMessage(msgBytes)
    // ...
    switch msg := msg.(type) {
    case *BlockRequestMessage:
        // ...
}

其中的DecodeMessage(...)就是把傳入的二進制數據反序列化成一個BlockchainMessage對象,該對象是一個沒有任何內容的interface,它有多種實現類型。咱們在後面繼續對該對象進行判斷,若是它是BlockRequestMessage類型的信息,咱們就會繼續作相應的處理。處理的代碼我在這裏暫時省略了,由於它是屬於下一個小問題的,咱們先不考慮。

好像不知不覺咱們就把第一個小問題的後半部分差很少搞清楚了。那麼前半部分是什麼?咱們在前面說,讀取bufReader的代碼的起點是在MConnection.Start中,那麼前半部分就是:比原從啓動開始中,是在什麼狀況下怎樣一步步走到MConnection.Start的呢?

好在前半部分的問題咱們在前一篇文章《比原是如何把請求區塊數據的信息發出去的》中進行了專門的討論,這裏就不講了,有須要的話能夠再過去看一下(能夠先看最後「總結」那一小節)。

下面咱們進入第二個小問題:

收到BlockRequestMessage後,將會給對方發送什麼樣的信息?

這裏就是接着前面的ProtocolReactor.Receive繼續向下講了。首先咱們再貼一下它的較完整的代碼:

netsync/protocol_reactor.go#L179-L247

func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
    _, msg, err := DecodeMessage(msgBytes)
    // ...

    switch msg := msg.(type) {
    case *BlockRequestMessage:
        var block *types.Block
        var err error
        if msg.Height != 0 {
            block, err = pr.chain.GetBlockByHeight(msg.Height)
        } else {
            block, err = pr.chain.GetBlockByHash(msg.GetHash())
        }
        // ...
        response, err := NewBlockResponseMessage(block)
        // ...
        src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
    // ...
}

能夠看到,邏輯仍是比較簡單的,即根據對方發過來的BlockRequestMessage中指定的height或者hash信息,在本地的區塊鏈數據中找到相應的block,組成BlockResponseMessage發過去就好了。

其中chain.GetBlockByHeight(...)chain.GetBlockByHash(...)若是詳細說明的話,須要深入理解區塊鏈數據在比原節點中是如何保存的,咱們在本文先不講,等到後面專門研究。

在這裏,我以爲咱們只須要知道咱們會查詢區塊數據而且構造出一個BlockResponseMessage,再經過BlockchainChannel這個通道發送出去就能夠了。

最後一句代碼中調用了src.TrySend方法,它是把信息向對方peer發送過去。(其中的src就是指的對方peer)

那麼,它究竟是怎麼發送出去的呢?下面咱們進入最後一個小問題:

這個BlockResponseMessage信息是如何發送出去的?

咱們先看看peer.TrySend代碼:

p2p/peer.go#L242-L247

func (p *Peer) TrySend(chID byte, msg interface{}) bool {
    if !p.IsRunning() {
        return false
    }
    return p.mconn.TrySend(chID, msg)
}

它在內部將會調用MConnection.TrySend方法,其中chIDBlockchainChannel,也就是它對應的Reactor是ProtocolReactor

再接着就是咱們熟悉的MConnection.TrySend,因爲它在前一篇文章中進行了全面的講解,在本文就不提了,若是須要能夠過去翻看一下。

那麼今天的問題就算是解決啦。

到這裏,咱們總算可以完整的理解清楚,當咱們向一個比原節點請求「區塊數據」,咱們這邊須要怎麼作,對方節點又須要怎麼作了。

相關文章
相關標籤/搜索