做者:freewindnode
比原項目倉庫:react
Github地址:https://github.com/Bytom/bytomgit
Gitee地址:https://gitee.com/BytomBlockchain/bytomgithub
在前一篇中,咱們說到,當比原向其它節點請求區塊數據時,BlockKeeper
會發送一個BlockRequestMessage
把須要的區塊height
告訴對方,並把該信息對應的二進制數據放入ProtocolReactor
對應的sendQueue
通道中,等待發送。而具體的發送細節,因爲邏輯比較複雜,因此在前一篇中並未詳解,放到本篇中。json
因爲sendQueue
是一個通道,數據放進去後,究竟是由誰在什麼狀況下取走併發送,BlockKeeper
這邊是不知道的。通過咱們在代碼中搜索,發現只有一個類型會直接監視sendQueue
中的數據,它就是前文出現的MConnection
。MConnection
的對象在它的OnStart
方法中,會監視sendQueue
中的數據,而後,等發現數據時,會將之取走並放入一個叫sending
的通道里。緩存
事情變得有點複雜了:併發
MConnection
對應了一個與peer的鏈接,而比原節點之間創建鏈接的狀況又有多種:好比主動鏈接別的節點,或者別的節點主動連上我sending
以後,咱們還須要知道又是誰在什麼狀況下會監視sending
,取走它裏面的數據sending
中的數據被取走後,又是如何被髮送到其它節點的呢?仍是像之前同樣,遇到複雜的問題,咱們先經過「相互獨立,徹底窮盡」的原則,把它分解成一個個小問題,而後依次解決。atom
那麼首先咱們須要弄清楚的是:spa
MConnection
的對象並調用其OnStart
方法?(從而咱們知道sendQueue
中的數據是如何被監視的).net
通過分析,咱們發現MConnection
的啓動,只出如今一個地方,即Peer
的OnStart
方法中。那麼就這個問題就變成了:比原在什麼狀況下,會建立Peer
的對象並調用其OnStart
方法?
再通過一番折騰,終於肯定,在比原中,在下列4種狀況Peer.OnStart
方法最終會被調用:
addrbook.json
中保存的節點的時候PEXReactor
,並使用它本身的協議與當前鏈接上的節點進行通訊的時候Switch.Connect2Switches
方法中(可忽略)第4種狀況咱們徹底忽略。第3種狀況中,因爲PEXReactor
會使用相似於BitTorrent的文件分享協議與其它節點分享數據,邏輯比較獨立,算是一種輔助做用,咱們也暫不考慮。這樣咱們就只須要分析前兩種狀況了。
MConnection.OnStart
方法的?首先咱們快速走到SyncManager.Start
方法:
func main() { cmd := cli.PrepareBaseCmd(commands.RootCmd, "TM", os.ExpandEnv(config.DefaultDataDir())) cmd.Execute() }
cmd/bytomd/commands/run_node.go#L41
func runNode(cmd *cobra.Command, args []string) error { n := node.NewNode(config) if _, err := n.Start(); err != nil { // ... }
func (n *Node) OnStart() error { // ... n.syncManager.Start() // ... }
func (sm *SyncManager) Start() { go sm.netStart() // ... }
而後咱們將進入netStart()
方法。在這個方法中,比原將主動鏈接其它節點:
func (sm *SyncManager) netStart() error { // ... if sm.config.P2P.Seeds != "" { // dial out seeds := strings.Split(sm.config.P2P.Seeds, ",") if err := sm.DialSeeds(seeds); err != nil { return err } } return nil }
這裏出現的sm.config.P2P.Seeds
,對應的就是本地數據目錄中config.toml
中的p2p.seeds
中的種子結點。
接着經過sm.DialSeeds
去主動鏈接每一個種子:
func (sm *SyncManager) DialSeeds(seeds []string) error { return sm.sw.DialSeeds(sm.addrBook, seeds) }
func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { // ... for i := 0; i < len(perm)/2; i++ { j := perm[i] sw.dialSeed(netAddrs[j]) } // ... }
func (sw *Switch) dialSeed(addr *NetAddress) { peer, err := sw.DialPeerWithAddress(addr, false) // ... }
func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) { // ... peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) // ... err = sw.AddPeer(peer) // ... }
先是經過newOutboundPeerWithConfig
建立了peer
,而後把它加入到sw
(即Switch
對象)中。
func (sw *Switch) AddPeer(peer *Peer) error { // ... // Start peer if sw.IsRunning() { if err := sw.startInitPeer(peer); err != nil { return err } } // ... }
在sw.startInitPeer
中,將會調用peer.Start
:
func (sw *Switch) startInitPeer(peer *Peer) error { peer.Start() // ... }
而peer.Start
對應了Peer.OnStart
,最後就是:
func (p *Peer) OnStart() error { p.BaseService.OnStart() _, err := p.mconn.Start() return err }
能夠看到,在這裏調用了mconn.Start
,終於找到了。總結一下就是:
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.DialSeeds
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
那麼,第一種主動鏈接別的節點的狀況就到這裏分析完了。下面是第二種狀況:
MConnection.OnStart
方法這一步的?比原節點啓動後,會監聽本地的p2p端口,等待別的節點鏈接上來。那麼這個流程又是什麼樣的呢?
因爲比原節點的啓動流程在目前的文章中已經屢次出現,這裏就不貼了,咱們直接從Switch.OnStart
開始(它是在SyncManager
啓動的時候啓動的):
func (sw *Switch) OnStart() error { // ... for _, peer := range sw.peers.List() { sw.startInitPeer(peer) } // Start listeners for _, listener := range sw.listeners { go sw.listenerRoutine(listener) } // ... }
這個方法通過省略之後,還剩兩塊代碼,一塊是startInitPeer(...)
,一塊是sw.listenerRoutine(listener)
。
若是你剛纔在讀前一節時留意了,就會發現,startInitPeer(...)
方法立刻就會調用Peer.Start
。然而在這裏須要說明的是,通過個人分析,發現這塊代碼實際上沒有起到任何做用,由於在當前這個時刻,sw.peers
老是空的,它裏面尚未來得及被其它的代碼添加進peer。因此我以爲它能夠刪掉,以避免誤導讀者。(提了一個issue,參見#902)
第二塊代碼,listenerRoutine
,若是你還有印象的話,它就是用來監聽本地p2p端口的,在前面「比原是如何監聽p2p端口的」一文中有詳細的講解。
咱們今天仍是須要再挖掘一下它,看看它究竟是怎麼走到MConnection.OnStart
的:
func (sw *Switch) listenerRoutine(l Listener) { for { inConn, ok := <-l.Connections() // ... err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig) // ... } }
這裏的l
就是監聽本地p2p端口的Listener。經過一個for
循環,拿到鏈接到該端口的節點的鏈接,生成新peer。
func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error { // ... peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) // ... if err = sw.AddPeer(peer); err != nil { // ... } // ... }
生成新的peer以後,調用了Switch
的AddPeer
方法。到了這裏,就跟前一節同樣了,在AddPeer
中將調用sw.startInitPeer(peer)
,而後調用peer.Start()
,最後調用了MConnection.OnStart()
。因爲代碼如出一轍,就不貼出來了。
總結一下,就是:
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.OnStart
-> Switch.listenerRoutine
-> Switch.addPeerWithConnectionAndConfig
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
那麼,第二種狀況咱們也分析完了。
不過到目前爲止,咱們只解決了此次問題中的第一個小問題,即:咱們終於知道了比原代碼會在什麼狀況來啓動一個MConnection
,從而監視sendQueue
通道,把要發送的信息數據,轉到了sending
通道中。
那麼,咱們進入下一個小問題:
sending
以後,誰又會來取走它們呢?通過分析以後,發現通道sendQueue
和sending
都屬於類型Channel
,只不過二者做用不一樣。sendQueue
是用來存放待發送的完整的信息數據,而sending
更底層一些,它持有的數據可能會被分紅多個塊發送。若是隻有sendQueue
一個通道,那麼很難實現分塊的操做的。
而Channel
的發送是由MConnection
來調用的,幸運的是,當咱們一直往回追溯下去,發現竟走到了MConnection.OnStart
這裏。也就是說,咱們在這個小問題中,研究的正好是前面兩個鏈條後面的部分:
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.DialSeeds
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
-> ???
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.OnStart
-> Switch.listenerRoutine
-> Switch.addPeerWithConnectionAndConfig
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
-> ???
也就是上面的???
部分。
那麼咱們就直接從MConnection.OnStart
開始:
func (c *MConnection) OnStart() error { // ... go c.sendRoutine() // ... }
c.sendRoutine()
方法就是咱們須要的。當MConnection
啓動之後,就會開始進行發送操做(等待數據到來)。它的代碼以下:
func (c *MConnection) sendRoutine() { // ... case <-c.send: // Send some msgPackets eof := c.sendSomeMsgPackets() if !eof { // Keep sendRoutine awake. select { case c.send <- struct{}{}: default: } } } // ... }
這個方法原本很長,只是咱們省略掉了不少無關的代碼。裏面的c.sendSomeMsgPackets()
就是咱們要找的,可是,咱們忽然發現,怎麼又出來了一個c.send
通道?它又有什麼用?並且看起來好像只有當這個通道里有東西的時候,咱們纔會去調用c.sendSomeMsgPackets()
,彷佛像是一個鈴鐺同樣用來提醒咱們。
那麼c.send
何時會有東西呢?檢查了代碼以後,發如今如下3個地方:
func (c *MConnection) Send(chID byte, msg interface{}) bool { // ... success := channel.sendBytes(wire.BinaryBytes(msg)) if success { // Wake up sendRoutine if necessary select { case c.send <- struct{}{}: // .. }
func (c *MConnection) TrySend(chID byte, msg interface{}) bool { // ... ok = channel.trySendBytes(wire.BinaryBytes(msg)) if ok { // Wake up sendRoutine if necessary select { case c.send <- struct{}{}: // ... }
func (c *MConnection) sendRoutine() { // .... case <-c.send: // Send some msgPackets eof := c.sendSomeMsgPackets() if !eof { // Keep sendRoutine awake. select { case c.send <- struct{}{}: // ... }
若是咱們對前一篇文章還有印象,就會記得channel.trySendBytes
是在咱們想給對方節點發信息時調用的,調用完之後,它會把信息對應的二進制數據放入到channel.sendQueue
通道(因此纔有了本文)。channel.sendBytes
咱們目前雖然還沒用到,可是它也應該是相似的。在它們兩個調用完以後,它們都會向c.send
通道里放入一個數據,用來通知Channel
有數據能夠發送了。
而第三個sendRoutine()
就是咱們剛剛走到的地方。當咱們調用c.sendSomeMsgPackets()
發送了sending
中的一部分以後,若是還有剩餘的,則繼續向c.send
放個數據,提醒能夠繼續發送。
那到目前爲止,發送數據涉及到的Channel就有三個了,分別是sendQueue
、sending
和send
。之因此這麼複雜,根本緣由就是想把數據分塊發送。
爲何要分塊發送呢?這是由於比原但願能控制發送速率,讓節點之間的網速能保持在一個合理的水平。若是不限制的話,一會兒發出大量的數據,一是可能會讓接收者來不及處理,二是有可能會被惡意節點利用,請求大量區塊數據把帶寬佔滿。
擔憂sendQueue
、sending
和send
這三個通道不太好理解,我想到了一個「燒鴨店」的比喻,來理解它們:
sendQueue
就像是用來掛烤好的燒鴨的勾子,能夠有多個(但對於比原來講,默認只有一個,由於sendQueue
的容量默認爲1
),當有燒鴨烤好之後,就掛在勾子上;sending
是砧板,能夠把燒鴨從sendQueue
勾子上取下來一隻,放在上面切成塊,等待裝盤,一隻燒鴨可能能夠裝成好幾盤;send
是鈴鐺,當有人點單後,服務員就會按一下鈴鐺,廚師就從sending
砧板上拿幾塊燒鴨放在小盤中放在出餐口。因爲廚師很是忙,每次切出一盤後均可能會去作別的事情,而忘了sending
砧板上還有燒鴨沒裝盤,因此爲了防止本身忘記,他每切出一盤以後,都會看一眼sending
砧板,若是還有肉,就會按一下鈴鐺提醒本身繼續裝盤。好了,理解了send
後,咱們就能夠回到主線,繼續看c.sendSomeMsgPackets()
的代碼了:
func (c *MConnection) sendSomeMsgPackets() bool { // Block until .sendMonitor says we can write. // Once we're ready we send more than we asked for, // but amortized it should even out. c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true) // Now send some msgPackets. for i := 0; i < numBatchMsgPackets; i++ { if c.sendMsgPacket() { return true } } return false }
c.sendMonitor.Limit
的做用是限制發送速率,其中maxMsgPacketTotalSize
即每一個packet的最大長度爲常量10240
,第二個參數是預先指定的發送速率,默認值爲500KB/s
,第三個參數是說,當實際速度過大時,是否暫停發送,直到變得正常。
通過限速的調整後,後面一段就能夠正常發送數據了,其中的c.sendMsgPacket
是咱們繼續要看的方法:
func (c *MConnection) sendMsgPacket() bool { // ... n, err := leastChannel.writeMsgPacketTo(c.bufWriter) // .. c.sendMonitor.Update(int(n)) // ... return false }
這個方法最前面我省略了一大段代碼,其做用是檢查多個channel,結合它們的優先級和已經發的數據量,找到當前最須要發送數據的那個channel,記爲leastChannel
。
而後就是調用leastChannel.writeMsgPacketTo(c.bufWriter)
,把當前要發送的一塊數據,寫到bufWriter
中。這個bufWriter
就是真正與鏈接對象綁定的一個緩存區,寫入到它裏面的數據,會被Go發送出去。它的定義是在建立MConnection
的地方:
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection { mconn := &MConnection{ conn: conn, bufReader: bufio.NewReaderSize(conn, minReadBufferSize), bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
其中minReadBufferSize
爲1024
,minWriteBufferSize
爲65536
。
數據寫到bufWriter
之後,咱們就不須要關心了,交給Go來操做了。
在leastChannel.writeMsgPacketTo(c.bufWriter)
調用完之後,後面會更新c.sendMonitor
,這樣它才能繼續正確的限速。
這時咱們已經知道數據是怎麼發出去的了,可是咱們尚未找到是誰在監視sending
裏的數據,那讓咱們繼續看leastChannel.writeMsgPacketTo
:
func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) { packet := ch.nextMsgPacket() wire.WriteByte(packetTypeMsg, w, &n, &err) wire.WriteBinary(packet, w, &n, &err) if err == nil { ch.recentlySent += int64(n) } return }
其中的ch.nextMsgPacket()
是取出下一個要發送的數據塊,那麼是從哪裏取出呢?是從sending
嗎?
其後的代碼是把數據塊對象變成二進制,放入到前面的bufWriter
中發送。
繼續ch.nextMsgPacket()
:
func (ch *Channel) nextMsgPacket() msgPacket { packet := msgPacket{} packet.ChannelID = byte(ch.id) packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))] if len(ch.sending) <= maxMsgPacketPayloadSize { packet.EOF = byte(0x01) ch.sending = nil atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize } else { packet.EOF = byte(0x00) ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):] } return packet }
終於看到sending
了。從這裏能夠看出,sending
的確是放着不少塊鴨肉的砧板,而packet
就是一個小盤,因此須要從先sending
中拿出不超過指定長度的數據放到packet
中,而後判斷sending
裏還有沒有剩下的。若是有,則packet
的EOF
值爲0x00
,不然爲0x01
,這樣調用者就知道數據有沒有發完,還需不須要去按那個叫send
的鈴。
那麼到這裏爲止,咱們就知道原來仍是Channel本身在關注sending
,而且爲了限制發送速度,須要把它切成一個個小塊。
最後就咱們的第三個小問題了,其實咱們剛纔在第二問裏已經弄清楚了。
sending
中的數據被取走後,又是如何被髮送到其它節點的呢?答案就是,sending
中的數據被分紅一塊塊取出來後,會放入到bufWriter
中,就直接被Go的net.Conn
對象發送出去了。到這一層面,就不須要咱們再繼續深刻了。
因爲本篇中涉及的方法調用比較多,可能看完都亂了,因此在最後,咱們前面調用鏈補充完整,放在最後:
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.DialSeeds
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
-> ...
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.OnStart
-> Switch.listenerRoutine
-> Switch.addPeerWithConnectionAndConfig
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
-> ...
而後是:
MConnection.sendRoutine
-> MConnection.send
-> MConnection.sendSomeMsgPackets
-> MConnection.sendMsgPacket
-> MConnection.writeMsgPacketTo
-> MConnection.nextMsgPacket
-> MConnection.sending
到了最後,個人感受就是,一個複雜問題最開始看起來很可怕,可是一旦把它分解成小問題以後,每次只關注一個,各個擊破,好像就沒那麼複雜了。