做者:freewindnode
比原項目倉庫:react
Github地址:https://github.com/Bytom/bytomgit
Gitee地址:https://gitee.com/BytomBlockchain/bytomgithub
在前一篇中,咱們已經知道如何連上一個比原節點的p2p端口,並與對方完成身份驗證。此時,雙方結點已經創建起來了信任,而且鏈接也不會斷開,下一步,二者就能夠繼續交換數據了。數組
那麼,我首先想到的就是,如何才能讓對方把它已有的區塊數據全都發給我呢?函數
這其實能夠分爲三個問題:區塊鏈
因爲這一塊的邏輯仍是比較複雜的,因此在本篇咱們先回答第一個問題:fetch
首先咱們先要在代碼中定位到,比原究竟是在何時來向對方節點發送請求的。ui
在前一篇講的是如何創建鏈接並驗證身份,那麼發出數據請求的操做,必定在上次的代碼以後。按照這個思路,咱們在SyncManager
類中Switch
啓動以後,找到了一個叫BlockKeeper
的類,相關的操做是在它裏面完成的。atom
下面是老規矩,仍是從啓動開始,可是會更簡化一些:
func main() { cmd := cli.PrepareBaseCmd(commands.RootCmd, "TM", os.ExpandEnv(config.DefaultDataDir())) cmd.Execute() }
cmd/bytomd/commands/run_node.go#L41
func runNode(cmd *cobra.Command, args []string) error { n := node.NewNode(config) if _, err := n.Start(); err != nil { // ... }
func (n *Node) OnStart() error { // ... n.syncManager.Start() // ... }
func (sm *SyncManager) Start() { go sm.netStart() // ... go sm.syncer() }
注意sm.netStart()
,咱們在一篇中創建鏈接並驗證身份的操做,就是在它裏面完成的。而此次的這個問題,是在下面的sm.syncer()
中完成的。
另外注意,因爲這兩個函數調用都使用了goroutine,因此它們是同時進行的。
sm.syncer()
的代碼以下:
func (sm *SyncManager) syncer() { sm.fetcher.Start() defer sm.fetcher.Stop() // ... for { select { case <-sm.newPeerCh: log.Info("New peer connected.") // Make sure we have peers to select from, then sync if sm.sw.Peers().Size() < minDesiredPeerCount { break } go sm.synchronise() // .. } }
這裏混入了一個叫fetcher
的奇怪的東西,名字看起來好像是專門去抓取數據的,咱們要找的是它嗎?
惋惜不是,fetcher
的做用是從多個peer那裏拿到了區塊數據以後,對數據進行整理,把有用的放到本地鏈上。咱們在之後會研究它,因此這裏不展開討論。
接着是一個for
循環,當發現通道newPeerCh
有了新數據(也就是有了新的節點鏈接上了),會判斷一下當前本身連着的節點是否夠多(大於等於minDesiredPeerCount
,值爲5
),夠多的話,就會進入sm.synchronise()
,進行數據同步。
這裏爲何要多等幾個節點,而不是一連上就立刻同步呢?我想這是但願有更多選擇的機會,找到一個數據夠多的節點。
sm.synchronise()
仍是屬於SyncManager
的方法。在真正調用到BlockKeeper
的方法以前,它還作了一些好比清理已經斷開的peer,找到最適合同步數據的peer等。其中「清理peer」的工做涉及到不一樣的對象持有的peer集合間的同步,略有些麻煩,但對當前問題幫助不大,因此我打算把它們放在之後的某個問題中回答(好比「當一個節點斷開了,比原會有什麼樣的處理」),這裏就先省略。
sm.synchronise()
代碼以下:
func (sm *SyncManager) synchronise() { log.Info("bk peer num:", sm.blockKeeper.peers.Len(), " sw peer num:", sm.sw.Peers().Size(), " ", sm.sw.Peers().List()) // ... peer, bestHeight := sm.peers.BestPeer() // ... if bestHeight > sm.chain.BestBlockHeight() { // ... sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight) } }
能夠看到,首先是從衆多的peers中,找到最合適的那個。什麼叫Best呢?看一下BestPeer()
的定義:
func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) { // ... for _, p := range ps.peers { if bestPeer == nil || p.height > bestHeight { bestPeer, bestHeight = p.swPeer, p.height } } return bestPeer, bestHeight }
其實就是持有區塊鏈數據最長的那個。
找到了BestPeer以後,就調用sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)
方法,從這裏,正式進入BlockKeeper
-- 也就是本文的主角 -- 的世界。
blockKeeper.BlockRequestWorker
的邏輯比較複雜,它包含了:
因爲本文中只關注「發送請求」,因此一些與之關係不大的邏輯我會忽略掉,留待之後再講。
在「發送請求」這裏,實際也包含了兩種情形,一種簡單的,一種複雜的:
因爲第2種狀況對於本文來講過於複雜(由於須要深入理解比原鏈中分叉的處理邏輯),因此在本文中將把問題簡化,只考慮第1種。而分叉的處理,將放在之後講解。
下面是把blockKeeper.BlockRequestWorker
中的代碼簡化成了只包含第1種狀況:
func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error { num := bk.chain.BestBlockHeight() + 1 reqNum := uint64(0) reqNum = num // ... bkPeer, ok := bk.peers.Peer(peerID) swPeer := bkPeer.getPeer() // ... block, err := bk.BlockRequest(peerID, reqNum) // ... }
在這種狀況下,咱們能夠認爲bk.chain.BestBlockHeight()
中的Best
,指的是本地持有的不帶分叉的區塊鏈高度最高的那個。(須要提醒的是,若是存在分叉狀況,則Best
不必定是高度最高的那個)
那麼咱們就能夠直接向最佳peer請求下一個高度的區塊,它是經過bk.BlockRequest(peerID, reqNum)
實現的:
func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) { var block *types.Block if err := bk.blockRequest(peerID, height); err != nil { return nil, errReqBlock } // ... for { select { case pendingResponse := <-bk.pendingProcessCh: block = pendingResponse.block // ... return block, nil // ... } } }
在上面簡化後的代碼中,主要分紅了兩個部分。一個是發送請求bk.blockRequest(peerID, height)
,這是本文的重點;它下面的for-select
部分,已是在等待並處理對方節點的返回數據了,這部分咱們今天先略過不講。
bk.blockRequest(peerID, height)
這個方法,從邏輯上又能夠分紅兩部分:
bk.blockRequest(peerID, height)
通過一連串的方法調用以後,使用height
構造出了一個BlockRequestMessage
對象,代碼以下:
func (bk *blockKeeper) blockRequest(peerID string, height uint64) error { return bk.peers.requestBlockByHeight(peerID, height) }
func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error { peer, ok := ps.Peer(peerID) // ... return peer.requestBlockByHeight(height) }
func (p *peer) requestBlockByHeight(height uint64) error { msg := &BlockRequestMessage{Height: height} p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}) return nil }
到這裏,終於構造出了所須要的BlockRequestMessage
,其實主要就是把height
告訴peer。
而後,經過Peer
的TrySend()
把該信息發出去。
在TrySend
中,主要是經過github.com/tendermint/go-wire
庫將其序列化,再發送給對方。看起來應該是很簡單的操做吧,先預個警,仍是挺繞的。
當咱們進入TrySend()
後:
func (p *Peer) TrySend(chID byte, msg interface{}) bool { if !p.IsRunning() { return false } return p.mconn.TrySend(chID, msg) }
發現它把鍋丟給了p.mconn.TrySend
方法,那麼mconn
是什麼?chID
又是什麼?
mconn
是MConnection
的實例,它是從哪兒來的?它應該在以前的某個地方初始化了,不然咱們無法直接調用它。因此咱們先來找到它初始化的地方。
通過一番尋找,發現原來是在前一篇以後,即比原節點與另外一個節點完成了身份驗證以後,具體的位置在Switch
類啓動的地方。
咱們此次直接從Swtich
的OnStart
做爲起點:
func (sw *Switch) OnStart() error { //... // Start listeners for _, listener := range sw.listeners { go sw.listenerRoutine(listener) } return nil }
func (sw *Switch) listenerRoutine(l Listener) { for { inConn, ok := <-l.Connections() // ... err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig) // ... } }
func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error { // ... peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) // ... }
func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) }
func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { conn := rawConn // ... if config.AuthEnc { // ... conn, err = MakeSecretConnection(conn, ourNodePrivKey) // ... } // Key and NodeInfo are set after Handshake p := &Peer{ outbound: outbound, conn: conn, config: config, Data: cmn.NewCMap(), } p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig) p.BaseService = *cmn.NewBaseService(nil, "Peer", p) return p, nil }
終於找到了。上面方法中的MakeSecretConnection
就是與對方節點交換公鑰並進行身份驗證的地方,下面的p.mconn = createMConnection(...)
就是建立mconn
的地方。
繼續進去:
func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection { onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] if reactor == nil { if chID == PexChannel { return } else { cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID)) } } reactor.Receive(chID, p, msgBytes) } onError := func(r interface{}) { onPeerError(p, r) } return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config) }
原來mconn
是MConnection
的實例,它是經過NewMConnectionWithConfig
建立的。
看了上面的代碼,發現這個MConnectionWithConfig
與普通的net.Conn
並無太大的區別,只不過是當收到了對方發來的數據後,會根據指定的chID
調用相應的Reactor
的Receive
方法來處理。因此它起到了將數據分發給Reactor
的做用。
爲何須要這樣的分發操做呢?這是由於,在比原中,節點之間交換數據,有多種不一樣的方式:
ProtocolReactor
中實現,它對應的chID
是BlockchainChannel
,值爲byte(0x40)
PEXReactor
中實現,它對應的chID
是PexChannel
,值爲byte(0x00)
因此節點之間發送信息的時候,須要知道對方發過來的數據對應的是哪種方式,而後轉交給相應的Reactor
去處理。
在比原中,前者是主要的方式,後者起到輔助做用。咱們目前的文章中涉及到的都是前者,後者將在之後專門研究。
p.mconn.TrySend
當咱們知道了p.mconn.TrySend
中的mconn
是什麼,而且在何時初始化之後,下面就能夠進入它的TrySend
方法了。
func (c *MConnection) TrySend(chID byte, msg interface{}) bool { // ... channel, ok := c.channelsIdx[chID] // ... ok = channel.trySendBytes(wire.BinaryBytes(msg)) if ok { // Wake up sendRoutine if necessary select { case c.send <- struct{}{}: default: } } return ok }
能夠看到,它找到相應的channel後(在這裏應該是ProtocolReactor
對應的channel),調用channel的trySendBytes
方法。在發送數據的時候,使用了github.com/tendermint/go-wire
庫,將msg
序列化爲二進制數組。
func (ch *Channel) trySendBytes(bytes []byte) bool { select { case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) return true default: return false } }
原來它是把要發送的數據,放到了該channel對應的sendQueue
中,交由別人來發送。具體是由誰來發送,咱們立刻要就找到它。
細心的同窗會發現,Channel
除了trySendBytes
方法外,還有一個sendBytes
(在本文中沒有用上):
func (ch *Channel) sendBytes(bytes []byte) bool { select { case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) return true case <-time.After(defaultSendTimeout): return false } }
它們兩個的區別是,前者嘗試把待發送數據bytes
放入ch.sendQueue
時,若是能放進去,則返回true
,不然立刻失敗,返回false
,因此它是非阻塞的。然後者,若是放不進去(sendQueue
已滿,那邊還沒處理完),則等待defaultSendTimeout
(值爲10
秒),而後纔會失敗。另外,sendQueue
的容量默認爲1
。
到這裏,咱們其實已經知道比原是如何向其它節點請求區塊數據,以及什麼時候把信息發送出去。
本想在本篇中就把真正發送數據的代碼也一塊兒講了,可是發現它的邏輯也至關複雜,因此就另開一篇講吧。
再回到本文問題,再強調一下,咱們前面說了,對於向peer請求區塊數據,有兩種狀況:一種是簡單的不考慮分叉的,另外一種是複雜的考慮分叉的。在本文只考慮了簡單的狀況,在這種狀況下,所謂的bestHeight
就是指的最高的那個區塊的高度,而在複雜狀況下,它就不必定了。這就留待之後咱們再詳細討論,本文的問題就算是回答完畢了。