死磕以太坊源碼分析之downloader同步

死磕以太坊源碼分析之downloader同步c++

須要配合註釋代碼看https://github.com/blockchainGuide/git

這篇文章篇幅較長,能看下去的是條漢子,建議收藏github

但願讀者在閱讀過程當中,指出問題,給個關注,一塊兒探討。算法

概覽

downloader 模塊的代碼位於 eth/downloader 目錄下。主要的功能代碼分別是:數據庫

  • downloader.go :實現了區塊同步邏輯緩存

  • peer.go :對區塊各個階段的組裝,下面的各個FetchXXX 就是很依賴這個模塊。網絡

  • queue.go :對eth/peer.go的封裝併發

  • statesync.go :同步state對象app

同步模式

full sync

full 模式會在數據庫中保存全部區塊數據,同步時從遠程節點同步 header 和 body 數據,而state 和 receipt 數據則是在本地計算出來的。框架

在 full 模式下,downloader 會同步區塊的 header 和 body 數據組成一個區塊,而後經過 blockchain 模塊的 BlockChain.InsertChain 向數據庫中插入區塊。在 BlockChain.InsertChain 中,會逐個計算和驗證每一個塊的 staterecepit 等數據,若是一切正常就將區塊數據以及本身計算獲得的 staterecepit 數據一塊兒寫入到數據庫中。

fast sync

fast 模式下,recepit 再也不由本地計算,而是和區塊數據同樣,直接由 downloader 從其它節點中同步;state 數據並不會所有計算和下載,而是選一個較新的區塊(稱之爲 pivot)的 state 進行下載,以這個區塊爲分界,以前的區塊是沒有 state 數據的,以後的區塊會像 full 模式下同樣在本地計算 state。所以在 fast 模式下,同步的數據除了 header 和 body,還有 receipt,以及 pivot 區塊的 state

所以 fast 模式忽略了大部分 state 數據,而且使用網絡直接同步 receipt 數據的方式替換了 full 模式下的本地計算,因此比較快。

light sync

light 模式也叫作輕模式,它只對區塊頭進行同步,而不一樣步其它的數據。

SyncMode:

  • FullSync:從完整區塊同步整個區塊鏈歷史
  • FastSync:快速下載標題,僅在鏈頭處徹底同步
  • LightSync:僅下載標題,而後終止

區塊下載流程

圖片只是大概的描述一下,實際仍是要結合代碼,全部區塊鏈相關文章合集https://github.com/blockchainGuide/

同時但願結識更多區塊鏈圈子的人,能夠star上面項目,持續更新

image-20201222221031797

首先根據Synchronise開始區塊同步,經過findAncestor找到指定節點的共同祖先,並在此高度進行同步,同時開啓多個goroutine同步不一樣的數據:headerreceiptbody。假如同步高度爲 100 的區塊,必須先header同步成功同步完成才能夠喚醒bodyreceipts的同步。

而每一個部分的同步大體都是由FetchParts來完成的,裏面包含了各個Chan的配合,也會涉及很多的回調函數,總而言之多讀幾遍每次都會有不一樣的理解。接下來就逐步分析這些關鍵內容。


synchronise

①:確保對方的TD高於咱們本身的TD

currentBlock := pm.blockchain.CurrentBlock()
	td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
	pHead, pTd := peer.Head()
	if pTd.Cmp(td) <= 0 {
		return
	}

②:開啓downloader的同步

pm.downloader.Synchronise(peer.id, pHead, pTd, mode)

進入函數:主要作了如下幾件事:

  1. d.synchronise(id, head, td, mode) :同步過程
  2. 錯誤日誌輸出, 並刪除此peer

進入到d.synchronise,走到最後一步d.syncWithPeer(p, hash, td)真正開啓同步。

func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
  ...
  return d.syncWithPeer(p, hash, td)
}

syncWithPeer大概作了如下幾件事:

  1. 查找祖先findAncestor
  2. 開啓單獨goroutine分別運行如下幾個函數:
    • fetchHeaders
    • processHeaders
    • fetchbodies
    • fetchReceipts
    • processFastSyncContent
    • processFullSyncContent

接下來的文章,以及整個Downloader模塊主要內容就是圍繞這幾個部分進行展開。


findAncestor

同步首要的是肯定同步區塊的區間:頂部爲遠程節點的最高區塊,底部爲兩個節點都擁有的相同區塊的最高高度(祖先區塊)。findAncestor就是用來找祖先區塊。函數分析以下:

①:肯定本地高度和遠程節點的最高高度

var (
		floor        = int64(-1) // 底部
		localHeight  uint64  // 本地最高高度
		remoteHeight = remoteHeader.Number.Uint64() // 遠程節點最高高度
	)
switch d.mode {
	case FullSync:
		localHeight = d.blockchain.CurrentBlock().NumberU64()
	case FastSync:
		localHeight = d.blockchain.CurrentFastBlock().NumberU64()
	default:
		localHeight = d.lightchain.CurrentHeader().Number.Uint64()
	}

②:計算同步的高度區間和間隔

from, count, skip, max := calculateRequestSpan(remoteHeight, localHeight)
  • from::表示從哪一個高度開始獲取區塊
  • count:表示從遠程節點獲取多少個區塊
  • skip:表示間隔,好比skip 爲 2 ,獲取第一個高度爲 5,則第二個就是 8
  • max:表示最大高度

③:發送獲取header的請求

go p.peer.RequestHeadersByNumber(uint64(from), count, skip, false)

④:處理上面請求接收到的header :case packet := <-d.headerCh

  1. 丟棄掉不是來自咱們請求節的內容
  2. 確保返回的header數量不爲空
  3. 驗證返回的headers的高度是咱們所請求的
  4. 檢查是否找到共同祖先
//----①
if packet.PeerId() != p.id {
				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
				break
			}
//-----②
headers := packet.(*headerPack).headers
			if len(headers) == 0 {
				p.log.Warn("Empty head header set")
        return 0
      }
//-----③
for i, header := range headers {
				expectNumber := from + int64(i)*int64(skip+1)
				if number := header.Number.Int64(); number != expectNumber { // 驗證這些返回的header是不是咱們上面請求的headers
					p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number)
					return 0, errInvalidChain
				}
			}
//-----④
// 檢查是否找到共同祖先
			finished = true
			//注意這裏是從headers最後一個元素開始查找,也就是高度最高的區塊。
			for i := len(headers) - 1; i >= 0; i-- {
				// 跳過不在咱們請求的高度區間內的區塊
				if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > max {
					continue
				}
				// //檢查咱們本地是否已經有某個區塊了,若是有就算是找到了共同祖先,
				//並將共同祖先的哈希和高度設置在number和hash變量中。
				h := headers[i].Hash()
				n := headers[i].Number.Uint64()

⑤:若是經過固定間隔法找到了共同祖先則返回祖先,會對其高度與 floor 變量進行驗證, floor 變量表明的是共同祖先的高度的最小值,若是找到共同祖先的高度比這個值還小,就認爲是兩個節點之間分叉太大了,再也不容許進行同步。若是一切正常,就返回找到的共同祖先的高度 number 變量。

if hash != (common.Hash{}) {
        if int64(number) <= floor {
            return 0, errInvalidAncestor
        }
        return number, nil
    }

⑥:若是固定間隔法沒有找到祖先則經過二分法來查找祖先,這部分能夠思想跟二分法算法相似,有興趣的能夠細看。


queue詳解

queue對象和Downloader對象是相互做用的,Downloader的不少功能離不開他,接下來咱們介紹一下這部份內容,可是本節,能夠先行跳過,等到了閱讀下面的關於Queue調用的一些函數部分再回過來閱讀這部分講解。

queue結構體

type queue struct {
  mode SyncMode // 同步模式
  
  // header處理相關
  headerHead      common.Hash   //最後一個排隊的標頭的哈希值以驗證順序
  headerTaskPool  map[uint64]*types.Header  //待處理的標頭檢索任務,將起始索引映射到框架標頭
  headerTaskQueue *prque.Prque  //骨架索引的優先級隊列,以獲取用於的填充標頭
  headerPeerMiss map[string]map[uint64]struct{} //已知不可用的對等頭批處理集
  headerPendPool map[string]*fetchRequest //當前掛起的頭檢索操做
  headerResults []*types.Header //結果緩存累積完成的頭
  headerProced int //從結果中拿出來已經處理的header
  headerContCh chan bool //header下載完成時通知的頻道
  
  blockTaskPool  map[common.Hash]*types.Header //待處理的塊(body)檢索任務,將哈希映射到header
  blockTaskQueue *prque.Prque //標頭的優先級隊列,以用於獲取塊(bodies)
  blockPendPool map[string]*fetchRequest //當前的正在處理的塊(body)檢索操做
  blockDonePool map[common.Hash]struct{} //已經完成的塊(body)
  
	receiptTaskPool map[common.Hash]*types.Header //待處理的收據檢索任務,將哈希映射到header
	receiptTaskQueue *prque.Prque //標頭的優先級隊列,以用於獲取收據
	receiptPendPool map[string]*fetchRequest //當前的正在處理的收據檢索操做
	receiptDonePool map[common.Hash]struct{} //已經完成的收據
	
	resultCache []*fetchResult //下載但還沒有交付獲取結果
	resultOffset uint64 //區塊鏈中第一個緩存的獲取結果的偏移量
	resultSize common.StorageSize // 塊的近似大小

	lock   *sync.Mutex
	active *sync.Cond
	closed bool
  
}

主要細分功能

數據下載開始安排任務

  • ScheduleSkeleton:將一批header檢索任務添加到隊列中,以填充已檢索的header skeleton
  • Schedule:用來準備對一些 bodyreceipt 數據的下載

數據下載中的各種狀態

  • pending

    pending表示待檢索的XXX請求的數量,包括了:PendingHeadersPendingBlocksPendingReceipts,分別都是對應取XXXTaskQueue的長度。

  • InFlight

    InFlight表示是否有正在獲取XXX的請求,包括:InFlightHeadersInFlightBlocksInFlightReceipts,都是經過判斷len(q.receiptPendPool) > 0 來確認。

  • ShouldThrottle

    ShouldThrottle表示檢查是否應該限制下載XXX,包括:ShouldThrottleBlocksShouldThrottleReceipts,主要是爲了防止下載過程當中本地內存佔用過大。

  • Reserve

    Reserve經過構造一個 fetchRequest 結構並返回,向調用者提供指定數量的待下載的數據的信息(queue 內部會將這些數據標記爲「正在下載」)。調用者使用返回的 fetchRequest 數據向遠程節點發起新的獲取數據的請求。包括:ReserveHeadersReserveBodiesReserveReceipts

  • Cancel

    Cance用來撤消對 fetchRequest 結構中的數據的下載(queue 內部會將這些數據從新從「正在下載」的狀態更改成「等待下載」)。包括:CancelHeadersCancelBodiesCancelReceipts

  • expire

    expire檢查正在執行中的請求是否超過了超時限制,包括:ExpireHeadersExpireBodiesExpireReceipts

  • Deliver

    當有數據下載成功時,調用者會使用 deliver 功能用來通知 queue 對象。包括:DeliverHeadersDeliverBodiesDeliverReceipts

數據下載完成獲取區塊數據

  • RetrieveHeaders
    在填充 skeleton 完成後,queue.RetrieveHeaders 用來獲取整個 skeleton 中的全部 header
  • Results
    queue.Results 用來獲取當前的 headerbodyreceipt(只在 fast 模式下) 都已下載成功的區塊(並將這些區塊從 queue 內部移除)

函數實現

ScheduleSkeleton

queue.ScheduleSkeleton主要是爲了填充skeleton,它的參數是要下載區塊的起始高度和全部 skeleton 區塊頭,最核心的內容則是下面這段循環:

func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
    ......
    for i, header := range skeleton {
        index := from + uint64(i*y)
        q.headerTaskPool[index] = header
        q.headerTaskQueue.Push(index, -int64(index))
    }
}

假設已肯定須要下載的區塊高度區間是從 10 到 46,MaxHeaderFetch 的值爲 10,那麼這個高度區塊就會被分紅 3 組:10 - 19,20 - 29,30 - 39,而 skeleton 則分別由高度爲 1九、2九、39 的區塊頭組成。循環中的 index 變量其實是每一組區塊中的第一個區塊的高度(好比 十、20、30),queue.headerTaskPool 其實是一個每一組區塊中第一個區塊的高度到最後一個區塊的 header 的映射

headerTaskPool = {
  10: headerOf_19,
	20: headerOf_20,
	30: headerOf_39,
}

ReserveHeaders

reserve 用來獲取可下載的數據。

reserve  = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
			return d.queue.ReserveHeaders(p, count), false, nil
		}
func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
  if _, ok := q.headerPendPool[p.id]; ok {
		return nil
	} //①
  ...
  send, skip := uint64(0), []uint64{}
	for send == 0 && !q.headerTaskQueue.Empty() {
		from, _ := q.headerTaskQueue.Pop()
		if q.headerPeerMiss[p.id] != nil {
			if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
				skip = append(skip, from.(uint64))
				continue
			}
		}
		send = from.(uint64) // ②
	}
  
 ...
  for _, from := range skip {
		q.headerTaskQueue.Push(from, -int64(from))
	} // ③
  ...
  request := &fetchRequest{
		Peer: p,
		From: send,
		Time: time.Now(),
	}
	q.headerPendPool[p.id] = request // ④
  
}

①:根據headerPendPool來判斷遠程節點是否正在下載數據信息。

②:從headerTaskQueue取出值做爲本次請求的起始高度,賦值給send變量,在這個過程當中會排除headerPeerMiss所記錄的節點下載數據失敗的信息。

③:將失敗的任務再從新寫回task queue

④:利用send變量構造fetchRequest結構,此結構是用來做爲FetchHeaders來使用的:

fetch = func(p *peerConnection, req *fetchRequest) error { 
    return p.FetchHeaders(req.From, MaxHeaderFetch) 
}

至此,ReserveHeaders會從任務隊列裏選擇最小的起始高度並構造fetchRequest傳遞給fetch獲取數據。


DeliverHeaders

deliver = func(packet dataPack) (int, error) {
			pack := packet.(*headerPack)
			return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
		}

①:若是發現下載數據的節點沒有在 queue.headerPendPool 中,就直接返回錯誤;不然就繼續處理,並將節點記錄從 queue.headerPendPool 中刪除。

request := q.headerPendPool[id]
	if request == nil {
		return 0, errNoFetchesPending
	}
	headerReqTimer.UpdateSince(request.Time)
	delete(q.headerPendPool, id)

②:驗證headers

包括三方面驗證:

  1. 檢查起始區塊的高度和哈希
  2. 檢查高度的鏈接性
  3. 檢查哈希的鏈接性
if accepted {
		//檢查起始區塊的高度和哈希
		if headers[0].Number.Uint64() != request.From {
			...
			accepted = false
		} else if headers[len(headers)-1].Hash() != target {
			...
			accepted = false
		}
	}
	if accepted {
		for i, header := range headers[1:] {
			hash := header.Hash() // 檢查高度的鏈接性
			if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
				...
			}
			if headers[i].Hash() != header.ParentHash { // 檢查哈希的鏈接性
				...
			}
		}
	}

③: 將無效數據存入headerPeerMiss,並將這組區塊起始高度從新放入headerTaskQueue

if !accepted {
	...
		miss := q.headerPeerMiss[id]
		if miss == nil {
			q.headerPeerMiss[id] = make(map[uint64]struct{})
			miss = q.headerPeerMiss[id]
		}
		miss[request.From] = struct{}{}
		q.headerTaskQueue.Push(request.From, -int64(request.From))
		return 0, errors.New("delivery not accepted")
	}

④:保存數據,並通知headerProcCh處理新的header

if ready > 0 {
		process := make([]*types.Header, ready)
		copy(process, q.headerResults[q.headerProced:q.headerProced+ready])
		select {
		case headerProcCh <- process:
			q.headerProced += len(process)
		default:
		}
	}

⑤:發送消息給.headerContCh,通知skeleton 都被下載完了

if len(q.headerTaskPool) == 0 {
		q.headerContCh <- false
	}

DeliverHeaders 會對數據進行檢驗和保存,併發送 channel 消息給 Downloader.processHeadersDownloader.fetchPartswakeCh 參數。


Schedule

processHeaders在處理header數據的時候,會調用queue.Schedule 爲下載 bodyreceipt 做準備。

inserts := d.queue.Schedule(chunk, origin)
func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
	inserts := make([]*types.Header, 0, len(headers))
	for _, header := range headers {
    //校驗
    ...
		q.blockTaskPool[hash] = header
		q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))

		if q.mode == FastSync {
			q.receiptTaskPool[hash] = header
			q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
		}
		inserts = append(inserts, header)
		q.headerHead = hash
		from++
	}
	return inserts
}

這個函數主要就是將信息寫入到body和receipt隊列,等待調度。


ReserveBody&Receipt

queue 中準備好了 bodyreceipt 相關的數據, processHeaders最後一段,是喚醒下載Bodyies和Receipts的關鍵代碼,會通知 fetchBodiesfetchReceipts 能夠對各自的數據進行下載了。

for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
				select {
				case ch <- true:
				default:
				}
			}

fetchXXX 會調用fetchParts,邏輯相似上面的的,reserve最終則會調用reserveHeadersdeliver 最終調用的是 queue.deliver.

先來分析reserveHeaders

①:若是沒有可處理的任務,直接返回

if taskQueue.Empty() {
        return nil, false, nil
    }

②:若是參數給定的節點正在下載數據,返回

if _, ok := pendPool[p.id]; ok {
        return nil, false, nil
    }

③:計算 queue 對象中的緩存空間還能夠容納多少條數據

space := q.resultSlots(pendPool, donePool)

④:從 「task queue」 中依次取出任務進行處理

主要實現如下功能:

  • 計算當前 header 在 queue.resultCache 中的位置,而後填充 queue.resultCache 中相應位置的元素
  • 處理空區塊的狀況,若爲空不下載。
  • 處理遠程節點缺乏這個當前區塊數據的狀況,若是發現這個節點曾經下載當前數據失敗過,就再也不讓它下載了。

注意:resultCache 字段用來記錄全部正在被處理的數據的處理結果,它的元素類型是 fetchResult 。它的 Pending 字段表明當前區塊還有幾類數據須要下載。這裏須要下載的數據最多有兩類:body 和 receipt,full 模式下只須要下載 body 數據,而 fast 模式要多下載一個 receipt 數據。

for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ {
		header := taskQueue.PopItem().(*types.Header)
		hash := header.Hash()
		index := int(header.Number.Int64() - int64(q.resultOffset))
		if index >= len(q.resultCache) || index < 0 {
			....
		}
		if q.resultCache[index] == nil {
			components := 1
			if q.mode == FastSync {
				components = 2
			}
			q.resultCache[index] = &fetchResult{
				Pending: components,
				Hash:    hash,
				Header:  header,
			}
		}
  
		if isNoop(header) {
			donePool[hash] = struct{}{}
			delete(taskPool, hash)

			space, proc = space-1, proc-1
			q.resultCache[index].Pending--
			progress = true
			continue
		}
		if p.Lacks(hash) {
			skip = append(skip, header)
		} else {
			send = append(send, header)
		}
	}

最後就是構造 fetchRequest 結構並返回。


DeliverBodies&Receipts

bodyreceipt 數據都已經經過 reserve 操做構造了 fetchRequest 結構並傳給 fetch,接下來就是等待數據的到達,數據下載成功後,會調用 queue 對象的 deliver 方法進行傳遞,包括 queue.DeliverBodiesqueue.DeliverReceipts。這兩個方法都以不一樣的參數調用了 queue.deliver 方法:

①:若是下載的數據數量爲 0,則把全部此節點這次下載的數據標記爲「缺失」

if results == 0 {
		for _, header := range request.Headers {
			request.Peer.MarkLacking(header.Hash())
		}
	}

②:循環處理數據,經過調用reconstruct 填充 resultCache[index] 中的相應的字段

for i, header := range request.Headers {
  ...
  if err := reconstruct(header, i, q.resultCache[index]); err != nil {
			failure = err
			break
		}
}

③:驗證resultCache 中的數據,其對應的 request.Headers 中的 header 都應爲 nil,若不是則說明驗證未經過,須要假如到task queue從新下載

for _, header := range request.Headers {
		if header != nil {
			taskQueue.Push(header, -int64(header.Number.Uint64()))
		}
	}

④:若是有數據被驗證經過且寫入 queue.resultCache 中了(accepted > 0),發送 queue.active 消息。Results 會等待這這個信號。


Results

當(header、body、receipt)都下載完,就要將區塊寫入到數據庫了,queue.Results 就是用來返回全部目前已經下載完成的數據,它在 Downloader.processFullSyncContentDownloader.processFastSyncContent 中被調用。代碼比較簡單就很少說了。

到此爲止queue對象就分析的差很少了。


同步headers

fetchHeaders

同步headers 是是由函數fetchHeaders來完成的。

fetchHeaders的大體思想:

同步header的數據會被填充到skeleton,每次從遠程節點獲取區塊數據最大爲MaxHeaderFetch(192),因此要獲取的區塊數據若是大於192 ,會被分紅組,每組MaxHeaderFetch,剩餘的不足192個的不會填充進skeleton,具體步驟以下圖所示:

image-20201219111103965

此種方式能夠避免從同一節點下載過多錯誤數據,若是咱們鏈接到了一個惡意節點,它能夠創造一個鏈條很長且TD值也很是高的區塊鏈數據。若是咱們的區塊從 0 開始所有從它那同步,也就下載了一些根本不被別人認可的數據。若是我只從它那同步 MaxHeaderFetch 個區塊,而後發現這些區塊沒法正確填充我以前的 skeleton(多是 skeleton 的數據錯了,或者用來填充 skeleton 的數據錯了),就會丟掉這些數據。

接下來查看下代碼如何實現:

①:發起獲取header的請求

若是是下載skeleton,則會從高度 from+MaxHeaderFetch-1 開始(包括),每隔 MaxHeaderFetch-1 的高度請求一個 header,最多請求 MaxSkeletonSize 個。若是不是的話,則要獲取完整的headers

②:等待並處理headerCh中的header數據

2.1 確保遠程節點正在返回咱們須要填充skeleton所需的header

if packet.PeerId() != p.id {
				log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
				break
			}

2.2 若是skeleton已經下載完畢,則須要繼續填充skeleton

if packet.Items() == 0 && skeleton {
				skeleton = false
				getHeaders(from)
				continue
			}

2.3 整個skeleton填充完成,而且沒有要獲取的header了,要通知headerProcCh所有完成

if packet.Items() == 0 {
				//下載pivot時不要停止標頭的提取
				if atomic.LoadInt32(&d.committed) == 0 && pivot <= from {
					p.log.Debug("No headers, waiting for pivot commit")
					select {
					case <-time.After(fsHeaderContCheck):
						getHeaders(from)
						continue
					case <-d.cancelCh:
						return errCanceled
					}
				}
				//完成Pivot操做(或不進行快速同步),而且沒有頭文件,終止該過程
				p.log.Debug("No more headers available")
				select {
				case d.headerProcCh <- nil:
					return nil
				case <-d.cancelCh:
					return errCanceled
				}
			}

2.4 當header有數據而且是在獲取skeleton的時候,調用fillHeaderSkeleton填充skeleton

if skeleton {
				filled, proced, err := d.fillHeaderSkeleton(from, headers)
				if err != nil {
					p.log.Debug("Skeleton chain invalid", "err", err)
					return errInvalidChain
				}
				headers = filled[proced:]
				from += uint64(proced)
			}

2.5 若是當前處理的不是 skeleton,代表區塊同步得差很少了,處理尾部的一些區塊

判斷本地的主鏈高度與新收到的 header 的最高高度的高度差是否在 reorgProtThreshold 之內,若是不是,就將高度最高的 reorgProtHeaderDelay 個 header 丟掉。

if head+uint64(reorgProtThreshold) < headers[n-1].Number.Uint64() {
						delay := reorgProtHeaderDelay
						if delay > n {
							delay = n
						}
						headers = headers[:n-delay]
					}

2.6 若是還有 header 未處理,發給 headerProcCh 進行處理,Downloader.processHeaders 會等待這個 channel 的消息並進行處理;

if len(headers) > 0 {
				...
				select {
				case d.headerProcCh <- headers:
				case <-d.cancelCh:
					return errCanceled
				}
				from += uint64(len(headers))
  getHeaders(from)
}

2.7 若是沒有發送標頭,或者全部標頭等待 fsHeaderContCheck 秒,再次調用 getHeaders 請求區塊

p.log.Trace("All headers delayed, waiting")
				select {
				case <-time.After(fsHeaderContCheck):
					getHeaders(from)
					continue
				case <-d.cancelCh:
					return errCanceled
				}

這段代碼後來才加上的,其 commit 的記錄在這裏,而 「pull request」 在這裏。從 「pull request」 中做者的解釋咱們能夠了解這段代碼的邏輯和功能:這個修改主要是爲了解決常常出現的 「invalid hash chain」 錯誤,出現這個錯誤的緣由是由於在咱們上一次從遠程節點獲取到一些區塊並將它們加入到本地的主鏈的過程當中,遠程節點發生了 reorg 操做(參見這篇文章裏關於「主鏈與側鏈」的介紹 );當咱們再次根據高度請求新的區塊時,對方返回給咱們的是它的新的主鏈上的區塊,而咱們沒有這個鏈上的歷史區塊,所以在本地寫入區塊時就會返回 「invalid hash chain」 錯誤。

要想發生 「reorg」 操做,就須要有新區塊加入。在以太坊主網上,新產生一個區塊的間隔是 10 秒到 20 秒左右。通常狀況下,若是僅僅是區塊數據,它的同步速度仍是很快的,每次下載也有最大數量的限制。因此在新產生一個區塊的這段時間裏,足夠同步完成一組區塊數據而對方節點不會發生 「reorg」 操做。可是注意剛纔說的「僅僅是區塊數據」的同步較快,state 數據的同步就很是慢了。簡單來講在完成同步以前可能會有多個 「pivot」 區塊,這些區塊的 state 數據會從網絡上下載,這就大大拖慢了整個區塊的同步速度,使得本地在同步一組區塊的同時對方發生 「reorg」 操做的機率大大增長。

做者認爲這種狀況下發生的 「reorg」 操做是由新產生的區塊的競爭引發的,因此最新的幾個區塊是「不穩定的」,若是本次同步的區塊數量較多(也就是咱們同步時消耗的時間比較長)(在這裏「本次同步的區數數量較多」的表現是新收到的區塊的最高高度與本地數據庫中的最高高度的差距大於 reorgProtThreshold),那麼在同步時能夠先避免同步最新區塊,這就是 reorgProtThresholdreorgProtHeaderDelay 這個變量的由來。

至此,Downloader.fetchHeaders 方法就結束了,全部的區塊頭也就同步完成了。在上面咱們提到填充skeleton的時候,是由fillHeaderSkeleton函數來完成,接下來就要細講填充skeleton的細節。


fillHeaderSkeleton

首先咱們知道以太坊在同步區塊時,先肯定要下載的區塊的高度區間,而後將這個區間按 MaxHeaderFetch 切分紅不少組,每一組的最後一個區塊組成了 「skeleton」(最後一組不滿 MaxHeaderFetch 個區塊不算做一組)。不清楚的能夠查看上面的圖。

①:將一批header檢索任務添加到隊列中,以填充skeleton

這個函數參照上面queue詳解的分析

func (q queue) ScheduleSkeleton(from uint64, skeleton []types.Header) {}

②:調用fetchParts 獲取headers數據

fetchParts是很核心的函數,下面的FetchbodiesFetchReceipts都會調用。先來大體看一下fetchParts的結構:

func (d *Downloader) fetchParts(...) error {
  ...
  for {
		select {
		case <-d.cancelCh:
		case packet := <-deliveryCh:
		case cont := <-wakeCh:
		case <-ticker.C:
		case <-update:
		...
	}
}

簡化下來就是這 5 個channel在處理,前面 4 個channel負責循環等待消息,update用來等待其餘 4 個channel的通知來處理邏輯,先分開分析一個個的channel

2.1 deliveryCh 傳遞下載的數據

deliveryCh 做用就是傳遞下載的數據,當有數據被真正下載下來時,就會給這個 channel 發消息將數據傳遞過來。這個 channel 對應的分別是:d.headerChd.bodyChd.receiptCh,而這三個 channel 分別在如下三個方法中被寫入數據:DeliverHeadersDeliverBodiesDeliverReceipts。 看下deliveryCh如何處理數據:

case packet := <-deliveryCh:
			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
				accepted, err := deliver(packet)//傳遞接收到的數據塊並檢查鏈有效性
				if err == errInvalidChain {
					return err
        }
				if err != errStaleDelivery {
					setIdle(peer, accepted)
				}
				switch {
				case err == nil && packet.Items() == 0:
					...
				case err == nil:
				...
				}
			}
			select {
			case update <- struct{}{}:
			default:
			}

收到下載數據後判斷節點是否有效,若是節點沒有被移除,則會經過deliver傳遞接收到的下載數據。若是沒有任何錯誤,則通知update處理。

要注意deliver是一個回調函數,它調用了 queue 對象的 Deliver 方法:queue.DeliverHeadersqueue.DeliverBodiesqueue.DeliverReceipts,在收到下載數據就會調用此回調函數(queue相關函數分析參照queue詳解部分)。

在上面處理錯誤部分,有一個setIdle函數,它也是回調函數,其實現都是調用了 peerConnection 對象的相關方法:SetHeadersIdleSetBodiesIdleSetReceiptsIdle。它這個函數是指某些節點針對某類數據是空閒的,好比headerbodiesreceipts,若是須要下載這幾類數據,就能夠從空閒的節點下載這些數據。

2.2 wakeCh 喚醒fetchParts ,下載新數據或下載已完成

case cont := <-wakeCh:
			if !cont {
				finished = true
			}
			select {
			case update <- struct{}{}:
			default:
			}

首先咱們經過調用fetchParts傳遞的參數知道,wakeCh 的值實際上是 queue.headerContCh。在 queue.DeliverHeaders 中發現全部須要下戴的 header 都下載完成了時,纔會發送 false 給這個 channel。fetchParts 在收到這個消息時,就知道沒有 header 須要下載了。代碼以下:

func (q *queue) DeliverHeaders(......) (int, error) {
    ......
    if len(q.headerTaskPool) == 0 {
        q.headerContCh <- false
    }
    ......
}

一樣如此,bodyreceipt則是bodyWakeChreceiptWakeCh,在 processHeaders 中,若是全部 header 已經下載完成了,那麼發送 false 給這兩個 channel,通知它們沒有新的 header 了。 bodyreceipt 的下載依賴於 header,須要 header 先下載完成才能下載,因此對於下戴 bodyreceiptfetchParts 來講,收到這個 wakeCh 就表明不會再有通知讓本身下載數據了.

func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
    for {
        select {
        case headers := <-d.headerProcCh:
            if len(headers) == 0 {
                for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
                    select {
                    case ch <- false:
                    case <-d.cancelCh:
                    }
                }
						...
            }
            ...
            for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
                select {
                case ch <- true:
                default:
                }
            }
        }
    }
}

2.3 ticker 負責週期性的激活 update進行消息處理

case <-ticker.C:
			select {
			case update <- struct{}{}:
			default:
			}

2.4 update (處理此前幾個channel的數據)(重要)

2.4.1 判斷是否有效節點,並獲取超時數據的信息

獲取超時數據的節點ID和數據數量,若是大於兩個的話,就將這個節點設置爲空閒狀態(setIdle),小於兩個的話直接斷開節點鏈接。

expire 是一個回調函數,會返回當前全部的超時數據信息。這個函數的實際實現都是調用了 queue 對象的 Expire 方法:ExpireHeadersExpireBodiesExpireReceipts,此函數會統計當前正在下載的數據中,起始時間與當前時間的差距超過給定閾值(downloader.requestTTL 方法的返回值)的數據,並將其返回。

if d.peers.Len() == 0 {
				return errNoPeers
			}
for pid, fails := range expire() {
  if peer := d.peers.Peer(pid); peer != nil {
    if fails > 2 {
						...
						setIdle(peer, 0)
					} else {
					...
						if d.dropPeer == nil {
						} else {
							d.dropPeer(pid)
							....
						}
					}
  }

2.4.2 處理完超時數據,判斷是否還有下載的數據

若是沒有其餘可下載的內容,請等待或終止,這裏pending()inFlight()都是回調函數,pending分別對應了queue.PendingHeadersqueue.PendingBlocksqueue.PendingReceipts,用來返回各自要下載的任務數量。inFlight()分別對應了queue.InFlightHeadersqueue.InFlightBlocksqueue.InFlightReceipts,用來返回正在下載的數據數量。

if pending() == 0 {
				if !inFlight() && finished {
				...
					return nil
				}
				break
			}

2.4.3 使用空閒節點,調用fetch函數發送數據請求

Idle()回調函數在上面已經提過了,throttle()回調函數則分別對queue.ShouldThrottleBlocksqueue.ShouldThrottleReceipts,用來表示是否應該下載bodies或者receipts

reserve函數分別對應queue.ReserveHeadersqueue.ReserveBodiesqueue.ReserveReceipts,用來從從下載任務中選取一些能夠下載的任務,並構造一個 fetchRequest 結構。它還返回一個 process 變量,標記着是否有空的數據正在被處理。好比有可能某區塊中未包含任何一條交易,所以它的 bodyreceipt 都是空的,這種數據實際上是不須要下載的。在 queue 對象的 Reserve 方法中,會對這種狀況進行識別。若是遇到空的數據,這些數據會被直接標記爲下載成功。在方法返回時,就將是否發生過「直接標記爲下載成功」的狀況返回。

capacity回調函數分別對應peerConnection.HeaderCapacitypeerConnection.BlockCapacitypeerConnection.ReceiptCapacity,用來決定下載須要請求數據的個數。

fetch回調函數分別對應peer.FetchHeaderspeer.Fetchbodiespeer.FetchReceipts,用來發送獲取各種數據的請求。

progressed, throttled, running := false, false, inFlight()
			idles, total := idle()
			for _, peer := range idles {
				if throttle() {
					...
        }
				if pending() == 0 {
					break
				}
				request, progress, err := reserve(peer, capacity(peer))
				if err != nil {
					return err
				}
				if progress {
					progressed = true
				}
        if request == nil {
					continue
				}
				if request.From > 0 {
				...
				}
				...
				if err := fetch(peer, request); err != nil {
				...
			}
			if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
				return errPeersUnavailable
			}

簡單來歸納這段代碼就是:使用空閒節點下載數據,判斷是否須要暫停,或者數據是否已經下載完成;以後選取數據進行下載;最後,若是沒有遇到空塊須要下載、且沒有暫停下載和全部有效節點都空閒和確實有數據須要下載,但下載沒有運行起來,就返回 errPeersUnavailable 錯誤。

到此爲止fetchParts函數就分析的差很少了。裏面涉及的跟queue.go相關的一些函數都在queue詳解小節裏介紹了。


processHeaders

經過headerProcCh接收header數據,並處理的過程是在processHeaders函數中完成的。整個處理過程集中在:case headers := <-d.headerProcCh中:

①:若是headers的長度爲0 ,則會有如下操做:

1.1 通知全部人header已經處理完畢

for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
					select {
					case ch <- false:
					case <-d.cancelCh:
					}
				}

1.2 若沒有檢索到任何header,說明他們的TD小於咱們的,或者已經經過咱們的fetcher模塊進行了同步。

if d.mode != LightSync {
					head := d.blockchain.CurrentBlock()
					if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
						return errStallingPeer
					}
				}

1.3 若是是fast或者light 同步,確保傳遞了header

if d.mode == FastSync || d.mode == LightSync {
					head := d.lightchain.CurrentHeader()
					if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
						return errStallingPeer
					}
				}

②:若是headers的長度大於 0

2.1 若是是fast或者light 同步,調用ightchain.InsertHeaderChain()寫入headerleveldb數據庫;

if d.mode == FastSync || d.mode == LightSync {
  ....
  d.lightchain.InsertHeaderChain(chunk, frequency);
  ....
}

2.2 若是是fast或者full sync模式,則調用 d.queue.Schedule進行內容(body和receipt)檢索。

if d.mode == FullSync || d.mode == FastSync {
  ...
  inserts := d.queue.Schedule(chunk, origin)
  ...
}

③:若是找到更新的塊號,則要發信號通知新任務

if d.syncStatsChainHeight < origin {
				d.syncStatsChainHeight = origin - 1
			}
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
				select {
				case ch <- true:
				default:
				}
			}

到此處理Headers的分析就完成了。


同步bodies

同步bodies 則是由fetchBodies函數完成的。

fetchBodies

同步bodies的過程跟同步header相似,大體講下步驟:

  1. 調用fetchParts
  2. ReserveBodies()從bodyTaskPool中取出要同步的body
  3. 調用fetch,也就是調用這裏的FetchBodies從節點獲取body,發送GetBlockBodiesMsg消息;
  4. 收到bodyCh的數據後,調用deliver函數,將Transactions和Uncles寫入resultCache

同步Receipts

fetchReceipts

同步receipts的過程跟同步header相似,大體講下步驟:

  1. 調用fetchParts()
  2. ReserveBodies()從ReceiptTaskPool中取出要同步的Receipt
  3. 調用這裏的FetchReceipts從節點獲取receipts,發送GetReceiptsMsg消息;
  4. 收到receiptCh的數據後,調用deliver函數,將Receipts寫入resultCache

同步狀態

這裏咱們講兩種模式下的狀態同步:

  • fullSync: processFullSyncContentfull模式下Receipts沒有緩存到resultCache中,直接先從緩存中取出body數據,而後執行交易生成狀態,最後寫入區塊鏈。
  • fastSync:processFastSyncContent:fast模式的Receipts、Transaction、Uncles都在resultCache中,因此還須要下載"state",進行校驗,再寫入區塊鏈。

接下來大體的討論下這兩種方式。

processFullSyncContent

func (d *Downloader) processFullSyncContent() error {
	for {
		results := d.queue.Results(true)
		...
		if err := d.importBlockResults(results); err != nil ...
	}
}
func (d *Downloader) importBlockResults(results []*fetchResult) error {
	...
	select {
...
	blocks := make([]*types.Block, len(results))
	for i, result := range results {
		blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
	}
	if index, err := d.blockchain.InsertChain(blocks); err != nil {
		....
}

直接從result中獲取數據並生成block,直接插入區塊鏈中,就結束了。


processFastSyncContent

fast模式同步狀態內容比較多,大體也就以下幾部分,咱們開始簡單分析如下。

①:下載最新的區塊狀態

sync := d.syncState(latest.Root)

咱們直接用一張圖來表示整個大體流程:

image-20201223100153241

具體的代碼讀者本身翻閱,大體就是這麼個簡單過程。

②:計算出pivot塊

pivotlatestHeight - 64,調用splitAroundPivot()方法以pivot爲中心,將results分爲三個部分:beforePPafterP

pivot := uint64(0)
	if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {
		pivot = height - uint64(fsMinFullBlocks)
	}
P, beforeP, afterP := splitAroundPivot(pivot, results)

③: 對beforeP的部分調用commitFastSyncData,將bodyreceipt都寫入區塊鏈

d.commitFastSyncData(beforeP, sync);

④:對P的部分更新狀態信息爲P block的狀態,把P對應的result(包含bodyreceipt)調用commitPivotBlock插入本地區塊鏈中,並調用FastSyncCommitHead記錄這個pivothash值,存在downloader中,標記爲快速同步的最後一個區塊hash值;

if err := d.commitPivotBlock(P); err != nil {
					return err
				}

⑤:對afterP調用d.importBlockResults,將body插入區塊鏈,而不插入receipt。由於是最後 64 個區塊,因此此時數據庫中只有headerbody,沒有receipt和狀態,要經過fullSync模式進行最後的同步。

if err := d.importBlockResults(afterP); err != nil {
			return err
		}

到此爲止整個Downloader同步完成了。

參考

https://mindcarver.cn

https://github.com/ethereum/go-ethereum/pull/1889

https://yangzhe.me/2019/05/09/ethereum-downloader/#fetchparts

相關文章
相關標籤/搜索