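This article starts from the block propagation strategy. It explains how a new block is propagated to remote nodes and how it is added to a remote node's local chain. It also introduces the fetcher module, whose job is to process the block information announced by peers. Modules such as p2p and eth come up along the way, but they are not covered in their own right; the focus stays on how a block is propagated and inserted into the blockchain.
The code discussed here is Ethereum Release 1.8; other versions may differ.
This section takes a high-level view: after a node produces a block, what does it do to propagate it to remote nodes, and what do remote nodes do when they receive it? Every node is connected to many peers, so what propagation strategy does it use?
The overall flow and strategy can be summarized as follows: the block is propagated to remote peers; each peer verifies the block, adds it to its local chain, and keeps propagating the new block's information. The details follow.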
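First, the overall process. After a block is produced, the miner module publishes a NewMinedBlockEvent. The goroutine subscribed to that event then broadcasts the new block to all of its peers. A peer that receives the message hands it to its own fetcher module. The fetcher performs basic validation; if the block passes and turns out to be the next block the local chain needs, it is handed to blockChain for full validation, which executes every transaction in the block. If everything checks out, the block is added to the local chain and written to the database. This is the flow shown in Figure 1.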
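The overall flow chart contains a fork because a node propagates new blocks according to a strategy: if the node is connected to N peers, it broadcasts the complete block to only sqrt(N) of the peers in its peer list, and the remaining peers receive only an announcement carrying the block hash. Figure 2 shows the effect; the red node propagates the block to the yellow nodes: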
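A node that receives only a block hash must fetch the complete block from the peer that sent it the message. Once it has the block, it follows the flow in Figure 1: the block enters the fetcher queue and is eventually inserted into the local chain, after which the node broadcasts the block's hash to the connected peers that do not yet know about the block. Figure 3 shows the strategy for nodes that did not produce the block; the yellow nodes propagate the block hash to the cyan nodes:
As this shows, Ethereum spreads a newly produced block like a stone dropped into water, rippling outward layer by layer.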
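The sqrt(N) rule can be illustrated with a small sketch. This is not the go-ethereum code: `splitPeers` and the string peer IDs are hypothetical, and the sketch assumes that the peers outside the first group are the ones that receive only the hash announcement.

```go
package main

import (
	"fmt"
	"math"
)

// splitPeers is a hypothetical helper. It returns the peers that would get
// the complete block (the first floor(sqrt(N)) entries of the list) and the
// peers that would get only the block hash (everyone else).
func splitPeers(peers []string) (full, announce []string) {
	n := int(math.Sqrt(float64(len(peers))))
	return peers[:n], peers[n:]
}

func main() {
	peers := make([]string, 16)
	for i := range peers {
		peers[i] = fmt.Sprintf("peer-%d", i)
	}
	full, announce := splitPeers(peers)
	fmt.Println("full block:", len(full), "hash only:", len(announce))
}
```

With 16 peers, only 4 receive the complete block, which keeps bandwidth use low while the hash announcement still reaches everyone.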
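The fetcher module collects the block information that other peers notify it about: 1) complete blocks and 2) block hash messages. Based on these notifications it obtains the complete block and passes it to the eth module, which inserts it into the blockchain.
A complete block can be passed to eth for insertion directly. If only the block hash is available, the complete block must first be fetched from another peer and is then passed to eth for insertion.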
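This section covers the details of block propagation and processing. As before, a diagram explains the flow first, followed by the code.
Figure 4 shows the broadcast flow after a node produces a block:
Now for the details in the code.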
The worker.wait() function publishes the NewMinedBlockEvent. ProtocolManager.minedBroadcastLoop() is the event handler, and it calls pm.BroadcastBlock() twice.

    // Mined broadcast loop
    func (pm *ProtocolManager) minedBroadcastLoop() {
        // automatically stops if unsubscribe
        for obj := range pm.minedBlockSub.Chan() {
            switch ev := obj.Data.(type) {
            case core.NewMinedBlockEvent:
                pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
                pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
            }
        }
    }
When the propagate argument of pm.BroadcastBlock() is true, the complete block is broadcast to a subset of peers via peer.AsyncSendNewBlock(); otherwise the block hash is announced to all peers that do not yet have the block via peer.AsyncSendNewBlockHash(). Both functions simply put the data into a queue, so their code is not shown here.

    // BroadcastBlock will either propagate a block to a subset of its peers, or
    // will only announce its availability (depending what's requested).
    func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
        hash := block.Hash()
        peers := pm.peers.PeersWithoutBlock(hash)

        // If propagation is requested, send to a subset of the peer
        if propagate {
            // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
            var td *big.Int
            if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
                td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
            } else {
                log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
                return
            }
            // Send the block to a subset of our peers
            transfer := peers[:int(math.Sqrt(float64(len(peers))))]
            for _, peer := range transfer {
                peer.AsyncSendNewBlock(block, td)
            }
            log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
            return
        }
        // Otherwise if the block is indeed in our own chain, announce it
        // (the block hash goes to every peer that does not have the block)
        if pm.blockchain.HasBlock(hash, block.NumberU64()) {
            for _, peer := range peers {
                peer.AsyncSendNewBlockHash(block)
            }
            log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
        }
    }
peer.broadcast() is the broadcast function of each peer connection. It broadcasts only three kinds of messages: transactions, complete blocks, and block hashes. In other words, a node proactively broadcasts only these three kinds of data; all other data synchronization uses request-response.

    // broadcast is a write loop that multiplexes block propagations, announcements
    // and transaction broadcasts into the remote peer. The goal is to have an async
    // writer that does not lock up node internals.
    func (p *peer) broadcast() {
        for {
            select {
            // Broadcast transactions
            case txs := <-p.queuedTxs:
                if err := p.SendTransactions(txs); err != nil {
                    return
                }
                p.Log().Trace("Broadcast transactions", "count", len(txs))

            // Broadcast a complete new block
            case prop := <-p.queuedProps:
                if err := p.SendNewBlock(prop.block, prop.td); err != nil {
                    return
                }
                p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)

            // Broadcast a block hash
            case block := <-p.queuedAnns:
                if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
                    return
                }
                p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())

            case <-p.term:
                return
            }
        }
    }
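The text says the AsyncSend functions only put data into a queue that the broadcast loop drains. A minimal sketch of such a non-blocking enqueue follows. The `peerQueue` type and `asyncAnnounce` function are hypothetical, and dropping the item when the queue is full is an assumption made for this illustration rather than something the article states.

```go
package main

import "fmt"

// peerQueue is a hypothetical, simplified stand-in for a peer's outbound queue.
type peerQueue struct {
	queuedAnns chan string // pending block-hash announcements
}

// asyncAnnounce queues a hash without ever blocking the caller. It reports
// false when the queue is full and the announcement is dropped (assumed policy).
func (p *peerQueue) asyncAnnounce(hash string) bool {
	select {
	case p.queuedAnns <- hash:
		return true
	default:
		return false
	}
}

func main() {
	p := &peerQueue{queuedAnns: make(chan string, 2)}
	for _, h := range []string{"0xaa", "0xbb", "0xcc"} {
		fmt.Println(h, "queued:", p.asyncAnnounce(h))
	}
}
```

The point of the design is that a slow remote peer can never stall the goroutine that produced the block; only that peer's own writer loop waits on the network.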
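This section covers how a remote node handles the two block synchronization messages. The handling of NewBlockMsg is clear and concise. The handling of NewBlockHashesMsg takes a couple of detours: as Figure 1 shows, the node first has to fetch the complete block from the peer that sent the message, after which the flow is the same as for NewBlockMsg.
Many modules are involved here, and drawing them all looks dizzying, but as long as you hold on to the main thread above, the code is quite clear. Figure 5 gives the overall flow.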
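Message handling starts at ProtocolManager.handleMsg. The blue region is the handling of NewBlockMsg. The red region is a separate goroutine in which the fetcher processes the blocks in its queue: if a block taken from the queue is the one the current chain needs, it is validated and blockchain.InsertChain() is called to insert it into the blockchain. It is finally written to the database, which is the yellow part. The green part is the handling of NewBlockHashesMsg; the code path is fairly complex, so it is simplified in the figure to keep the overall flow readable.
Study the figure until the overall flow is clear, then read on for the details of each step.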
This section covers how a node handles a complete block it receives. The flow is as follows:
1. Decode the message and mark the sending peer as knowing the block.
2. Put the block into the fetcher's queue by calling fetcher.Enqueue.
3. Update the peer's head and, if the peer is ahead of the local chain, start a sync with it.
Only the NewBlockMsg part of handleMsg() is shown.

    case msg.Code == NewBlockMsg:
        // Retrieve and decode the propagated block
        var request newBlockData
        if err := msg.Decode(&request); err != nil {
            return errResp(ErrDecode, "%v: %v", msg, err)
        }
        request.Block.ReceivedAt = msg.ReceivedAt
        request.Block.ReceivedFrom = p

        // Mark the peer as owning the block and schedule it for import
        p.MarkBlock(request.Block.Hash())
        // Why enqueue when we already have the complete block?
        // Answer: it goes into the fetcher's priority queue, and the fetcher
        // picks the block needed at the current height from that queue.
        pm.fetcher.Enqueue(p.id, request.Block)

        // Assuming the block is importable by the peer, but possibly not yet done so,
        // calculate the head hash and TD that the peer truly must have
        // (that is, the head and total difficulty up to the parent block).
        var (
            trueHead = request.Block.ParentHash()
            trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
        )
        // Update the peers total difficulty if better than the previous.
        // If the TD exceeds both the peer's previous TD and our own, sync with this peer.
        // Question: only the hash and TD from the block are used here; why not use
        // the block itself? And if it cannot be used, why not send less data?
        // Answer: the block is already in the priority queue. When the fetcher loop
        // finds that the next height it needs is queued, it does not request the
        // block from the peer again; it validates the queued block and hands it to
        // the blockchain to run insertChain.
        if _, td := p.Head(); trueTD.Cmp(td) > 0 {
            p.SetHead(trueHead, trueTD)

            // Schedule a sync if above ours. Note, this will not fire a sync for a gap of
            // a single block (as the true TD is below the propagated block), however this
            // scenario should easily be covered by the fetcher.
            currentBlock := pm.blockchain.CurrentBlock()
            if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
                go pm.synchronise(p)
            }
        }

    // ------------------------ handleMsg ends above

    // Enqueue tries to fill gaps in the fetcher's future import queue.
    // It sends to the inject channel: the caller runs in the handleMsg goroutine,
    // and the channel hands the block over to the fetcher goroutine.
    func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
        op := &inject{
            origin: peer,
            block:  block,
        }
        select {
        case f.inject <- op:
            return nil
        case <-f.quit:
            return errTerminated
        }
    }

    // ------------------------ the inject branch of fetcher.loop

    case op := <-f.inject:
        // A direct block insertion was requested, try and fill any pending gaps
        propBroadcastInMeter.Mark(1)
        f.enqueue(op.origin, op.block)

    // ------------------------ the enqueue function

    // enqueue schedules a new future import operation, if the block to be imported
    // has not yet been seen.
    func (f *Fetcher) enqueue(peer string, block *types.Block) {
        hash := block.Hash()

        // Ensure the peer isn't DOSing us
        count := f.queues[peer] + 1
        if count > blockLimit {
            log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
            propBroadcastDOSMeter.Mark(1)
            f.forgetHash(hash)
            return
        }
        // Discard any past or too distant blocks (height check)
        if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
            log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
            propBroadcastDropMeter.Mark(1)
            f.forgetHash(hash)
            return
        }
        // Schedule the block for future importing.
        // The block first goes into the priority queue; much remains to be done
        // before it joins the chain.
        if _, ok := f.queued[hash]; !ok {
            op := &inject{
                origin: peer,
                block:  block,
            }
            f.queues[peer] = count
            f.queued[hash] = op
            f.queue.Push(op, -float32(block.NumberU64()))
            if f.queueChangeHook != nil {
                f.queueChangeHook(op.block.Hash(), true)
            }
            log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
        }
    }
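The height check in enqueue accepts only blocks inside a window around the current chain height. The sketch below isolates that test. `withinWindow` is a hypothetical helper, and the two constant values are assumptions chosen for illustration; the article does not state them, so check the fetcher source for the real ones.

```go
package main

import "fmt"

// Assumed values for illustration only.
const (
	maxUncleDist = 7  // how far behind the chain head a block may be
	maxQueueDist = 32 // how far ahead of the chain head a block may be
)

// withinWindow mirrors the distance test in enqueue: a block is kept only if
// -maxUncleDist <= number-height <= maxQueueDist.
func withinWindow(number, height uint64) bool {
	dist := int64(number) - int64(height)
	return dist >= -maxUncleDist && dist <= maxQueueDist
}

func main() {
	height := uint64(100)
	for _, n := range []uint64{92, 93, 101, 132, 133} {
		fmt.Printf("block %d accepted: %v\n", n, withinWindow(n, height))
	}
}
```

The lower bound leaves room for blocks that can still become uncles, and the upper bound stops a peer from filling the queue with blocks the chain cannot use for a long time.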
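This section looks at how the fetcher processes a block after it has been queued. Why not validate the block and insert it into the local chain right away?
Because Ethereum has an uncle mechanism, a node may receive somewhat older blocks. A node may also have fallen a few blocks behind for network reasons, so it may receive blocks from the "future". Neither kind can be inserted into the local chain directly.
Blocks are placed in a priority queue, and blocks with lower heights are popped first. fetcher.loop runs as a separate goroutine that keeps cycling, cleaning up the fetcher's pending work and events. It first discards blocks that are still being fetched but have timed out. It then processes the blocks in the priority queue and checks whether a block's height is the next one needed; if so, it calls f.insert(), which validates the block and calls BlockChain.InsertChain(). After a successful insert, the new block's hash is broadcast.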

    // Loop is the main fetcher loop, checking and processing various notification
    // events.
    func (f *Fetcher) loop() {
        // Iterate the block fetching until a quit is requested
        fetchTimer := time.NewTimer(0)
        completeTimer := time.NewTimer(0)

        for {
            // Clean up any expired block fetches
            for hash, announce := range f.fetching {
                if time.Since(announce.time) > fetchTimeout {
                    f.forgetHash(hash)
                }
            }
            // Import any queued blocks that could potentially fit
            height := f.chainHeight()
            for !f.queue.Empty() {
                op := f.queue.PopItem().(*inject)
                hash := op.block.Hash()
                if f.queueChangeHook != nil {
                    f.queueChangeHook(hash, false)
                }
                // If too high up the chain or phase, continue later.
                // The block is beyond the next one the chain needs: push it
                // back into the priority queue and stop the loop.
                number := op.block.NumberU64()
                if number > height+1 {
                    f.queue.Push(op, -float32(number))
                    if f.queueChangeHook != nil {
                        f.queueChangeHook(hash, true)
                    }
                    break
                }
                // Otherwise if fresh and still unknown, try and import.
                // Blocks that are too old or already in the chain are dropped.
                if number+maxUncleDist < height || f.getBlock(hash) != nil {
                    f.forgetBlock(hash)
                    continue
                }
                // Insert the block into the chain
                f.insert(op.origin, op.block)
            }
            // ... (rest omitted)
        }
    }

    func (f *Fetcher) insert(peer string, block *types.Block) {
        hash := block.Hash()

        // Run the import on a new thread
        log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
        go func() {
            defer func() { f.done <- hash }()

            // If the parent's unknown, abort insertion
            parent := f.getBlock(block.ParentHash())
            if parent == nil {
                log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
                return
            }
            // Quickly validate the header and propagate the block if it passes
            switch err := f.verifyHeader(block.Header()); err {
            case nil:
                // All ok, quickly propagate to our peers
                propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
                go f.broadcastBlock(block, true)

            case consensus.ErrFutureBlock:
                // Weird future block, don't fail, but neither propagate

            default:
                // Something went very wrong, drop the peer
                log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
                f.dropPeer(peer)
                return
            }
            // Run the actual import and log any issues.
            // The callback is in fact blockChain.InsertChain.
            if _, err := f.insertChain(types.Blocks{block}); err != nil {
                log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
                return
            }
            // If import succeeded, broadcast the block
            propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
            go f.broadcastBlock(block, false)

            // Invoke the testing hook if needed
            if f.importedHook != nil {
                f.importedHook(block)
            }
        }()
    }
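The "lowest height first, stop at the first block that is too far ahead" behaviour of the import loop can be reproduced with the standard container/heap package. This is a sketch under stated assumptions: `heightHeap`, `newQueue` and `importable` are hypothetical names, only heights are stored instead of blocks, and the fetcher itself uses its own prque type with negated priorities rather than container/heap.

```go
package main

import (
	"container/heap"
	"fmt"
)

// heightHeap is a min-heap of block heights.
type heightHeap []uint64

func (h heightHeap) Len() int            { return len(h) }
func (h heightHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h heightHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *heightHeap) Push(x interface{}) { *h = append(*h, x.(uint64)) }
func (h *heightHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// newQueue builds a queue holding the given heights.
func newQueue(heights ...uint64) *heightHeap {
	q := &heightHeap{}
	for _, n := range heights {
		heap.Push(q, n)
	}
	return q
}

// importable pops heights lowest-first and returns those not beyond
// height+1. The first height that is too far ahead is pushed back and the
// loop stops, as in the fetcher's import loop.
func importable(q *heightHeap, height uint64) []uint64 {
	var out []uint64
	for q.Len() > 0 {
		n := heap.Pop(q).(uint64)
		if n > height+1 {
			heap.Push(q, n)
			break
		}
		out = append(out, n)
	}
	return out
}

func main() {
	q := newQueue(103, 101, 105)
	fmt.Println("importable now:", importable(q, 100), "still queued:", q.Len())
}
```

Because the queue is ordered by height, the loop can stop at the first block that is too high: everything behind it is higher still.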
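This section covers the handling of NewBlockHashesMsg. The message handling itself is simple; the more involved part is fetching the complete block from the peer, which the next section covers.
The flow is as follows:
1. handleMsg() decodes the message and marks the sending peer as knowing the announced blocks.
2. It picks out the hashes the local chain does not have and calls fetcher.Notify for each one. Notify records the announcement and pushes it into the notify channel so that it reaches the fetcher goroutine.
3. fetcher.loop() processes the messages in notify: it confirms that the announcement is not part of a DoS attack, checks the block height, and checks whether the block is already in fetching or completing (completing means the header has been downloaded and the body is being downloaded). If it is in neither, the announcement is added to announced and the fetch timer is rescheduled so that it gets processed. announced is covered in the next section.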

    // Part of handleMsg()
    case msg.Code == NewBlockHashesMsg:
        var announces newBlockHashesData
        if err := msg.Decode(&announces); err != nil {
            return errResp(ErrDecode, "%v: %v", msg, err)
        }
        // Mark the hashes as present at the remote node
        for _, block := range announces {
            p.MarkBlock(block.Hash)
        }
        // Schedule all the unknown hashes for retrieval:
        // find the hashes missing from the local chain and let the fetcher download them
        unknown := make(newBlockHashesData, 0, len(announces))
        for _, block := range announces {
            if !pm.blockchain.HasBlock(block.Hash, block.Number) {
                unknown = append(unknown, block)
            }
        }
        for _, block := range unknown {
            pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
        }

    // Notify announces the fetcher of the potential availability of a new block in
    // the network. Only the hash, the height and similar metadata are available,
    // not the block itself.
    func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
        headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
        block := &announce{
            hash:        hash,
            number:      number,
            time:        time,
            origin:      peer,
            fetchHeader: headerFetcher,
            fetchBodies: bodyFetcher,
        }
        select {
        case f.notify <- block:
            return nil
        case <-f.quit:
            return errTerminated
        }
    }

    // The notify branch of fetcher.loop()
    case notification := <-f.notify:
        // A block was announced, make sure the peer isn't DOSing us
        propAnnounceInMeter.Mark(1)

        count := f.announces[notification.origin] + 1
        if count > hashLimit {
            log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
            propAnnounceDOSMeter.Mark(1)
            break
        }
        // If we have a valid block number, check that it's potentially useful
        if notification.number > 0 {
            if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
                log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
                propAnnounceDropMeter.Mark(1)
                break
            }
        }
        // All is well, schedule the announce if block's not yet downloading
        if _, ok := f.fetching[notification.hash]; ok {
            break
        }
        if _, ok := f.completing[notification.hash]; ok {
            break
        }
        // Update the number of blocks this peer has announced to us
        f.announces[notification.origin] = count
        // Add the announcement to announced so that it can be scheduled
        f.announced[notification.hash] = append(f.announced[notification.hash], notification)
        if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
            f.announceChangeHook(notification.hash, true)
        }
        if len(f.announced) == 1 {
            // First pending announcement: reschedule the fetch timer; another
            // branch of the loop processes the announcements when it fires
            f.rescheduleFetch(fetchTimer)
        }
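This section covers how the fetcher obtains a complete block. This is the fetcher's most important function and involves at least 80% of its code, so it gets a large section of its own.
The fetcher's main job is to obtain complete blocks and hand them to InsertChain at the right time for validation and insertion into the local blockchain. Again we start from the big picture and look at how the fetcher works. Make sure you grasp the big picture first, because it is not nearly as clear at the code level.
First, look at how two nodes interact to obtain a complete block. The sequence diagram in Figure 6 is clear enough that no further description is needed.
Next, look at the fetcher's internal state transitions while a block is being obtained. The fetcher uses states to record which stage a wanted block is in; see Figure 7. A brief explanation: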
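1. After a NewBlockHashesMsg is received, the relevant information is recorded in announced and the block enters the announced state, meaning this node has received the announcement.
2. announced is processed by the fetcher goroutine. After validation, it requests the block's header from the peer that sent the announcement, and the block enters the fetching state.
3. When the header arrives and shows that the block has no transactions or uncles, the block moves to the completing state, a complete block is assembled from the header alone, and it is added to the queued priority queue.
4. When the header arrives and shows that the block does contain transactions or uncles, the block moves to the fetched state. A request for the transactions and uncles is then sent, and the block moves to the completing state.
5. When the transactions and uncles arrive, the complete block is assembled from the header and the body and added to queued.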
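The state transitions can be written down as a tiny state machine. This is only an illustration of the description here: the `state` type and the `next` function are hypothetical, and the real fetcher tracks these states with separate maps rather than an enum.

```go
package main

import "fmt"

// state names follow the fetcher's maps of the same names.
type state int

const (
	announced state = iota
	fetching
	fetched
	completing
	queued
)

func (s state) String() string {
	return [...]string{"announced", "fetching", "fetched", "completing", "queued"}[s]
}

// next returns the state that follows s. emptyBlock matters only in the
// fetching state: a header without transactions and uncles skips fetched.
func next(s state, emptyBlock bool) state {
	switch s {
	case announced:
		return fetching
	case fetching:
		if emptyBlock {
			return completing
		}
		return fetched
	case fetched:
		return completing
	default:
		return queued
	}
}

func main() {
	for _, empty := range []bool{false, true} {
		s := announced
		fmt.Printf("empty=%v: %v", empty, s)
		for s != queued {
			s = next(s, empty)
			fmt.Printf(" -> %v", s)
		}
		fmt.Println()
	}
}
```

Running it prints the two paths: the full path through fetched for blocks with a body, and the shortcut for header-only blocks.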
Next comes the code-level view of obtaining a complete block. There is a lot of it; when something is unclear, go back to the high-level diagrams above.
First, the definition of Fetcher, which holds the communication channels and the state bookkeeping. Focus on the commented fields; every state mentioned above appears in it.

    // Fetcher is responsible for accumulating block announcements from various peers
    // and scheduling them for retrieval.
    type Fetcher struct {
        // Various event channels
        notify chan *announce // channel for received block hashes
        inject chan *inject   // channel for received complete blocks

        blockFilter  chan chan []*types.Block
        headerFilter chan chan *headerFilterTask // channel of channels for filtering headers
        bodyFilter   chan chan *bodyFilterTask   // channel of channels for filtering bodies

        done chan common.Hash
        quit chan struct{}

        // Announce states
        announces  map[string]int              // Per peer announce counts to prevent memory exhaustion
        announced  map[common.Hash][]*announce // Announced blocks, scheduled for fetching
        fetching   map[common.Hash]*announce   // Announced blocks, currently fetching (header requests in flight)
        fetched    map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
        completing map[common.Hash]*announce   // Blocks with headers, currently body-completing

        // Block cache
        // queue:  priority queue ordered by block height
        // queues: how many blocks each peer has delivered
        // queued: marks a block as already queued
        queue  *prque.Prque            // Queue containing the import operations (block number sorted)
        queues map[string]int          // Per peer block counts to prevent memory exhaustion
        queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)

        // Callbacks
        getBlock       blockRetrievalFn   // Retrieves a block from the local chain
        verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work
        broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
        chainHeight    chainHeightFn      // Retrieves the current chain's height
        insertChain    chainInsertFn      // Injects a batch of blocks into the chain
        dropPeer       peerDropFn         // Drops a peer for misbehaving

        // Testing hooks
        announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
        queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
        fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch
        completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)
        importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62)
    }
The handling of the NewBlockHashesMsg message was covered in the previous section; flip back if you need a refresher. Here we start from the handling of the announced state. In loop(), when fetchTimer fires, it means announcements have arrived and need processing. The loop selects the announcements that are due from announced and creates requests for their block headers. Since many nodes may have announced the same block hash, one peer is picked at random from those that sent the announcement. When the requests are sent, a separate goroutine is created for each peer.

    case <-fetchTimer.C:
        // At least one block's timer ran out, check for needing retrieval
        request := make(map[string][]common.Hash)

        for hash, announces := range f.announced {
            if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
                // Pick a random peer to retrieve from, reset all others
                // (many peers may have announced this hash)
                announce := announces[rand.Intn(len(announces))]
                f.forgetHash(hash)

                // If the block still didn't arrive, queue for fetching
                if f.getBlock(hash) == nil {
                    request[announce.origin] = append(request[announce.origin], hash)
                    f.fetching[hash] = announce
                }
            }
        }
        // Send out all block header requests: one goroutine per peer,
        // which requests everything that has to come from that peer
        for peer, hashes := range request {
            log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)

            // Create a closure of the fetch and schedule in on a new thread
            fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
            go func() {
                if f.fetchingHook != nil {
                    f.fetchingHook(hashes)
                }
                for _, hash := range hashes {
                    headerFetchMeter.Mark(1)
                    fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
                }
            }()
        }
        // Schedule the next fetch if blocks are still pending
        f.rescheduleFetch(fetchTimer)
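The core of this branch is "pick one announcer at random per hash, then group the hashes by the chosen peer". A minimal sketch of that grouping is shown here, with hypothetical simplified types (`announce` with string fields, `groupByRandomPeer`) in place of the fetcher's own.

```go
package main

import (
	"fmt"
	"math/rand"
)

// announce is a simplified, hypothetical stand-in for the fetcher's record.
type announce struct {
	origin string // peer that announced the hash
}

// groupByRandomPeer picks one announcer at random for every hash and groups
// the hashes by the chosen peer, so each peer gets a single batch of requests.
func groupByRandomPeer(announced map[string][]announce) map[string][]string {
	request := make(map[string][]string)
	for hash, anns := range announced {
		pick := anns[rand.Intn(len(anns))]
		request[pick.origin] = append(request[pick.origin], hash)
	}
	return request
}

func main() {
	announced := map[string][]announce{
		"0xaa": {{origin: "peerA"}, {origin: "peerB"}},
		"0xbb": {{origin: "peerA"}},
	}
	for peer, hashes := range groupByRandomPeer(announced) {
		fmt.Println(peer, "->", hashes)
	}
}
```

Choosing the announcer at random spreads the download load across peers instead of always asking whoever announced first.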
From the call to Notify, we can see that fetchHeader() is actually RequestOneHeader(). That function uses the GetBlockHeadersMsg message, which can request multiple block headers, although the fetcher requests only one.

    pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)

    // RequestOneHeader is a wrapper around the header query functions to fetch a
    // single header. It is used solely by the fetcher.
    func (p *peer) RequestOneHeader(hash common.Hash) error {
        p.Log().Debug("Fetching single header", "hash", hash)
        return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
    }
GetBlockHeadersMsg is handled as follows. Because the message can fetch multiple block headers, handling it is somewhat "tedious". Fortunately the fetcher fetches only one header, which is covered by the part of the loop that looks up the header (the hashMode branch) and appends it to headers; the logic for advancing to the next header is not examined here. Finally SendBlockHeaders() is called to send the headers to the requesting node in a BlockHeadersMsg message.

    // handleMsg()
    // Block header query, collect the requested headers and reply
    case msg.Code == GetBlockHeadersMsg:
        // Decode the complex header query
        var query getBlockHeadersData
        if err := msg.Decode(&query); err != nil {
            return errResp(ErrDecode, "%v: %v", msg, err)
        }
        hashMode := query.Origin.Hash != (common.Hash{})

        // Gather headers until the fetch or network limits is reached
        var (
            bytes   common.StorageSize
            headers []*types.Header
            unknown bool
        )
        // Continue while: the block is known, fewer headers than requested have been
        // gathered, the response is below the soft size limit, and the count is
        // below the maximum number of headers per fetch
        for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
            // Retrieve the next header satisfying the query
            var origin *types.Header
            if hashMode {
                // The mode used by the fetcher
                origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
            } else {
                origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
            }
            if origin == nil {
                break
            }
            number := origin.Number.Uint64()
            headers = append(headers, origin)
            bytes += estHeaderRlpSize

            // Advance to the next header of the query
            // (the method depends on the query strategy)
            switch {
            case query.Origin.Hash != (common.Hash{}) && query.Reverse:
                // ...
            }
        }
        return p.SendBlockHeaders(headers)
The handling of BlockHeadersMsg is interesting. GetBlockHeadersMsg is not exclusive to the fetcher; the downloader can send it too, so the response handler has to tell whether the fetcher or the downloader made the request. The logic is: the fetcher filters the received headers first, and whatever the fetcher does not want belongs to the downloader. When fetcher.FilterHeaders is called, the fetcher takes the headers it asked for.

    // handleMsg()
    case msg.Code == BlockHeadersMsg:
        // A batch of headers arrived to one of our previous requests
        var headers []*types.Header
        if err := msg.Decode(&headers); err != nil {
            return errResp(ErrDecode, "msg %v: %v", msg, err)
        }
        // If no headers were received, but we're expending a DAO fork check, maybe it's that
        if len(headers) == 0 && p.forkDrop != nil {
            // Possibly an empty reply to the fork header checks, sanity check TDs
            verifyDAO := true

            // If we already have a DAO header, we can check the peer's TD against it. If
            // the peer's ahead of this, it too must have a reply to the DAO check
            if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
                if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
                    verifyDAO = false
                }
            }
            // If we're seemingly on the same chain, disable the drop timer
            if verifyDAO {
                p.Log().Debug("Seems to be on the same side of the DAO fork")
                p.forkDrop.Stop()
                p.forkDrop = nil
                return nil
            }
        }
        // Filter out any explicitly requested headers, deliver the rest to the downloader
        filter := len(headers) == 1
        if filter {
            // If it's a potential DAO fork check, validate against the rules
            if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
                // Disable the fork drop timer
                p.forkDrop.Stop()
                p.forkDrop = nil

                // Validate the header and either drop the peer or continue
                if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
                    p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
                    return err
                }
                p.Log().Debug("Verified to be on the same side of the DAO fork")
                return nil
            }
            // Irrelevant of the fork checks, send the header to the fetcher just in case
            headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
        }
        // Hand the remaining headers to the downloader
        if len(headers) > 0 || !filter {
            err := pm.downloader.DeliverHeaders(p.id, headers)
            if err != nil {
                log.Debug("Failed to deliver headers", "err", err)
            }
        }
FilterHeaders() is a very clever function; it takes some chewing on, but it is really neat. It has to pass all the headers to the fetcher goroutine and also get back the result of the fetcher goroutine's processing. fetcher.headerFilter is a channel of channels, and filter is a channel that carries header filter tasks. The function first sends filter into headerFilter, so the fetcher goroutine is now waiting at the other end. It then sends a headerFilterTask into filter, where the fetcher can read it. After processing, the fetcher writes the result back into filter, where it is picked up by FilterHeaders, which actually runs in the handleMsg() goroutine.
Each peer's messages are handled in that peer's own goroutine, but the fetcher has only one event-processing goroutine. If a filter were not created for each call, how would the fetcher know who sent it the headers, and how would it send the filtered result back?

    // FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
    // returning those that should be handled differently.
    func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
        log.Trace("Filtering headers", "peer", peer, "headers", len(headers))

        // Send the filter channel to the fetcher
        filter := make(chan *headerFilterTask)

        select {
        case f.headerFilter <- filter:
        case <-f.quit:
            return nil
        }
        // Request the filtering of the header list
        select {
        case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
        case <-f.quit:
            return nil
        }
        // Retrieve the headers remaining after filtering
        select {
        case task := <-filter:
            return task.headers
        case <-f.quit:
            return nil
        }
    }
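The channel-of-channels handshake is easier to see in isolation. The sketch below uses hypothetical names (`task`, `worker`, `filterItems`) and a toy rule (the worker keeps even numbers and returns the odd ones) in place of header filtering; the three-step exchange is the same as in FilterHeaders.

```go
package main

import "fmt"

// task carries the data in both directions over the per-call channel.
type task struct{ items []int }

// worker stands in for the fetcher loop: it receives a per-call channel,
// reads the task from it, keeps the even numbers for itself and writes the
// remaining items back on the same channel.
func worker(filters chan chan *task, quit chan struct{}) {
	for {
		select {
		case filter := <-filters:
			t := <-filter
			var rest []int
			for _, v := range t.items {
				if v%2 != 0 {
					rest = append(rest, v)
				}
			}
			filter <- &task{items: rest}
		case <-quit:
			return
		}
	}
}

// filterItems stands in for FilterHeaders: it runs in the caller's goroutine.
func filterItems(filters chan chan *task, items []int) []int {
	filter := make(chan *task)    // private channel for this call
	filters <- filter             // 1. hand the channel to the worker
	filter <- &task{items: items} // 2. send the work
	return (<-filter).items       // 3. read back what the worker did not want
}

func main() {
	filters := make(chan chan *task)
	quit := make(chan struct{})
	go worker(filters, quit)
	fmt.Println(filterItems(filters, []int{1, 2, 3, 4, 5}))
	close(quit)
}
```

Because every call creates its own channel, the single worker goroutine always knows where to send the result, no matter how many callers are waiting.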
Next comes the handling of f.headerFilter. The code is about 90 lines long and does the following:
1. Take filter from f.headerFilter, then take the filter task task from it.
2. Split the headers into three groups: unknown is what gets returned to the caller, i.e. handleMsg(); incomplete holds headers whose bodies still need to be fetched; complete holds blocks that consist of a header only. Every header is sorted into one of these groups; the exact test is in the comment on the condition inside the loop. Keep the state transition diagram from the overview in mind.
3. Return the headers in unknown to handleMsg().
4. Move the headers in incomplete to the fetched state, then schedule the timer so that their bodies get requested.
5. Add the blocks in complete to queued.

    // fetcher.loop()
    case filter := <-f.headerFilter:
        // Headers arrived from a remote peer. Extract those that were explicitly
        // requested by the fetcher, and return everything else so it's delivered
        // to other parts of the system.
        var task *headerFilterTask
        select {
        case task = <-filter:
        case <-f.quit:
            return
        }
        headerFilterInMeter.Mark(int64(len(task.headers)))

        // Split the batch of headers into unknown ones (to return to the caller),
        // known incomplete ones (requiring body retrievals) and completed blocks.
        // complete holds blocks without transactions and uncles, for which the
        // header is enough.
        unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
        for _, header := range task.headers {
            hash := header.Hash()

            // Filter fetcher-requested headers from other synchronisation algorithms:
            // the hash is being fetched, it came from the peer we asked, and it is
            // not yet fetched, completing or queued
            if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
                // If the delivered header does not match the promised number, drop the announcer
                if header.Number.Uint64() != announce.number {
                    log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
                    f.dropPeer(announce.origin)
                    f.forgetHash(hash)
                    continue
                }
                // Only keep if not imported by other means
                if f.getBlock(hash) == nil {
                    announce.header = header
                    announce.time = task.time

                    // If the block is empty (header only), short circuit into the final import queue
                    if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
                        log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())

                        block := types.NewBlockWithHeader(header)
                        block.ReceivedAt = task.time

                        complete = append(complete, block)
                        f.completing[hash] = announce
                        continue
                    }
                    // Otherwise add to the list of blocks needing completion
                    incomplete = append(incomplete, announce)
                } else {
                    log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
                    f.forgetHash(hash)
                }
            } else {
                // Fetcher doesn't know about it, add to the return list
                unknown = append(unknown, header)
            }
        }
        // Pass the unknown headers back through filter
        headerFilterOutMeter.Mark(int64(len(unknown)))
        select {
        case filter <- &headerFilterTask{headers: unknown, time: task.time}:
        case <-f.quit:
            return
        }
        // Schedule the retrieved headers for body completion: add them to fetched,
        // skip those already completing, then schedule completeTimer
        for _, announce := range incomplete {
            hash := announce.header.Hash()
            if _, ok := f.completing[hash]; ok {
                continue
            }
            f.fetched[hash] = append(f.fetched[hash], announce)
            if len(f.fetched) == 1 {
                f.rescheduleComplete(completeTimer)
            }
        }
        // Schedule the header-only blocks for import
        for _, block := range complete {
            if announce := f.completing[block.Hash()]; announce != nil {
                f.enqueue(announce.origin, block)
            }
        }
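The three-way split of delivered headers can be sketched with simplified types. Everything named here (`header`, `pending`, `classify`) is hypothetical: hashes are plain strings, an `empty` flag replaces the TxHash/UncleHash comparison, and the extra checks against fetched, completing and queued are left out.

```go
package main

import "fmt"

// header and pending are simplified stand-ins for types.Header and the
// fetcher's announce record.
type header struct {
	hash   string
	number uint64
	empty  bool // true when the block has no transactions and no uncles
}

type pending struct {
	peer   string // peer the header was requested from
	number uint64 // height promised in the announcement
}

// classify sorts delivered headers into the three groups. A header whose
// number contradicts the announcement is skipped here; the real fetcher
// additionally drops the announcing peer.
func classify(headers []header, fetching map[string]pending, from string) (unknown, incomplete, complete []header) {
	for _, h := range headers {
		p, ok := fetching[h.hash]
		switch {
		case !ok || p.peer != from:
			unknown = append(unknown, h) // not requested by the fetcher
		case p.number != h.number:
			continue // announcement and header disagree
		case h.empty:
			complete = append(complete, h) // header alone is the whole block
		default:
			incomplete = append(incomplete, h) // body still has to be fetched
		}
	}
	return unknown, incomplete, complete
}

func main() {
	fetching := map[string]pending{
		"0xaa": {peer: "p1", number: 10},
		"0xbb": {peer: "p1", number: 11},
	}
	headers := []header{
		{hash: "0xaa", number: 10, empty: true},
		{hash: "0xbb", number: 11},
		{hash: "0xcc", number: 12},
	}
	u, i, c := classify(headers, fetching, "p1")
	fmt.Println("unknown:", len(u), "incomplete:", len(i), "complete:", len(c))
}
```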
Following the state diagram, the remaining work is the transition from fetched to completing. The flow above has already scheduled completeTimer; when it fires, the transition is processed in much the same way as the header request, so it is not repeated here. The request message sent this time is GetBlockBodiesMsg, and the function actually called is RequestBodies.

    // fetcher.loop()
    case <-completeTimer.C:
        // At least one header's timer ran out, retrieve everything
        request := make(map[string][]common.Hash)

        // Walk every announce that is waiting for its body
        for hash, announces := range f.fetched {
            // Pick a random peer to retrieve from, reset all others
            // (many peers may have announced this block)
            announce := announces[rand.Intn(len(announces))]
            f.forgetHash(hash)

            // If the block still didn't arrive, queue for completion
            if f.getBlock(hash) == nil {
                request[announce.origin] = append(request[announce.origin], hash)
                f.completing[hash] = announce
            }
        }
        // Send out all block body requests, again one goroutine per peer
        for peer, hashes := range request {
            log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)

            // Create a closure of the fetch and schedule in on a new thread
            if f.completingHook != nil {
                f.completingHook(hashes)
            }
            bodyFetchMeter.Mark(int64(len(hashes)))
            go f.completing[hashes[0]].fetchBodies(hashes)
        }
        // Schedule the next fetch if blocks are still pending
        f.rescheduleComplete(completeTimer)
handleMsg() handles this message crisply as well: it reads the body in RLP format directly and then sends the response message.

    // handleMsg()
    case msg.Code == GetBlockBodiesMsg:
        // Decode the retrieval message
        msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
        if _, err := msgStream.List(); err != nil {
            return err
        }
        // Gather blocks until the fetch or network limits is reached
        var (
            hash   common.Hash
            bytes  int
            bodies []rlp.RawValue
        )
        // Walk all requested hashes
        for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
            // Retrieve the hash of the next block
            if err := msgStream.Decode(&hash); err == rlp.EOL {
                break
            } else if err != nil {
                return errResp(ErrDecode, "msg %v: %v", msg, err)
            }
            // Retrieve the requested block body (in RLP format), stopping if enough was found
            if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
                bodies = append(bodies, data)
                bytes += len(data)
            }
        }
        return p.SendBlockBodiesRLP(bodies)
The response message BlockBodiesMsg is handled on the same principle as the header response: it goes to the fetcher for filtering first, and only what remains goes to the downloader. Note that the response contains only the transaction lists and the uncle lists.

    // handleMsg()
    case msg.Code == BlockBodiesMsg:
        // A batch of block bodies arrived to one of our previous requests
        var request blockBodiesData
        if err := msg.Decode(&request); err != nil {
            return errResp(ErrDecode, "msg %v: %v", msg, err)
        }
        // Deliver them all to the downloader for queuing
        transactions := make([][]*types.Transaction, len(request))
        uncles := make([][]*types.Header, len(request))

        for i, body := range request {
            transactions[i] = body.Transactions
            uncles[i] = body.Uncles
        }
        // Filter out any explicitly requested bodies, deliver the rest to the downloader
        filter := len(transactions) > 0 || len(uncles) > 0
        if filter {
            transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
        }
        // Hand the remaining bodies to the downloader
        if len(transactions) > 0 || len(uncles) > 0 || !filter {
            err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
            if err != nil {
                log.Debug("Failed to deliver bodies", "err", err)
            }
        }
The filter function works on the same principle as the one for headers.

    // FilterBodies extracts all the block bodies that were explicitly requested by
    // the fetcher, returning those that should be handled differently.
    func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
        log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))

        // Send the filter channel to the fetcher
        filter := make(chan *bodyFilterTask)

        select {
        case f.bodyFilter <- filter:
        case <-f.quit:
            return nil, nil
        }
        // Request the filtering of the body list
        select {
        case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
        case <-f.quit:
            return nil, nil
        }
        // Retrieve the bodies remaining after filtering
        select {
        case task := <-filter:
            return task.transactions, task.uncles
        case <-f.quit:
            return nil, nil
        }
    }
Take a look at the actual body filtering, which differs from the header handling. The key points:
1. The bodies the fetcher asked for are pulled out, assembled into blocks and stored in blocks; the ones it did not ask for stay in task.
2. The blocks in blocks are added to queued, and that is the end of the journey.

    case filter := <-f.bodyFilter:
        // Block bodies arrived, extract any explicitly requested blocks, return the rest
        var task *bodyFilterTask
        select {
        case task = <-filter:
        case <-f.quit:
            return
        }
        bodyFilterInMeter.Mark(int64(len(task.transactions)))

        blocks := []*types.Block{}
        // Walk the transaction list and uncle list of every delivered body, hash
        // them, and decide whether the fetcher requested that body
        for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
            // Match up a body to any possible completion request
            matched := false

            // A body does not say which block it belongs to, so every pending
            // request has to be checked; there are usually few, so this is cheap
            for hash, announce := range f.completing {
                if f.queued[hash] == nil {
                    // Compare the body's tx hash and uncle hash with the requested
                    // header; a match means the fetcher asked for this body
                    txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
                    uncleHash := types.CalcUncleHash(task.uncles[i])

                    if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer {
                        // Mark the body matched, reassemble if still unknown
                        matched = true

                        if f.getBlock(hash) == nil {
                            block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
                            block.ReceivedAt = task.time

                            blocks = append(blocks, block)
                        } else {
                            f.forgetHash(hash)
                        }
                    }
                }
            }
            // Remove the data the fetcher requested from task
            if matched {
                task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
                task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
                i--
                continue
            }
        }
        // Return the remaining data
        bodyFilterOutMeter.Mark(int64(len(task.transactions)))
        select {
        case filter <- task:
        case <-f.quit:
            return
        }
        // Schedule the retrieved blocks for ordered import
        for _, block := range blocks {
            if announce := f.completing[block.Hash()]; announce != nil {
                f.enqueue(announce.origin, block)
            }
        }
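A body carries no block hash, so the fetcher recognises its own requests by comparing the body's transaction hash and uncle hash with the headers it is waiting on. The sketch below shows that matching with hypothetical simplified types (`body`, `want`, `matchBodies`). It uses precomputed string hashes and stops at the first match, whereas the code here computes the hashes from the lists and scans every pending entry.

```go
package main

import "fmt"

// body stands in for a delivered (transactions, uncles) pair, reduced to the
// two hashes that identify it.
type body struct{ txHash, uncleHash string }

// want stands in for a pending entry in completing.
type want struct{ txHash, uncleHash, peer string }

// matchBodies returns the block hashes whose bodies were delivered by the
// peer we asked, plus the bodies the fetcher did not request.
func matchBodies(bodies []body, completing map[string]want, from string) (matched []string, rest []body) {
	for _, b := range bodies {
		hit := ""
		for hash, w := range completing {
			if b.txHash == w.txHash && b.uncleHash == w.uncleHash && w.peer == from {
				hit = hash
				break
			}
		}
		if hit != "" {
			matched = append(matched, hit)
		} else {
			rest = append(rest, b) // left for the downloader
		}
	}
	return matched, rest
}

func main() {
	completing := map[string]want{
		"0xaa": {txHash: "t1", uncleHash: "u1", peer: "p1"},
	}
	bodies := []body{{txHash: "t1", uncleHash: "u1"}, {txHash: "t9", uncleHash: "u9"}}
	matched, rest := matchBodies(bodies, completing, "p1")
	fmt.Println("matched:", matched, "left for downloader:", len(rest))
}
```

Requiring the delivering peer to be the one that was asked prevents another peer from injecting a body for a block it never announced.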
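That completes the fetcher's flow for obtaining complete blocks, and about 80% of the fetcher module's code has now been shown. Two more functions are worth a look:
forgetHash(hash common.Hash): clears the records and state kept for the given hash.
forgetBlock(hash common.Hash): removes a block from the queue.
Finally, go back to the beginning and revisit the fetcher module and the propagation flow of a new block; it should all click into place now.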