死磕以太坊源碼分析之downloader同步c++
須要配合註釋代碼看:https://github.com/blockchainGuide/git
這篇文章篇幅較長,能看下去的是條漢子,建議收藏github
但願讀者在閱讀過程當中,指出問題,給個關注,一塊兒探討。算法
downloader
模塊的代碼位於 eth/downloader
目錄下。主要的功能代碼分別是:數據庫
downloader.go
:實現了區塊同步邏輯緩存
peer.go
:對區塊各個階段的組裝,下面的各個FetchXXX
就是很依賴這個模塊。網絡
queue.go
:對eth/peer.go
的封裝併發
statesync.go
:同步state
對象app
full 模式會在數據庫中保存全部區塊數據,同步時從遠程節點同步 header 和 body 數據,而state 和 receipt 數據則是在本地計算出來的。框架
在 full 模式下,downloader 會同步區塊的 header 和 body 數據組成一個區塊,而後經過 blockchain 模塊的 BlockChain.InsertChain
向數據庫中插入區塊。在 BlockChain.InsertChain
中,會逐個計算和驗證每一個塊的 state
和 recepit
等數據,若是一切正常就將區塊數據以及本身計算獲得的 state
、recepit
數據一塊兒寫入到數據庫中。
fast
模式下,recepit
再也不由本地計算,而是和區塊數據同樣,直接由 downloader
從其它節點中同步;state
數據並不會所有計算和下載,而是選一個較新的區塊(稱之爲 pivot
)的 state
進行下載,以這個區塊爲分界,以前的區塊是沒有 state
數據的,以後的區塊會像 full
模式下同樣在本地計算 state
。所以在 fast
模式下,同步的數據除了 header
和 body,還有 receipt
,以及 pivot
區塊的 state
。
所以 fast
模式忽略了大部分 state
數據,而且使用網絡直接同步 receipt
數據的方式替換了 full 模式下的本地計算,因此比較快。
light 模式也叫作輕模式,它只對區塊頭進行同步,而不一樣步其它的數據。
SyncMode:
圖片只是大概的描述一下,實際仍是要結合代碼,全部區塊鏈相關文章合集,https://github.com/blockchainGuide/
同時但願結識更多區塊鏈圈子的人,能夠star上面項目,持續更新
首先根據Synchronise
開始區塊同步,經過findAncestor
找到指定節點的共同祖先,並在此高度進行同步,同時開啓多個goroutine
同步不一樣的數據:header
、receipt
、body
。假如同步高度爲 100 的區塊,必須先header
同步成功同步完成才能夠喚醒body
和receipts
的同步。
而每一個部分的同步大體都是由FetchParts
來完成的,裏面包含了各個Chan
的配合,也會涉及很多的回調函數,總而言之多讀幾遍每次都會有不一樣的理解。接下來就逐步分析這些關鍵內容。
①:確保對方的TD高於咱們本身的TD
currentBlock := pm.blockchain.CurrentBlock() td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) pHead, pTd := peer.Head() if pTd.Cmp(td) <= 0 { return }
②:開啓downloader
的同步
pm.downloader.Synchronise(peer.id, pHead, pTd, mode)
進入函數:主要作了如下幾件事:
d.synchronise(id, head, td, mode)
:同步過程peer
。進入到d.synchronise
,走到最後一步d.syncWithPeer(p, hash, td)
真正開啓同步。
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error { ... return d.syncWithPeer(p, hash, td) }
syncWithPeer大概作了如下幾件事:
findAncestor
goroutine
分別運行如下幾個函數:
接下來的文章,以及整個Downloader
模塊主要內容就是圍繞這幾個部分進行展開。
同步首要的是肯定同步區塊的區間:頂部爲遠程節點的最高區塊,底部爲兩個節點都擁有的相同區塊的最高高度(祖先區塊)。findAncestor
就是用來找祖先區塊。函數分析以下:
①:肯定本地高度和遠程節點的最高高度
var ( floor = int64(-1) // 底部 localHeight uint64 // 本地最高高度 remoteHeight = remoteHeader.Number.Uint64() // 遠程節點最高高度 ) switch d.mode { case FullSync: localHeight = d.blockchain.CurrentBlock().NumberU64() case FastSync: localHeight = d.blockchain.CurrentFastBlock().NumberU64() default: localHeight = d.lightchain.CurrentHeader().Number.Uint64() }
②:計算同步的高度區間和間隔
from, count, skip, max := calculateRequestSpan(remoteHeight, localHeight)
from
::表示從哪一個高度開始獲取區塊count
:表示從遠程節點獲取多少個區塊skip
:表示間隔,好比skip
爲 2 ,獲取第一個高度爲 5,則第二個就是 8max
:表示最大高度③:發送獲取header
的請求
go p.peer.RequestHeadersByNumber(uint64(from), count, skip, false)
④:處理上面請求接收到的header
:case packet := <-d.headerCh
header
數量不爲空headers
的高度是咱們所請求的//----① if packet.PeerId() != p.id { log.Debug("Received headers from incorrect peer", "peer", packet.PeerId()) break } //-----② headers := packet.(*headerPack).headers if len(headers) == 0 { p.log.Warn("Empty head header set") return 0 } //-----③ for i, header := range headers { expectNumber := from + int64(i)*int64(skip+1) if number := header.Number.Int64(); number != expectNumber { // 驗證這些返回的header是不是咱們上面請求的headers p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number) return 0, errInvalidChain } } //-----④ // 檢查是否找到共同祖先 finished = true //注意這裏是從headers最後一個元素開始查找,也就是高度最高的區塊。 for i := len(headers) - 1; i >= 0; i-- { // 跳過不在咱們請求的高度區間內的區塊 if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > max { continue } // //檢查咱們本地是否已經有某個區塊了,若是有就算是找到了共同祖先, //並將共同祖先的哈希和高度設置在number和hash變量中。 h := headers[i].Hash() n := headers[i].Number.Uint64()
⑤:若是經過固定間隔法找到了共同祖先則返回祖先,會對其高度與 floor
變量進行驗證, floor
變量表明的是共同祖先的高度的最小值,若是找到共同祖先的高度比這個值還小,就認爲是兩個節點之間分叉太大了,再也不容許進行同步。若是一切正常,就返回找到的共同祖先的高度 number
變量。
if hash != (common.Hash{}) { if int64(number) <= floor { return 0, errInvalidAncestor } return number, nil }
⑥:若是固定間隔法沒有找到祖先則經過二分法來查找祖先,這部分能夠思想跟二分法算法相似,有興趣的能夠細看。
queue
對象和Downloader
對象是相互做用的,Downloader
的不少功能離不開他,接下來咱們介紹一下這部份內容,可是本節,能夠先行跳過,等到了閱讀下面的關於Queue
調用的一些函數部分再回過來閱讀這部分講解。
type queue struct { mode SyncMode // 同步模式 // header處理相關 headerHead common.Hash //最後一個排隊的標頭的哈希值以驗證順序 headerTaskPool map[uint64]*types.Header //待處理的標頭檢索任務,將起始索引映射到框架標頭 headerTaskQueue *prque.Prque //骨架索引的優先級隊列,以獲取用於的填充標頭 headerPeerMiss map[string]map[uint64]struct{} //已知不可用的對等頭批處理集 headerPendPool map[string]*fetchRequest //當前掛起的頭檢索操做 headerResults []*types.Header //結果緩存累積完成的頭 headerProced int //從結果中拿出來已經處理的header headerContCh chan bool //header下載完成時通知的頻道 blockTaskPool map[common.Hash]*types.Header //待處理的塊(body)檢索任務,將哈希映射到header blockTaskQueue *prque.Prque //標頭的優先級隊列,以用於獲取塊(bodies) blockPendPool map[string]*fetchRequest //當前的正在處理的塊(body)檢索操做 blockDonePool map[common.Hash]struct{} //已經完成的塊(body) receiptTaskPool map[common.Hash]*types.Header //待處理的收據檢索任務,將哈希映射到header receiptTaskQueue *prque.Prque //標頭的優先級隊列,以用於獲取收據 receiptPendPool map[string]*fetchRequest //當前的正在處理的收據檢索操做 receiptDonePool map[common.Hash]struct{} //已經完成的收據 resultCache []*fetchResult //下載但還沒有交付獲取結果 resultOffset uint64 //區塊鏈中第一個緩存的獲取結果的偏移量 resultSize common.StorageSize // 塊的近似大小 lock *sync.Mutex active *sync.Cond closed bool }
ScheduleSkeleton
:將一批header
檢索任務添加到隊列中,以填充已檢索的header skeleton
Schedule
:用來準備對一些 body
和 receipt
數據的下載pending
pending
表示待檢索的XXX請求的數量,包括了:PendingHeaders
、PendingBlocks
、PendingReceipts
,分別都是對應取XXXTaskQueue
的長度。
InFlight
InFlight
表示是否有正在獲取XXX的請求,包括:InFlightHeaders
、InFlightBlocks
、InFlightReceipts
,都是經過判斷len(q.receiptPendPool) > 0
來確認。
ShouldThrottle
ShouldThrottle
表示檢查是否應該限制下載XXX,包括:ShouldThrottleBlocks
、ShouldThrottleReceipts
,主要是爲了防止下載過程當中本地內存佔用過大。
Reserve
Reserve
經過構造一個 fetchRequest
結構並返回,向調用者提供指定數量的待下載的數據的信息(queue
內部會將這些數據標記爲「正在下載」)。調用者使用返回的 fetchRequest
數據向遠程節點發起新的獲取數據的請求。包括:ReserveHeaders
、ReserveBodies
、ReserveReceipts
。
Cancel
Cance
用來撤消對 fetchRequest
結構中的數據的下載(queue
內部會將這些數據從新從「正在下載」的狀態更改成「等待下載」)。包括:CancelHeaders
、CancelBodies
、CancelReceipts
。
expire
expire
檢查正在執行中的請求是否超過了超時限制,包括:ExpireHeaders
、ExpireBodies
、ExpireReceipts
。
Deliver
當有數據下載成功時,調用者會使用 deliver
功能用來通知 queue
對象。包括:DeliverHeaders
、DeliverBodies
、DeliverReceipts
。
RetrieveHeaders
skeleton
完成後,queue.RetrieveHeaders
用來獲取整個 skeleton
中的全部 header
。Results
queue.Results
用來獲取當前的 header
、body
和 receipt
(只在 fast
模式下) 都已下載成功的區塊(並將這些區塊從 queue
內部移除)queue.ScheduleSkeleton主要是爲了填充skeleton,它的參數是要下載區塊的起始高度和全部 skeleton
區塊頭,最核心的內容則是下面這段循環:
func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { ...... for i, header := range skeleton { index := from + uint64(i*y) q.headerTaskPool[index] = header q.headerTaskQueue.Push(index, -int64(index)) } }
假設已肯定須要下載的區塊高度區間是從 10 到 46,MaxHeaderFetch
的值爲 10,那麼這個高度區塊就會被分紅 3 組:10 - 19,20 - 29,30 - 39,而 skeleton 則分別由高度爲 1九、2九、39 的區塊頭組成。循環中的 index
變量其實是每一組區塊中的第一個區塊的高度(好比 十、20、30),queue.headerTaskPool
其實是一個每一組區塊中第一個區塊的高度到最後一個區塊的 header 的映射
headerTaskPool = { 10: headerOf_19, 20: headerOf_20, 30: headerOf_39, }
reserve
用來獲取可下載的數據。
reserve = func(p *peerConnection, count int) (*fetchRequest, bool, error) { return d.queue.ReserveHeaders(p, count), false, nil }
func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest { if _, ok := q.headerPendPool[p.id]; ok { return nil } //① ... send, skip := uint64(0), []uint64{} for send == 0 && !q.headerTaskQueue.Empty() { from, _ := q.headerTaskQueue.Pop() if q.headerPeerMiss[p.id] != nil { if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok { skip = append(skip, from.(uint64)) continue } } send = from.(uint64) // ② } ... for _, from := range skip { q.headerTaskQueue.Push(from, -int64(from)) } // ③ ... request := &fetchRequest{ Peer: p, From: send, Time: time.Now(), } q.headerPendPool[p.id] = request // ④ }
①:根據headerPendPool
來判斷遠程節點是否正在下載數據信息。
②:從headerTaskQueue
取出值做爲本次請求的起始高度,賦值給send
變量,在這個過程當中會排除headerPeerMiss所記錄的節點下載數據失敗的信息。
③:將失敗的任務再從新寫回task queue
④:利用send
變量構造fetchRequest
結構,此結構是用來做爲FetchHeaders
來使用的:
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
至此,ReserveHeaders
會從任務隊列裏選擇最小的起始高度並構造fetchRequest
傳遞給fetch
獲取數據。
deliver = func(packet dataPack) (int, error) { pack := packet.(*headerPack) return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh) }
①:若是發現下載數據的節點沒有在 queue.headerPendPool
中,就直接返回錯誤;不然就繼續處理,並將節點記錄從 queue.headerPendPool
中刪除。
request := q.headerPendPool[id] if request == nil { return 0, errNoFetchesPending } headerReqTimer.UpdateSince(request.Time) delete(q.headerPendPool, id)
②:驗證headers
包括三方面驗證:
if accepted { //檢查起始區塊的高度和哈希 if headers[0].Number.Uint64() != request.From { ... accepted = false } else if headers[len(headers)-1].Hash() != target { ... accepted = false } } if accepted { for i, header := range headers[1:] { hash := header.Hash() // 檢查高度的鏈接性 if want := request.From + 1 + uint64(i); header.Number.Uint64() != want { ... } if headers[i].Hash() != header.ParentHash { // 檢查哈希的鏈接性 ... } } }
③: 將無效數據存入headerPeerMiss
,並將這組區塊起始高度從新放入headerTaskQueue
if !accepted { ... miss := q.headerPeerMiss[id] if miss == nil { q.headerPeerMiss[id] = make(map[uint64]struct{}) miss = q.headerPeerMiss[id] } miss[request.From] = struct{}{} q.headerTaskQueue.Push(request.From, -int64(request.From)) return 0, errors.New("delivery not accepted") }
④:保存數據,並通知headerProcCh
處理新的header
if ready > 0 { process := make([]*types.Header, ready) copy(process, q.headerResults[q.headerProced:q.headerProced+ready]) select { case headerProcCh <- process: q.headerProced += len(process) default: } }
⑤:發送消息給.headerContCh
,通知skeleton
都被下載完了
if len(q.headerTaskPool) == 0 { q.headerContCh <- false }
DeliverHeaders
會對數據進行檢驗和保存,併發送 channel 消息給 Downloader.processHeaders
和 Downloader.fetchParts
的 wakeCh
參數。
processHeaders
在處理header
數據的時候,會調用queue.Schedule
爲下載 body
和 receipt
做準備。
inserts := d.queue.Schedule(chunk, origin)
func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { inserts := make([]*types.Header, 0, len(headers)) for _, header := range headers { //校驗 ... q.blockTaskPool[hash] = header q.blockTaskQueue.Push(header, -int64(header.Number.Uint64())) if q.mode == FastSync { q.receiptTaskPool[hash] = header q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) } inserts = append(inserts, header) q.headerHead = hash from++ } return inserts }
這個函數主要就是將信息寫入到body和receipt隊列,等待調度。
在 queue
中準備好了 body 和 receipt 相關的數據, processHeaders
最後一段,是喚醒下載Bodyies和Receipts的關鍵代碼,會通知 fetchBodies
和 fetchReceipts
能夠對各自的數據進行下載了。
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- true: default: } }
而fetchXXX
會調用fetchParts
,邏輯相似上面的的,reserve
最終則會調用reserveHeaders
,deliver
最終調用的是 queue.deliver
.
先來分析reserveHeaders
:
①:若是沒有可處理的任務,直接返回
if taskQueue.Empty() { return nil, false, nil }
②:若是參數給定的節點正在下載數據,返回
if _, ok := pendPool[p.id]; ok { return nil, false, nil }
③:計算 queue 對象中的緩存空間還能夠容納多少條數據
space := q.resultSlots(pendPool, donePool)
④:從 「task queue」 中依次取出任務進行處理
主要實現如下功能:
queue.resultCache
中的位置,而後填充 queue.resultCache
中相應位置的元素注意:resultCache
字段用來記錄全部正在被處理的數據的處理結果,它的元素類型是 fetchResult
。它的 Pending
字段表明當前區塊還有幾類數據須要下載。這裏須要下載的數據最多有兩類:body 和 receipt,full
模式下只須要下載 body
數據,而 fast
模式要多下載一個 receipt
數據。
for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ { header := taskQueue.PopItem().(*types.Header) hash := header.Hash() index := int(header.Number.Int64() - int64(q.resultOffset)) if index >= len(q.resultCache) || index < 0 { .... } if q.resultCache[index] == nil { components := 1 if q.mode == FastSync { components = 2 } q.resultCache[index] = &fetchResult{ Pending: components, Hash: hash, Header: header, } } if isNoop(header) { donePool[hash] = struct{}{} delete(taskPool, hash) space, proc = space-1, proc-1 q.resultCache[index].Pending-- progress = true continue } if p.Lacks(hash) { skip = append(skip, header) } else { send = append(send, header) } }
最後就是構造 fetchRequest
結構並返回。
body
或 receipt
數據都已經經過 reserve
操做構造了 fetchRequest
結構並傳給 fetch
,接下來就是等待數據的到達,數據下載成功後,會調用 queue
對象的 deliver
方法進行傳遞,包括 queue.DeliverBodies
和 queue.DeliverReceipts
。這兩個方法都以不一樣的參數調用了 queue.deliver
方法:
①:若是下載的數據數量爲 0,則把全部此節點這次下載的數據標記爲「缺失」
if results == 0 { for _, header := range request.Headers { request.Peer.MarkLacking(header.Hash()) } }
②:循環處理數據,經過調用reconstruct
填充 resultCache[index]
中的相應的字段
for i, header := range request.Headers { ... if err := reconstruct(header, i, q.resultCache[index]); err != nil { failure = err break } }
③:驗證resultCache
中的數據,其對應的 request.Headers
中的 header
都應爲 nil,若不是則說明驗證未經過,須要假如到task queue從新下載
for _, header := range request.Headers { if header != nil { taskQueue.Push(header, -int64(header.Number.Uint64())) } }
④:若是有數據被驗證經過且寫入 queue.resultCache
中了(accepted
> 0),發送 queue.active
消息。Results
會等待這這個信號。
當(header、body、receipt)都下載完,就要將區塊寫入到數據庫了,queue.Results
就是用來返回全部目前已經下載完成的數據,它在 Downloader.processFullSyncContent
和 Downloader.processFastSyncContent
中被調用。代碼比較簡單就很少說了。
到此爲止queue
對象就分析的差很少了。
同步headers
是是由函數fetchHeaders
來完成的。
fetchHeaders
的大體思想:
同步header
的數據會被填充到skeleton
,每次從遠程節點獲取區塊數據最大爲MaxHeaderFetch
(192),因此要獲取的區塊數據若是大於192 ,會被分紅組,每組MaxHeaderFetch
,剩餘的不足192個的不會填充進skeleton
,具體步驟以下圖所示:
此種方式能夠避免從同一節點下載過多錯誤數據,若是咱們鏈接到了一個惡意節點,它能夠創造一個鏈條很長且TD
值也很是高的區塊鏈數據。若是咱們的區塊從 0 開始所有從它那同步,也就下載了一些根本不被別人認可的數據。若是我只從它那同步 MaxHeaderFetch
個區塊,而後發現這些區塊沒法正確填充我以前的 skeleton
(多是 skeleton
的數據錯了,或者用來填充 skeleton
的數據錯了),就會丟掉這些數據。
接下來查看下代碼如何實現:
①:發起獲取header
的請求
若是是下載skeleton
,則會從高度 from+MaxHeaderFetch-1
開始(包括),每隔 MaxHeaderFetch-1
的高度請求一個 header
,最多請求 MaxSkeletonSize
個。若是不是的話,則要獲取完整的headers
。
②:等待並處理headerCh
中的header
數據
2.1 確保遠程節點正在返回咱們須要填充skeleton
所需的header
if packet.PeerId() != p.id { log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId()) break }
2.2 若是skeleton
已經下載完畢,則須要繼續填充skeleton
if packet.Items() == 0 && skeleton { skeleton = false getHeaders(from) continue }
2.3 整個skeleton
填充完成,而且沒有要獲取的header
了,要通知headerProcCh
所有完成
if packet.Items() == 0 { //下載pivot時不要停止標頭的提取 if atomic.LoadInt32(&d.committed) == 0 && pivot <= from { p.log.Debug("No headers, waiting for pivot commit") select { case <-time.After(fsHeaderContCheck): getHeaders(from) continue case <-d.cancelCh: return errCanceled } } //完成Pivot操做(或不進行快速同步),而且沒有頭文件,終止該過程 p.log.Debug("No more headers available") select { case d.headerProcCh <- nil: return nil case <-d.cancelCh: return errCanceled } }
2.4 當header
有數據而且是在獲取skeleton
的時候,調用fillHeaderSkeleton
填充skeleton
if skeleton { filled, proced, err := d.fillHeaderSkeleton(from, headers) if err != nil { p.log.Debug("Skeleton chain invalid", "err", err) return errInvalidChain } headers = filled[proced:] from += uint64(proced) }
2.5 若是當前處理的不是 skeleton
,代表區塊同步得差很少了,處理尾部的一些區塊
判斷本地的主鏈高度與新收到的 header 的最高高度的高度差是否在 reorgProtThreshold
之內,若是不是,就將高度最高的 reorgProtHeaderDelay
個 header 丟掉。
if head+uint64(reorgProtThreshold) < headers[n-1].Number.Uint64() { delay := reorgProtHeaderDelay if delay > n { delay = n } headers = headers[:n-delay] }
2.6 若是還有 header
未處理,發給 headerProcCh
進行處理,Downloader.processHeaders
會等待這個 channel 的消息並進行處理;
if len(headers) > 0 { ... select { case d.headerProcCh <- headers: case <-d.cancelCh: return errCanceled } from += uint64(len(headers)) getHeaders(from) }
2.7 若是沒有發送標頭,或者全部標頭等待 fsHeaderContCheck
秒,再次調用 getHeaders
請求區塊
p.log.Trace("All headers delayed, waiting") select { case <-time.After(fsHeaderContCheck): getHeaders(from) continue case <-d.cancelCh: return errCanceled }
這段代碼後來才加上的,其 commit 的記錄在這裏,而 「pull request」 在這裏。從 「pull request」 中做者的解釋咱們能夠了解這段代碼的邏輯和功能:這個修改主要是爲了解決常常出現的 「invalid hash chain」 錯誤,出現這個錯誤的緣由是由於在咱們上一次從遠程節點獲取到一些區塊並將它們加入到本地的主鏈的過程當中,遠程節點發生了 reorg 操做(參見這篇文章裏關於「主鏈與側鏈」的介紹 );當咱們再次根據高度請求新的區塊時,對方返回給咱們的是它的新的主鏈上的區塊,而咱們沒有這個鏈上的歷史區塊,所以在本地寫入區塊時就會返回 「invalid hash chain」 錯誤。
要想發生 「reorg」 操做,就須要有新區塊加入。在以太坊主網上,新產生一個區塊的間隔是 10 秒到 20 秒左右。通常狀況下,若是僅僅是區塊數據,它的同步速度仍是很快的,每次下載也有最大數量的限制。因此在新產生一個區塊的這段時間裏,足夠同步完成一組區塊數據而對方節點不會發生 「reorg」 操做。可是注意剛纔說的「僅僅是區塊數據」的同步較快,state 數據的同步就很是慢了。簡單來講在完成同步以前可能會有多個 「pivot」 區塊,這些區塊的 state 數據會從網絡上下載,這就大大拖慢了整個區塊的同步速度,使得本地在同步一組區塊的同時對方發生 「reorg」 操做的機率大大增長。
做者認爲這種狀況下發生的 「reorg」 操做是由新產生的區塊的競爭引發的,因此最新的幾個區塊是「不穩定的」,若是本次同步的區塊數量較多(也就是咱們同步時消耗的時間比較長)(在這裏「本次同步的區數數量較多」的表現是新收到的區塊的最高高度與本地數據庫中的最高高度的差距大於 reorgProtThreshold
),那麼在同步時能夠先避免同步最新區塊,這就是 reorgProtThreshold
和 reorgProtHeaderDelay
這個變量的由來。
至此,Downloader.fetchHeaders
方法就結束了,全部的區塊頭也就同步完成了。在上面咱們提到填充skeleton
的時候,是由fillHeaderSkeleton
函數來完成,接下來就要細講填充skeleton
的細節。
首先咱們知道以太坊在同步區塊時,先肯定要下載的區塊的高度區間,而後將這個區間按 MaxHeaderFetch
切分紅不少組,每一組的最後一個區塊組成了 「skeleton」(最後一組不滿 MaxHeaderFetch
個區塊不算做一組)。不清楚的能夠查看上面的圖。
①:將一批header
檢索任務添加到隊列中,以填充skeleton
。
這個函數參照上面queue詳解的分析
func (q queue) ScheduleSkeleton(from uint64, skeleton []types.Header) {}
②:調用fetchParts
獲取headers
數據
fetchParts
是很核心的函數,下面的Fetchbodies
和FetchReceipts
都會調用。先來大體看一下fetchParts
的結構:
func (d *Downloader) fetchParts(...) error { ... for { select { case <-d.cancelCh: case packet := <-deliveryCh: case cont := <-wakeCh: case <-ticker.C: case <-update: ... } }
簡化下來就是這 5 個channel
在處理,前面 4 個channel
負責循環等待消息,update
用來等待其餘 4 個channel
的通知來處理邏輯,先分開分析一個個的channel
。
2.1 deliveryCh 傳遞下載的數據
deliveryCh
做用就是傳遞下載的數據,當有數據被真正下載下來時,就會給這個 channel
發消息將數據傳遞過來。這個 channel 對應的分別是:d.headerCh
、d.bodyCh
、d.receiptCh
,而這三個 channel
分別在如下三個方法中被寫入數據:DeliverHeaders
、DeliverBodies
、DeliverReceipts
。 看下deliveryCh
如何處理數據:
case packet := <-deliveryCh: if peer := d.peers.Peer(packet.PeerId()); peer != nil { accepted, err := deliver(packet)//傳遞接收到的數據塊並檢查鏈有效性 if err == errInvalidChain { return err } if err != errStaleDelivery { setIdle(peer, accepted) } switch { case err == nil && packet.Items() == 0: ... case err == nil: ... } } select { case update <- struct{}{}: default: }
收到下載數據後判斷節點是否有效,若是節點沒有被移除,則會經過deliver
傳遞接收到的下載數據。若是沒有任何錯誤,則通知update
處理。
要注意deliver
是一個回調函數,它調用了 queue 對象的 Deliver 方法:queue.DeliverHeaders
、queue.DeliverBodies
、queue.DeliverReceipts
,在收到下載數據就會調用此回調函數(queue相關函數分析參照queue詳解部分)。
在上面處理錯誤部分,有一個setIdle
函數,它也是回調函數,其實現都是調用了 peerConnection
對象的相關方法:SetHeadersIdle
、SetBodiesIdle
、SetReceiptsIdle
。它這個函數是指某些節點針對某類數據是空閒的,好比header
、bodies
、receipts
,若是須要下載這幾類數據,就能夠從空閒的節點下載這些數據。
2.2 wakeCh
喚醒fetchParts
,下載新數據或下載已完成
case cont := <-wakeCh: if !cont { finished = true } select { case update <- struct{}{}: default: }
首先咱們經過調用fetchParts傳遞的參數知道,wakeCh
的值實際上是 queue.headerContCh
。在 queue.DeliverHeaders
中發現全部須要下戴的 header 都下載完成了時,纔會發送 false 給這個 channel。fetchParts
在收到這個消息時,就知道沒有 header 須要下載了。代碼以下:
func (q *queue) DeliverHeaders(......) (int, error) { ...... if len(q.headerTaskPool) == 0 { q.headerContCh <- false } ...... }
一樣如此,body
和receipt
則是bodyWakeCh
和receiptWakeCh
,在 processHeaders
中,若是全部 header
已經下載完成了,那麼發送 false
給這兩個 channel
,通知它們沒有新的 header
了。 body
和 receipt
的下載依賴於 header
,須要 header
先下載完成才能下載,因此對於下戴 body
或 receipt
的 fetchParts
來講,收到這個 wakeCh
就表明不會再有通知讓本身下載數據了.
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error { for { select { case headers := <-d.headerProcCh: if len(headers) == 0 { for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: } } ... } ... for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- true: default: } } } } }
2.3 ticker 負責週期性的激活 update
進行消息處理
case <-ticker.C: select { case update <- struct{}{}: default: }
2.4 update
(處理此前幾個channel
的數據)(重要)
2.4.1 判斷是否有效節點,並獲取超時數據的信息
獲取超時數據的節點ID和數據數量,若是大於兩個的話,就將這個節點設置爲空閒狀態(setIdle
),小於兩個的話直接斷開節點鏈接。
expire
是一個回調函數,會返回當前全部的超時數據信息。這個函數的實際實現都是調用了 queue
對象的 Expire
方法:ExpireHeaders
、ExpireBodies
、ExpireReceipts
,此函數會統計當前正在下載的數據中,起始時間與當前時間的差距超過給定閾值(downloader.requestTTL
方法的返回值)的數據,並將其返回。
if d.peers.Len() == 0 { return errNoPeers } for pid, fails := range expire() { if peer := d.peers.Peer(pid); peer != nil { if fails > 2 { ... setIdle(peer, 0) } else { ... if d.dropPeer == nil { } else { d.dropPeer(pid) .... } } }
2.4.2 處理完超時數據,判斷是否還有下載的數據
若是沒有其餘可下載的內容,請等待或終止,這裏pending()
和inFlight()
都是回調函數,pending
分別對應了queue.PendingHeaders
、queue.PendingBlocks
、queue.PendingReceipts
,用來返回各自要下載的任務數量。inFlight()
分別對應了queue.InFlightHeaders
、queue.InFlightBlocks
、queue.InFlightReceipts
,用來返回正在下載的數據數量。
if pending() == 0 { if !inFlight() && finished { ... return nil } break }
2.4.3 使用空閒節點,調用fetch
函數發送數據請求
Idle()
回調函數在上面已經提過了,throttle()
回調函數則分別對queue.ShouldThrottleBlocks
、queue.ShouldThrottleReceipts
,用來表示是否應該下載bodies
或者receipts
。
reserve
函數分別對應queue.ReserveHeaders
、queue.ReserveBodies
、queue.ReserveReceipts
,用來從從下載任務中選取一些能夠下載的任務,並構造一個 fetchRequest
結構。它還返回一個 process
變量,標記着是否有空的數據正在被處理。好比有可能某區塊中未包含任何一條交易,所以它的 body
和 receipt
都是空的,這種數據實際上是不須要下載的。在 queue
對象的 Reserve
方法中,會對這種狀況進行識別。若是遇到空的數據,這些數據會被直接標記爲下載成功。在方法返回時,就將是否發生過「直接標記爲下載成功」的狀況返回。
capacity
回調函數分別對應peerConnection.HeaderCapacity
、peerConnection.BlockCapacity
、peerConnection.ReceiptCapacity
,用來決定下載須要請求數據的個數。
fetch
回調函數分別對應peer.FetchHeaders
、peer.Fetchbodies
、peer.FetchReceipts
,用來發送獲取各種數據的請求。
progressed, throttled, running := false, false, inFlight() idles, total := idle() for _, peer := range idles { if throttle() { ... } if pending() == 0 { break } request, progress, err := reserve(peer, capacity(peer)) if err != nil { return err } if progress { progressed = true } if request == nil { continue } if request.From > 0 { ... } ... if err := fetch(peer, request); err != nil { ... } if !progressed && !throttled && !running && len(idles) == total && pending() > 0 { return errPeersUnavailable }
簡單來歸納這段代碼就是:使用空閒節點下載數據,判斷是否須要暫停,或者數據是否已經下載完成;以後選取數據進行下載;最後,若是沒有遇到空塊須要下載、且沒有暫停下載和全部有效節點都空閒和確實有數據須要下載,但下載沒有運行起來,就返回 errPeersUnavailable
錯誤。
到此爲止fetchParts
函數就分析的差很少了。裏面涉及的跟queue.go
相關的一些函數都在queue詳解小節裏介紹了。
經過headerProcCh
接收header
數據,並處理的過程是在processHeaders
函數中完成的。整個處理過程集中在:case headers := <-d.headerProcCh中
:
①:若是headers
的長度爲0 ,則會有如下操做:
1.1 通知全部人header
已經處理完畢
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: } }
1.2 若沒有檢索到任何header
,說明他們的TD
小於咱們的,或者已經經過咱們的fetcher
模塊進行了同步。
if d.mode != LightSync { head := d.blockchain.CurrentBlock() if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 { return errStallingPeer } }
1.3 若是是fast
或者light
同步,確保傳遞了header
if d.mode == FastSync || d.mode == LightSync { head := d.lightchain.CurrentHeader() if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { return errStallingPeer } }
②:若是headers
的長度大於 0
2.1 若是是fast或者light 同步,調用ightchain.InsertHeaderChain()寫入header
到leveldb
數據庫;
if d.mode == FastSync || d.mode == LightSync { .... d.lightchain.InsertHeaderChain(chunk, frequency); .... }
2.2 若是是fast
或者full sync
模式,則調用 d.queue.Schedule進行內容(body和receipt)檢索。
if d.mode == FullSync || d.mode == FastSync { ... inserts := d.queue.Schedule(chunk, origin) ... }
③:若是找到更新的塊號,則要發信號通知新任務
if d.syncStatsChainHeight < origin { d.syncStatsChainHeight = origin - 1 } for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- true: default: } }
到此處理Headers
的分析就完成了。
同步bodies
則是由fetchBodies
函數完成的。
同步bodies的過程跟同步header相似,大體講下步驟:
fetchParts
ReserveBodies
()從bodyTaskPool
中取出要同步的body
;fetch
,也就是調用這裏的FetchBodies
從節點獲取body
,發送GetBlockBodiesMsg
消息;bodyCh
的數據後,調用deliver
函數,將Transactions和Uncles
寫入resultCache
。同步receipts
的過程跟同步header
相似,大體講下步驟:
fetchParts
()ReserveBodies
()從ReceiptTaskPool
中取出要同步的Receipt
FetchReceipts
從節點獲取receipts
,發送GetReceiptsMsg
消息;receiptCh
的數據後,調用deliver
函數,將Receipts
寫入resultCache
。這裏咱們講兩種模式下的狀態同步:
processFullSyncContent
,full
模式下Receipts
沒有緩存到resultCache
中,直接先從緩存中取出body
數據,而後執行交易生成狀態,最後寫入區塊鏈。processFastSyncContent
:fast模式的Receipts、Transaction、Uncles都在resultCache中,因此還須要下載"state",進行校驗,再寫入區塊鏈。接下來大體的討論下這兩種方式。
func (d *Downloader) processFullSyncContent() error { for { results := d.queue.Results(true) ... if err := d.importBlockResults(results); err != nil ... } }
func (d *Downloader) importBlockResults(results []*fetchResult) error { ... select { ... blocks := make([]*types.Block, len(results)) for i, result := range results { blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) } if index, err := d.blockchain.InsertChain(blocks); err != nil { .... }
直接從result
中獲取數據並生成block
,直接插入區塊鏈中,就結束了。
fast模式同步狀態內容比較多,大體也就以下幾部分,咱們開始簡單分析如下。
①:下載最新的區塊狀態
sync := d.syncState(latest.Root)
咱們直接用一張圖來表示整個大體流程:
具體的代碼讀者本身翻閱,大體就是這麼個簡單過程。
②:計算出pivot塊
pivot
爲latestHeight - 64
,調用splitAroundPivot
()方法以pivot爲中心,將results
分爲三個部分:beforeP
,P
,afterP
;
pivot := uint64(0) if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) { pivot = height - uint64(fsMinFullBlocks) }
P, beforeP, afterP := splitAroundPivot(pivot, results)
③: 對beforeP
的部分調用commitFastSyncData
,將body
和receipt
都寫入區塊鏈
d.commitFastSyncData(beforeP, sync);
④:對P的部分更新狀態信息爲P block
的狀態,把P對應的result(包含body和receipt)調用commitPivotBlock插入本地區塊鏈中,並調用FastSyncCommitHead記錄這個pivot的hash值,存在downloader中,標記爲快速同步的最後一個區塊hash值;
if err := d.commitPivotBlock(P); err != nil { return err }
⑤:對afterP
調用d.importBlockResults
,將body
插入區塊鏈,而不插入receipt
。由於是最後 64 個區塊,因此此時數據庫中只有header
和body
,沒有receipt
和狀態,要經過fullSync
模式進行最後的同步。
if err := d.importBlockResults(afterP); err != nil { return err }
到此爲止整個Downloader同步完成了。
https://github.com/ethereum/go-ethereum/pull/1889
https://yangzhe.me/2019/05/09/ethereum-downloader/#fetchparts