死磕以太坊源碼分析之p2p節點發現

死磕以太坊源碼分析之p2p節點發現html

在閱讀節點發現源碼以前必需要理解kadmilia算法,能夠參考:KAD算法詳解node

節點發現概述

節點發現,使本地節點得知其餘節點的信息,進而加入到p2p網絡中。git

以太坊的節點發現基於相似的kademlia算法,源碼中有兩個版本,v4和v5。v4適用於全節點,經過discover.ListenUDP使用,v5適用於輕節點經過discv5.ListenUDP使用,本文介紹的是v4版本。github

節點發現功能主要涉及 Server Table udp 這幾個數據結構,它們有獨自的事件響應循環,節點發現功能即是它們互相協做完成的。其中,每一個以太坊客戶端啓動後都會在本地運行一個Server,並將網絡拓撲中相鄰的節點視爲Node,而TableNode的容器,udp則是負責維持底層的鏈接。這些結構的關係以下圖:算法

image-20201123210628944

p2p服務開啓節點發現

在P2p的server.go 的start方法中:數據庫

if err := srv.setupDiscovery(); err != nil {
		return err
	}

進入到setupDiscovery中:網絡

// Discovery V4
	var unhandled chan discover.ReadPacket
	var sconn *sharedUDPConn
	if !srv.NoDiscovery {
		...
		ntab, err := discover.ListenUDP(conn, srv.localnode, cfg)
		....
	}

discover.ListenUDP方法即開啓了節點發現的功能.數據結構

首先解析出監聽地址的UDP端口,根據端口返回與之相連的UDP鏈接,以後返回鏈接的本地網絡地址,接着設置最後一個UDP-on-IPv4端口。到此爲止節點發現的一些準備工做作好,接下下來開始UDP的監聽:併發

ntab, err := discover.ListenUDP(conn, srv.localnode, cfg)

而後進行UDP 的監聽,下面是監聽的過程:app

監聽UDP

// 監聽給定的socket 上的發現的包
func ListenUDP(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
	return ListenV4(c, ln, cfg)
}
func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
	closeCtx, cancel := context.WithCancel(context.Background())
	t := &UDPv4{
		conn:            c,
		priv:            cfg.PrivateKey,
		netrestrict:     cfg.NetRestrict,
		localNode:       ln,
		db:              ln.Database(),
		gotreply:        make(chan reply),
		addReplyMatcher: make(chan *replyMatcher),
		closeCtx:        closeCtx,
		cancelCloseCtx:  cancel,
		log:             cfg.Log,
	}
	if t.log == nil {
		t.log = log.Root()
	}

	tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log) // 
	if err != nil {
		return nil, err
	}
	t.tab = tab
	go tab.loop() //

	t.wg.Add(2)
	go t.loop() //
	go t.readLoop(cfg.Unhandled) //
	return t, nil
}

主要作了如下幾件事:

1.新建路由表

tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log)

新建路由表作了如下幾件事:

  • 初始化table對象
  • 設置bootnode(setFallbackNodes)
    • 節點第一次啓動的時候,節點會與硬編碼在以太坊源碼中的bootnode進行鏈接,全部的節點加入幾乎都先鏈接了它。鏈接上bootnode後,獲取bootnode部分的鄰居節點,而後進行節點發現,獲取更多的活躍的鄰居節點
    • nursery 是在 Table 爲空而且數據庫中沒有存儲節點時的初始鏈接節點(上文中的 6 個節點),經過 bootnode 能夠發現新的鄰居
  • tab.seedRand:使用提供的種子值將生成器初始化爲肯定性狀態
  • loadSeedNodes:加載種子節點;從保留已知節點的數據庫中隨機的抽取30個節點,再加上引導節點列表中的節點,放置入k桶中,若是K桶沒有空間,則假如到替換列表中。

2.測試鄰居節點連通性

首先知道UDP協議是沒有鏈接的概念的,因此須要不斷的ping 來測試對端節點是否正常,在新建路由表以後,就來到下面的循環,不斷的去作上面的事。

go tab.loop()

定時運行doRefreshdoRevalidatecopyLiveNodes進行刷新K桶。

以太坊的k桶設置:

const (
	alpha           = 3  // Kademlia併發參數, 是系統內一個優化參數,控制每次從K桶最多取出節點個數,ethereum取值3
  
	bucketSize      = 16 // K桶大小(可容納節點數)
  
	maxReplacements = 10 // 每桶更換列表的大小
	hashBits          = len(common.Hash{}) * 8 //每一個節點ID長度,32*8=256, 32位16進制
	nBuckets          = hashBits / 15       //  K桶個數
  )

首先搞清楚這三個定時器運行的時間:

refreshInterval    = 30 * time.Minute
revalidateInterval = 10 * time.Second
copyNodesInterval  = 30 * time.Second
doRefresh

doRefresh對隨機目標執行查找以保持K桶已滿。若是表爲空(初始引導程序或丟棄的有故障),則插入種子節點。

主要如下幾步:

  1. 從數據庫加載隨機節點和引導節點。這應該會產生一些之前見過的節點

    tab.loadSeedNodes()
  2. 將本地節點ID做爲目標節點進行查找最近的鄰居節點

    tab.net.lookupSelf()
    func (t *UDPv4) lookupSelf() []*enode.Node {
    	return t.newLookup(t.closeCtx, encodePubkey(&t.priv.PublicKey)).run()
    }
    func (t *UDPv4) newLookup(ctx context.Context, targetKey encPubkey) *lookup {
    	...
    		return t.findnode(n.ID(), n.addr(), targetKey)
    	})
    	return it
    }

    向這些節點發起findnode操做查詢離target節點最近的節點列表,將查詢獲得的節點進行ping-pong測試,將測試經過的節點落庫保存

    通過這個流程後,節點的K桶就可以比較均勻地將不一樣網絡節點更新到本地K桶中。

    unc (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target encPubkey) ([]*node, error) {
    	t.ensureBond(toid, toaddr)
    	nodes := make([]*node, 0, bucketSize)
    	nreceived := 0
      // 設置迴應回調函數,等待類型爲neighborsPacket的鄰近節點包,若是類型對,就執行回調請求
    	rm := t.pending(toid, toaddr.IP, p_neighborsV4, func(r interface{}) (matched bool, requestDone bool) {
    		reply := r.(*neighborsV4)
    		for _, rn := range reply.Nodes {
    			nreceived++
          // 獲得一個簡單的node結構
    			n, err := t.nodeFromRPC(toaddr, rn)
    			if err != nil {
    				t.log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err)
    				continue
    			}
    			nodes = append(nodes, n)
    		}
    		return true, nreceived >= bucketSize
    	})
      //上面了一個管道事件,下面開始發送真正的findnode報文,而後進行等待了
    	t.send(toaddr, toid, &findnodeV4{
    		Target:     target,
    		Expiration: uint64(time.Now().Add(expiration).Unix()),
    	})
    	return nodes, <-rm.errc
    }
  3. 查找3個隨機的目標節點

    for i := 0; i < 3; i++ {
    		tab.net.lookupRandom()
    	}
doRevalidate

doRevalidate檢查隨機存儲桶中的最後一個節點是否仍然存在,若是不是,則替換或刪除該節點。

主要如下幾步:

  1. 返回隨機的非空K桶中的最後一個節點

    last, bi := tab.nodeToRevalidate()
  2. 對最後的節點執行Ping操做,而後等待Pong

    remoteSeq, err := tab.net.ping(unwrapNode(last))
  3. 若是節點ping通了的話,將節點移動到最前面

    tab.bumpInBucket(b, last)
  4. 沒有收到回覆,選擇一個替換節點,或者若是沒有任何替換節點,則刪除該節點

    tab.replace(b, last)
copyLiveNodes

copyLiveNodes將表中的節點添加到數據庫,若是節點在表中的時間超過了5分鐘。

這部分代碼比較簡單,就伸展闡述。

if n.livenessChecks > 0 && now.Sub(n.addedAt) >= seedMinTableTime {
				tab.db.UpdateNode(unwrapNode(n))
			}

3.檢測各種信息

go t.loop()

loop循環主要監聽如下幾類消息:

  • case <-t.closeCtx.Done():檢測是否中止
  • p := <-t.addReplyMatcher:檢測是否有添加新的待處理消息
  • r := <-t.gotreply:檢測是否接收到其餘節點的回覆消息

4. 處理UDP數據包

go t.readLoop(cfg.Unhandled)

主要有如下兩件事:

  1. 循環接收其餘節點發來的udp消息

    nbytes, from, err := t.conn.ReadFromUDP(buf)
  2. 處理接收到的UDP消息

    t.handlePacket(from, buf[:nbytes])

接下來對這兩個函數進行進一步的解析。

接收UDP消息

接收UDP消息比較的簡單,就是不斷的從鏈接中讀取Packet數據,它有如下幾種消息:

  • ping:用於判斷遠程節點是否在線。

  • pong:用於回覆ping消息的響應。

  • findnode:查找與給定的目標節點相近的節點。

  • neighbors:用於回覆findnode的響應,與給定的目標節點相近的節點列表


處理UDP消息

主要作了如下幾件事:

  1. 數據包解碼

    packet, fromKey, hash, err := decodeV4(buf)
  2. 檢查數據包是否有效,是否能夠處理

    packet.preverify(t, from, fromID, fromKey)

    在校驗這一塊,涉及不一樣的消息類型不一樣的校驗,咱們來分別對各類消息進行分析。

    ①:ping

    • 校驗消息是否過時
    • 校驗公鑰是否有效

    ②:pong

    • 校驗消息是否過時
    • 校驗回覆是否正確

    ③:findNodes

    • 校驗消息是否過時
    • 校驗節點是不是最近的節點

    ④:neighbors

    • 校驗消息是否過時
    • 用於回覆findnode的響應,校驗回覆是否正確
  3. 處理packet數據

    packet.handle(t, from, fromID, hash)

    相同的,也會有4種消息,可是咱們這邊重點講處理findNodes的消息:

     

func (req *findnodeV4) handle(t *UDPv4, from *net.UDPAddr, fromID enode.ID, mac []byte) {
...
}

咱們這裏就稍微介紹下如何處理`findnode`的消息:

```go
func (req *findnodeV4) handle(t *UDPv4, from *net.UDPAddr, fromID enode.ID, mac []byte) {
	// 肯定最近的節點
	target := enode.ID(crypto.Keccak256Hash(req.Target[:]))
	t.tab.mutex.Lock()
	//最接近的返回表中最接近給定id的n個節點
	closest := t.tab.closest(target, bucketSize, true).entries
	t.tab.mutex.Unlock()
	// 以每一個數據包最多maxNeighbors的塊的形式發送鄰居,以保持在數據包大小限制如下。
	p := neighborsV4{Expiration: uint64(time.Now().Add(expiration).Unix())}
	var sent bool
	for _, n := range closest { //掃描這些最近的節點列表,而後一個包一個包的發送給對方
		if netutil.CheckRelayIP(from.IP, n.IP()) == nil {
			p.Nodes = append(p.Nodes, nodeToRPC(n))
		}
		if len(p.Nodes) == maxNeighbors {
			t.send(from, fromID, &p)//給對方發送 neighborsPacket 包,裏面包含節點列表
			p.Nodes = p.Nodes[:0]
			sent = true
		}
	}
	if len(p.Nodes) > 0 || !sent {
		t.send(from, fromID, &p)
	}
}

首先先肯定最近的節點,再一個包一個包的發給對方,並校驗節點的IP,最後把有效的節點發送給請求方。


涉及的結構體:

UDP

  • conn :接口,包括了從UDP中讀取和寫入,關閉UDP鏈接以及獲取本地地址。
  • netrestrict:IP網絡列表
  • localNode:本地節點
  • tab:路由表

Table

  • buckets:全部節點都加到這個裏面,按照距離

  • nursery:啓動節點

  • rand:隨機來源

  • ips:跟蹤IP,確保IP中最多N個屬於同一網絡範圍

  • net: UDP 傳輸的接口

    • 返回本地節點
    • 將enrRequest發送到給定的節點並等待響應
    • findnode向給定節點發送一個findnode請求,並等待該節點最多發送了k個鄰居
    • 返回查找最近的節點
    • 將ping消息發送到給定的節點,而後等待答覆

如下是table的結構圖:

image-20201112104254003


思惟導圖

思惟導圖獲取地址

image-20201123211034861

參考文檔

http://mindcarver.cn/ ⭐️⭐️⭐️⭐️

https://github.com/blockchainGuide/ ⭐️⭐️⭐️⭐️

http://www.javashuo.com/article/p-tsmvgwqk-nv.html

http://qjpcpu.github.io/blog/2018/01/29/shen-ru-ethereumyuan-ma-p2pmo-kuai-ji-chu-jie-gou/

https://www.jianshu.com/p/b232c870dcd2

https://bbs.huaweicloud.com/blogs/113684

https://www.jianshu.com/p/94d02a41a146

相關文章
相關標籤/搜索