MPT(Merkle Patricia Tries)是以太坊中存儲區塊數據的核心數據結構,它Merkle Tree和Patricia Tree融合一個樹形結構,理解MPT結構對以後學習以太坊區塊header以及智能合約狀態存儲結構的模塊源碼頗有幫助。node
它的葉子是數據塊的hash,從圖中能夠看出非葉子節點是其子節點串聯字符串的hash,底層數據的任何變更都會影響父節點,這棵樹的Merkle Root表明對底層全部數據的「摘要」。
這樣的樹有一個很大的好處,好比咱們把交易信息寫入這樣的樹形結構,當須要證實一個交易是否存在這顆樹中的時候,就不須要從新計算全部交易的hash值。好比證實圖中Hash 1-1,咱們能夠藉助Hash 1-0從新計算出Hash 1,而後再借助Hash 0從新計算出Top Hash,這樣就能夠根據算出來的Top Hash和原來的Top Hash是否同樣,若是同樣的話那麼Hash 1-1就屬於這棵樹。
因此想象一下,咱們將這個Top Hash儲存在區塊頭中,那麼有了區塊頭就能夠對區塊信息進行驗證了。同時 Hash 計算的過程能夠十分快速,預處理能夠在短期內完成。利用Merkle樹結構能帶來巨大的比較性能提高。redis
從它的名字壓縮前綴樹再結合上圖就能夠猜出來Patricia樹的特色了,這種樹形結構比將每個字符做爲一個節點的普通trie樹形結構,它的鍵值可使用多個字符,下降了樹的高度,也節省了空間,再看個例子:
圖中能夠很容易看出數中所存儲的鍵值對:算法
在以太坊中MPT的節點的規格主要有一下幾個:數據庫
這裏還有一些知識點須要瞭解的,爲了將MPT樹存儲到數據庫中,同時還能夠把MPT樹從數據庫中恢復出來,對於Extension和Leaf的節點類型作了特殊的定義:若是是一個擴展節點,那麼前綴爲0,這個0加在key前面。若是是一個葉子節點,那麼前綴就是1。同時對key的長度就奇偶類型也作了設定,若是是奇數長度則標示1,若是是偶數長度則標示0。數組
以太坊中主要有一下幾個地方用了MPT樹形結構:緩存
State Trie 區塊頭中的狀態樹數據結構
Transactions Trie 區塊頭中的交易樹app
Receipts Trie 區塊頭中的收據樹函數
Storage Trie 存儲樹源碼分析
這兩個區塊頭中,state root,tx root receipt root分別存儲了這三棵樹的樹根,第二個區塊顯示了當帳號175的數據變動(27 -> 45)的時候,只須要存儲跟這個帳號相關的部分數據,並且老的區塊中的數據仍是能夠正常訪問。
MPT樹種還有一個重要的概念一個特殊的十六進制前綴(hex-prefix, HP)編碼來對key編碼,咱們先來了解一下編碼定義規則,源碼實現後面再分析:
HEX 十六進制編碼
好比key=>"bob",b的ASCII十六進制編碼爲0x62,o的ASCII十六進制編碼爲0x6f,分解成高四位和第四位,16表示終結 0x10,最終編碼結果爲[6 2 6 15 6 2 16],
HEX-Prefix 十六進制前綴編碼
十六進制前綴編碼至關於一個逆向的過程,好比輸入的是[6 2 6 15 6 2 16],根據第一個規則去掉終止符16。根據第二個規則key前補一個四元組,從右往左第一位爲1表示葉子節點,從右往左第0位若是後面key的長度爲偶數設置爲0,奇數長度設置爲1,那麼四元組0010就是2。根據第三個規則,添加一個全0的補在後面,那麼就是20.根據第三個規則內容壓縮合並,那麼結果就是[0x20 0x62 0x6f 0x62]
官方有一個詳細的結構的示例:
下面再用一個圖像化的示例來加深一下對上面的MPT規則的理解
key的16進制 | key | value |
---|---|---|
<64 6f> | do | verb |
<64 6f 67> | dog | puppy |
<64 6f 67 65> | doge | coin |
<68 6f 72 73 65> | horse | stallion |
Compact
就是上面說的HEX-Prefix
,keybytes爲按完整字節(8bit)存儲的正常信息,hex爲按照半字節nibble(4bit)儲存信息的格式。
go-ethereum/trie/encoding:
package trie func hexToCompact(hex []byte) []byte { terminator := byte(0) if hasTerm(hex) { //檢查是否有結尾爲0x10 => 16 terminator = 1 //有結束標記16說明是葉子節點 hex = hex[:len(hex)-1] //去除尾部標記 } buf := make([]byte, len(hex)/2+1) // 字節數組 buf[0] = terminator << 5 // 標誌byte爲00000000或者00100000 //若是長度爲奇數,添加奇數位標誌1,並把第一個nibble字節放入buf[0]的低四位 if len(hex)&1 == 1 { buf[0] |= 1 << 4 // 奇數標誌 00110000 buf[0] |= hex[0] // 第一個nibble包含在第一個字節中 0011xxxx hex = hex[1:] } //將兩個nibble字節合併成一個字節 decodeNibbles(hex, buf[1:]) return buf } //compact編碼轉化爲Hex編碼 func compactToHex(compact []byte) []byte { base := keybytesToHex(compact) base = base[:len(base)-1] // apply terminator flag // base[0]包括四種狀況 // 00000000 擴展節點偶數位 // 00000001 擴展節點奇數位 // 00000010 葉子節點偶數位 // 00000011 葉子節點奇數位 // apply terminator flag if base[0] >= 2 { //若是是葉子節點,末尾添加Hex標誌位16 base = append(base, 16) } // apply odd flag //若是是偶數位,chop等於2,不然等於1 chop := 2 - base[0]&1 return base[chop:] } // 將keybytes 轉成十六進制 func keybytesToHex(str []byte) []byte { l := len(str)*2 + 1 //將一個keybyte轉化成兩個字節 var nibbles = make([]byte, l) for i, b := range str { nibbles[i*2] = b / 16 nibbles[i*2+1] = b % 16 } //末尾加入Hex標誌位16 nibbles[l-1] = 16 return nibbles } // 將十六進制的bibbles轉成key bytes,這隻能用於偶數長度的key func hexToKeybytes(hex []byte) []byte { if hasTerm(hex) { hex = hex[:len(hex)-1] } if len(hex)&1 != 0 { panic("can't convert hex key of odd length") } key := make([]byte, (len(hex)+1)/2) decodeNibbles(hex, key) return key } func decodeNibbles(nibbles []byte, bytes []byte) { for bi, ni := 0, 0; ni < len(nibbles); bi, ni = bi+1, ni+2 { bytes[bi] = nibbles[ni]<<4 | nibbles[ni+1] } } // 返回a和b的公共前綴的長度 func prefixLen(a, b []byte) int { var i, length = 0, len(a) if len(b) < length { length = len(b) } for ; i < length; i++ { if a[i] != b[i] { break } } return i } // 十六進制key是否有結束標誌符 func hasTerm(s []byte) bool { return len(s) > 0 && s[len(s)-1] == 16 }
上面已經分析了以太坊的key的編碼方式,接下來咱們來看以太坊中MPT樹的數據結構,在分析trie的數據結構前,咱們先來了解一下node的定義:
trie/node.go
type node interface { fstring(string) string cache() (hashNode, bool) canUnload(cachegen, cachelimit uint16) bool } type ( fullNode struct { //分支節點 Children [17]node // Actual trie node data to encode/decode (needs custom encoder) flags nodeFlag } shortNode struct { Key []byte Val node flags nodeFlag } hashNode []byte valueNode []byte )
上面代碼中定義了四個struct,就是node的四種類型:
shortNode,key是一個任意長度的字符串(字節數組[]byte),體現了PatriciaTrie的特色,經過合併只有一個子節點的父節點和其子節點來縮短trie的深度,結果就是有些節點會有長度更長的key。
Val
指向分支節點或者葉子節點Val
爲rlp編碼數據,key爲該數據的hash來看下trie的結構以及對trie的操做可能對上面各個node的類型使用可能會更清晰一點,咱們來接着看下trie的結構定義trie/trie.go:
type Trie struct { db *Database // 用levelDB作KV存儲 root node //當前根節點 originalRoot common.Hash //啓動加載時候的hash,能夠從db中恢復出整個trie cachegen, cachelimit uint16 // cachegen 緩存生成值,每次Commit會+1 }
這裏的cachegen緩存生成值會被附加在node節點上面,若是當前的cachegen-cachelimit參數大於node的緩存生成,那麼node會從cache裏面卸載,以便節約內存。一個緩存多久沒被時候用就會被從緩存中移除,看起來和redis等一些LRU算法的cache db很像。
Trie的初始化:
func New(root common.Hash, db *Database) (*Trie, error) { if db == nil { panic("trie.New called without a database") } trie := &Trie{ db: db, originalRoot: root, } if (root != common.Hash{}) && root != emptyRoot { // 若是hash不是空值,從數據庫中加載一個已經存在的樹 rootnode, err := trie.resolveHash(root[:], nil) if err != nil { return nil, err } trie.root = rootnode //根節點爲找到的trie } //不然返回新建一個樹 return trie, nil }
這裏的trie.resolveHash
就是加載整課樹的方法,還有傳入的root common.Hash
hash是一個將hex編碼轉爲原始hash的32位byte[] (common.HexToHash()),來看下如何經過這個hash來找到整個trie的:
func (t *Trie) resolveHash(n hashNode, prefix []byte) (node, error) { cacheMissCounter.Inc(1) //沒執行一次計數器+1 //上面說過了,n是一個32位byte[] hash := common.BytesToHash(n) //經過hash從db中取出node的RLP編碼內容 enc, err := t.db.Node(hash) if err != nil || enc == nil { return nil, &MissingNodeError{NodeHash: hash, Path: prefix} } return mustDecodeNode(n, enc, t.cachegen), nil }
mustDecodeNode
中調用了decodeNode
,這個方法經過RLP的list長度來判斷該編碼內容屬於上面節點,若是是兩個字段則爲shortNode,若是是17個字段則爲fullNode,而後再調用各自的decode解析函數
func decodeNode(hash, buf []byte, cachegen uint16) (node, error) { if len(buf) == 0 { return nil, io.ErrUnexpectedEOF } elems, _, err := rlp.SplitList(buf) //將buf拆分爲列表的內容以及列表後的任何剩餘字節。 if err != nil { return nil, fmt.Errorf("decode error: %v", err) } switch c, _ := rlp.CountValues(elems); c { case 2: n, err := decodeShort(hash, elems, cachegen) //decode shortNode return n, wrapError(err, "short") case 17: n, err := decodeFull(hash, elems, cachegen) //decode fullNode return n, wrapError(err, "full") default: return nil, fmt.Errorf("invalid number of list elements: %v", c) } }
decodeShort
函數中經過key是否含有結束標識符來判斷是葉子節點仍是擴展節點,這個咱們在上面的編碼部分已經講過,有結束標示符則是葉子節點,再經過rlp.SplitString
解析出val生成一個葉子節點shortNode返回。沒有結束標誌符則爲擴展節點,經過decodeRef
解析並生成一個shortNode返回。
func decodeShort(hash, elems []byte, cachegen uint16) (node, error) { kbuf, rest, err := rlp.SplitString(elems) //將elems填入RLP字符串的內容以及字符串後的任何剩餘字節。 if err != nil { return nil, err } flag := nodeFlag{hash: hash, gen: cachegen} key := compactToHex(kbuf) if hasTerm(key) { // value node val, _, err := rlp.SplitString(rest) if err != nil { return nil, fmt.Errorf("invalid value node: %v", err) } return &shortNode{key, append(valueNode{}, val...), flag}, nil } r, _, err := decodeRef(rest, cachegen) if err != nil { return nil, wrapError(err, "val") } return &shortNode{key, r, flag}, nil }
繼續看下decodeRef
主要作了啥操做:
func decodeRef(buf []byte, cachegen uint16) (node, []byte, error) { kind, val, rest, err := rlp.Split(buf) if err != nil { return nil, buf, err } switch { case kind == rlp.List: // 'embedded' node reference. The encoding must be smaller // than a hash in order to be valid. if size := len(buf) - len(rest); size > hashLen { err := fmt.Errorf("oversized embedded node (size is %d bytes, want size < %d)", size, hashLen) return nil, buf, err } n, err := decodeNode(nil, buf, cachegen) return n, rest, err case kind == rlp.String && len(val) == 0: // empty node return nil, rest, nil case kind == rlp.String && len(val) == 32: return append(hashNode{}, val...), rest, nil default: return nil, nil, fmt.Errorf("invalid RLP string size %d (want 0 or 32)", len(val)) } }
這段代碼比較清晰,經過rlp.Split
後返回的類型作不一樣的處理,若是是list,調用decodeNode
解析,若是是空節點返回空,若是是一個32位hash值返回hashNode,decodeFull
:
func decodeFull(hash, elems []byte, cachegen uint16) (*fullNode, error) { n := &fullNode{flags: nodeFlag{hash: hash, gen: cachegen}} for i := 0; i < 16; i++ { cld, rest, err := decodeRef(elems, cachegen) if err != nil { return n, wrapError(err, fmt.Sprintf("[%d]", i)) } n.Children[i], elems = cld, rest } val, _, err := rlp.SplitString(elems) if err != nil { return n, err } if len(val) > 0 { n.Children[16] = append(valueNode{}, val...) } return n, nil }
再回到Trie結構體中的cachegen, cachelimit,Trie樹每次Commit時cachegen都會+1,這兩個參數是cache的控制參數,爲了弄清楚Trie的緩存機制,咱們來看下Commit具體是幹嗎的:
func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) { if t.db == nil { panic("commit called on trie with nil database") } hash, cached, err := t.hashRoot(t.db, onleaf) if err != nil { return common.Hash{}, err } t.root = cached t.cachegen++ return common.BytesToHash(hash.(hashNode)), nil //返回所指向的node的未編碼的hash } //返回trie.root所指向的node的hash以及每一個節點都帶有各自hash的trie樹的root。 func (t *Trie) hashRoot(db *Database, onleaf LeafCallback) (node, node, error) { if t.root == nil { return hashNode(emptyRoot.Bytes()), nil, nil } h := newHasher(t.cachegen, t.cachelimit, onleaf) defer returnHasherToPool(h) return h.hash(t.root, db, true)//爲每一個節點生成一個未編碼的hash }
Commit目的,是將trie樹中的key轉爲Compact編碼,爲每一個節點生成一個hash,它就是爲了確保後續能正常將變更的數據提交到db.
那麼這個cachegen是怎麼放到該節點中的,當trie樹在節點插入的時候,會把當前trie的cachegen放入到該節點中,看下trie的insert方法:
//n -> trie當前插入節點 //prefix -> 當前匹配到的key的公共前綴 //key -> 待插入數據當前key中剩餘未匹配的部分,完整的key=prefix+key //value -> 待插入數據自己 //返回 -> 是否改變樹,插入完成後子樹根節點,error func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) { if len(key) == 0 { if v, ok := n.(valueNode); ok { return !bytes.Equal(v, value.(valueNode)), value, nil } //若是key長度爲0,那麼說明當前節點中新增長的節點和當前節點數據同樣,認爲已經新增過了就直接返回 return true, value, nil } switch n := n.(type) { case *shortNode: matchlen := prefixLen(key, n.Key) // 返回公共前綴長度 if matchlen == len(n.Key) { //若是整個key匹配,請按原樣保留此節點,並僅更新該值。 dirty, nn, err := t.insert(n.Val, append(prefix, key[:matchlen]...), key[matchlen:], value) if !dirty || err != nil { return false, n, err } return true, &shortNode{n.Key, nn, t.newFlag()}, nil } //不然在它們不一樣的索引處分支出來 branch := &fullNode{flags: t.newFlag()} var err error _, branch.Children[n.Key[matchlen]], err = t.insert(nil, append(prefix, n.Key[:matchlen+1]...), n.Key[matchlen+1:], n.Val) if err != nil { return false, nil, err } _, branch.Children[key[matchlen]], err = t.insert(nil, append(prefix, key[:matchlen+1]...), key[matchlen+1:], value) if err != nil { return false, nil, err } //若是它在索引0處出現則用該branch替換shortNode if matchlen == 0 { return true, branch, nil } // Otherwise, replace it with a short node leading up to the branch. return true, &shortNode{key[:matchlen], branch, t.newFlag()}, nil case *fullNode: dirty, nn, err := t.insert(n.Children[key[0]], append(prefix, key[0]), key[1:], value) if !dirty || err != nil { return false, n, err } n = n.copy() n.flags = t.newFlag() n.Children[key[0]] = nn return true, n, nil case nil: //在空trie中添加一個節點,就是葉子節點,返回shortNode。 return true, &shortNode{key, value, t.newFlag()}, nil case hashNode: rn, err := t.resolveHash(n, prefix)//恢復一個存儲在db中的node if err != nil { return false, nil, err } dirty, nn, err := t.insert(rn, prefix, key, value) //遞歸調用 if !dirty || err != nil { return false, rn, err } return true, nn, nil default: panic(fmt.Sprintf("%T: invalid node: %v", n, n)) }
Trie樹的插入,這是一個遞歸調用的方法,從根節點開始,一直往下找,直到找到能夠插入的點,進行插入操做。
若是當前的根節點葉子節點shortNode,首先計算公共前綴
接下來看如何遍歷Trie樹從Trie中獲取數據,根據key獲取的value過程:
func (t *Trie) TryGet(key []byte) ([]byte, error) { key = keybytesToHex(key) value, newroot, didResolve, err := t.tryGet(t.root, key, 0) if err == nil && didResolve { t.root = newroot } return value, err } func (t *Trie) tryGet(origNode node, key []byte, pos int) (value []byte, newnode node, didResolve bool, err error) { switch n := (origNode).(type) { case nil: // 空樹 return nil, nil, false, nil case valueNode: // 就是要查找的葉子節點數據 return n, n, false, nil case *shortNode: if len(key)-pos < len(n.Key) || !bytes.Equal(n.Key, key[pos:pos+len(n.Key)]) { // key在trie中不存在 return nil, n, false, nil } value, newnode, didResolve, err = t.tryGet(n.Val, key, pos+len(n.Key)) if err == nil && didResolve { n = n.copy() n.Val = newnode n.flags.gen = t.cachegen } return value, n, didResolve, err case *fullNode: value, newnode, didResolve, err = t.tryGet(n.Children[key[pos]], key, pos+1) if err == nil && didResolve { n = n.copy() n.flags.gen = t.cachegen n.Children[key[pos]] = newnode } return value, n, didResolve, err case hashNode: // hashNodes時候須要去db中獲取 child, err := t.resolveHash(n, key[:pos]) if err != nil { return nil, n, true, err } value, newnode, _, err := t.tryGet(child, key, pos) return value, newnode, true, err default: panic(fmt.Sprintf("%T: invalid node: %v", origNode, origNode)) } }
tryGet(origNode node, key []byte, pos int)
方法提供三個參數,起始的node,hash key,還有當前hash匹配的位置,didResolve
用來判斷trie樹是否發生變化,根據hashNode去db中獲取該node值,獲取到後,須要更新現有的trie,didResolve就會發生變化。
關於Trie的Update
和Delete
就不分析了,在trie包中還有其餘的功能,咱們來大略看下主要是幹嗎的不作詳細解讀了:
轉載請註明: 轉載自Ryan是菜鳥 | LNMP技術棧筆記
若是以爲本篇文章對您十分有益,何不 打賞一下
本文連接地址: 以太坊源碼分析--MPT 樹