包trie 實現了Merkle Patricia Tries,這裏用簡稱MPT來稱呼這種數據結構,這種數據結構其實是一種Trie樹變種,MPT是以太坊中一種很是重要的數據結構,用來存儲用戶帳戶的狀態以及狀態的變動,用來存儲交易信息,用來存儲交易的收據信息。MPT其實是三種數據結構的組合,分別是Trie樹, Patricia Trie, 和Merkle樹。下面分別介紹這三種數據結構。
## Trie樹 (引用介紹 http://dongxicheng.org/structure/trietree/)
Trie樹,又稱字典樹,單詞查找樹或者前綴樹,是一種用於快速檢索的多叉樹結構,如英文字母的字典樹是一個26叉樹,數字的字典樹是一個10叉樹。
Trie樹能夠利用字符串的公共前綴來節約存儲空間。以下圖所示,該trie樹用10個節點保存了6個字符串tea,ten,to,in,inn,int:
在該trie樹中,字符串in,inn和int的公共前綴是「in」,所以能夠只存儲一份「in」以節省空間。固然,若是系統中存在大量字符串且這些字符串基本沒有公共前綴,則相應的trie樹將很是消耗內存,這也是trie樹的一個缺點。
Trie樹的基本性質能夠概括爲:
- 根節點不包含字符,除根節點意外每一個節點只包含一個字符。
- 從根節點到某一個節點,路徑上通過的字符鏈接起來,爲該節點對應的字符串。
- 每一個節點的全部子節點包含的字符串不相同。
## Patricia Tries (前綴樹)
前綴樹根Trie樹的不一樣之處在於Trie樹給每個字符串分配一個節點,這樣若是不少很長的,又沒有公共節點的字符串就會致使Trie樹退化成一個數組。在以太坊裏面會由***構造不少這種節點形成拒絕服務***。前綴樹的不一樣之處在於若是節點公共前綴,那麼就使用公共前綴,不然就把剩下的全部節點插入同一個節點。Patricia相對Tire的優化正以下圖:
上圖存儲的8個Key Value對,能夠看到前綴樹的特色。
|Key | value |
| ------------- | ---: |
|6c0a5c71ec20bq3w|5 |
|6c0a5c71ec20CX7j|27 |
|6c0a5c71781a1FXq|18 |
|6c0a5c71781a9Dog|64 |
|6c0a8f743b95zUfe|30 |
|6c0a8f743b95jx5R|2 |
|6c0a8f740d16y03G|43 |
|6c0a8f740d16vcc1|48 |
Merkle Tree,一般也被稱做Hash Tree,顧名思義,就是存儲hash值的一棵樹。Merkle樹的葉子是數據塊(例如,文件或者文件的集合)的hash值。非葉節點是其對應子節點串聯字符串的hash。
![ image]( picture/trie_3.png)
Merkle Tree的主要做用是當我拿到Top Hash的時候,這個hash值表明了整顆樹的信息摘要,當樹裏面任何一個數據發生了變更,都會致使Top Hash的值發生變化。 而Top Hash的值是會存儲到區塊鏈的區塊頭裏面去的, 區塊頭是必須通過工做量證實。 這也就是說我只要拿到一個區塊頭,就能夠對區塊信息進行驗證。 更加詳細的信息請參考那個博客。有詳細的介紹。
## 以太坊的MPT
每個以太坊的區塊頭包含三顆MPT樹,分別是
- 交易樹
- 收據樹(交易執行過程當中的一些數據)
- 狀態樹(帳號信息, 合約帳戶和用戶帳戶)
下圖中是兩個區塊頭,其中state root,tx root receipt root分別存儲了這三棵樹的樹根,第二個區塊顯示了當帳號 175的數據變動(27 -> 45)的時候,只須要存儲跟這個帳號相關的部分數據,並且老的區塊中的數據仍是能夠正常訪問。(這個有點相似與函數式編程語言中的不可變的數據結構的實現)
![ image]( picture/trie_4.png)
詳細結構爲
![ world state trie]( picture/worldstatetrie.png)
## 黃皮書形式化定義(Appendix D. Modified Merkle Patricia Tree)
正式地,咱們假設輸入值J,包含Key Value對的集合(Key Value都是字節數組):
當處理這樣一個集合的時候,咱們使用下面的這樣標識表示數據的 Key和Value(對於J集合中的任意一個I, I0表示Key, I1表示Value)
對於任何特定的字節,咱們能夠表示爲對應的半字節(nibble),其中Y集合在Hex-Prefix Encoding中有說明,意爲半字節(4bit)集合(之因此採用半字節,其與後續說明的分支節點branch node結構以及key中編碼flag有關)
咱們定義了TRIE函數,用來表示樹根的HASH值(其中c函數的第二個參數,意爲構建完成後樹的層數。root的值爲0)
咱們還定義一個函數n,這個trie的節點函數。 當組成節點時,咱們使用RLP對結構進行編碼。 做爲下降存儲複雜度的手段,對於RLP少於32字節的節點,咱們直接存儲其RLP值, 對於那些較大的,咱們存儲其HASH節點。
咱們用c來定義節點組成函數:
以相似於基數樹的方式,當Trie樹從根遍歷到葉時,能夠構建單個鍵值對。 Key經過遍歷累積,從每一個分支節點獲取單個半字節(與基數樹同樣)。 與基數樹不一樣,在共享相同前綴的多個Key的狀況下,或者在具備惟一後綴的單個Key的狀況下,提供兩個優化節點。的狀況下,或者在具備惟一後綴的單個密鑰的狀況下,提供兩個優化節點。 所以,當遍歷時,可能從其餘兩個節點類型,擴展和葉中的每個潛在地獲取多個半字節。在Trie樹中有三種節點:
-**葉子節點(Leaf):** 葉子節點包含兩個字段, 第一個字段是剩下的Key的半字節編碼,並且半字節編碼方法的第二個參數爲true, 第二個字段是Value
-**擴展節點(Extention):** 擴展節點也包含兩個字段, 第一個字段是剩下的Key的能夠至少被兩個剩下節點共享的部分的半字節編碼,第二個字段是n(J,j)
-**分支節點(Branch):** 分支節點包含了17個字段,其前16個項目對應於這些點在其遍歷中的鍵的十六個可能的半字節值中的每個。第17個字段是存儲那些在當前結點結束了的節點(例如, 有三個key,分別是 (abc ,abd, ab) 第17個字段儲存了ab節點的值)
分支節點只有在須要的時候使用, 對於一個只有一個非空 key value對的Trie樹,可能不存在分支節點。 若是使用公式來定義這三種節點, 那麼公式以下:
圖中的HP函數表明Hex-Prefix Encoding,是一種半字節編碼格式,RLP是使用RLP進行序列化的函數。
![ image]( picture/trie_10.png)
對於上圖的三種狀況的解釋
- 若是當前須要編碼的KV集合只剩下一條數據,那麼這條數據按照第一條規則進行編碼。
- 若是當前須要編碼的KV集合有公共前綴,那麼提取最大公共前綴並使用第二條規則進行處理。
- 若是不是上面兩種狀況,那麼使用分支節點進行集合切分,由於key是使用HP進行編碼的,因此可能的分支只有0-15這16個分支。能夠看到u的值由n進行遞歸定義,而若是有節點恰好在這裏完結了,那麼第17個元素v就是爲這種狀況準備的。
對於數據應該如何存儲和不該該如何存儲, 黃皮書中說明沒有顯示的定義。因此這是一個實現上的問題。咱們簡單的定義了一個函數來把J映射爲一個Hash。 咱們認爲對於任意一個J,只存在惟一一個Hash值。
### 黃皮書的形式化定義(Hex-Prefix Encoding)--十六進制前綴編碼
十六進制前綴編碼是將任意數量的半字節編碼爲字節數組的有效方法。它可以存儲附加標誌,當在Trie樹中使用時(惟一會使用的地方),會在節點類型之間消除歧義。
它被定義爲從一系列半字節(由集合Y表示)與布爾值一塊兒映射到字節序列(由集合B表示)的函數HP:
所以,第一個字節的高半字節包含兩個標誌; 最低bit位編碼了長度的奇偶位,第二低的bit位編碼了flag的值。 在偶數個半字節的狀況下,第一個字節的低半字節爲零,在奇數的狀況下爲第一個半字節。 全部剩餘的半字節(如今是偶數)適合其他的字節。
## 源碼實現
### trie/encoding.go
encoding.go主要處理trie樹中的三種編碼格式的相互轉換的工做。 三種編碼格式分別爲下面的三種編碼格式。
-**KEYBYTES encoding**這種編碼格式就是原生的key字節數組,大部分的Trie的API都是使用這邊編碼格式
-**HEX encoding** 這種編碼格式每個字節包含了Key的一個半字節,尾部接上一個可選的'終結符','終結符'表明這個節點究竟是葉子節點仍是擴展節點。當節點被加載到內存裏面的時候使用的是這種節點,由於它的方便訪問。
-**COMPACT encoding** 這種編碼格式就是上面黃皮書裏面說到的Hex-Prefix Encoding,這種編碼格式能夠當作是*HEX encoding**這種編碼格式的另一種版本,能夠在存儲到數據庫的時候節約磁盤空間。
簡單的理解爲:將普通的字節序列keybytes編碼爲帶有t標誌與奇數個半字節nibble標誌位的keybytes
- keybytes爲按完整字節(8bit)存儲的正常信息
- hex爲按照半字節nibble(4bit)儲存信息的格式。供compact使用
- 爲了便於做黃皮書中Modified Merkle Patricia Tree的節點的key,編碼爲偶數字節長度的hex格式。其第一個半字節nibble會在低的2個bit位中,由高到低分別存放t標誌與奇數標誌。經compact編碼的keybytes,在增長了hex的t標誌與半字節nibble爲偶數個(即完整的字節)的狀況下,便於存儲
代碼實現,主要是實現了這三種編碼的相互轉換,以及一個求取公共前綴的方法。
func hexToCompact(hex []byte) []byte {
terminator := byte(0)
if hasTerm(hex) {
terminator = 1
hex = hex[:len(hex)-1]
}
buf := make([]byte, len(hex)/2+1)
buf[0] = terminator << 5 // the flag byte
if len(hex)&1 == 1 {
buf[0] |= 1 << 4 // odd flag
buf[0] |= hex[0] // first nibble is contained in the first byte
hex = hex[1:]
}
decodeNibbles(hex, buf[1:])
return buf
}
func compactToHex(compact []byte) []byte {
base := keybytesToHex(compact)
base = base[:len(base)-1]
// apply terminator flag
if base[0] >= 2 { // TODO 先將keybytesToHex輸出的末尾結束標誌刪除後,再經過判斷頭半個字節的標誌位t加回去。操做冗餘
base = append(base, 16)
}
// apply odd flag
chop := 2 - base[0]&1
return base[chop:]
}
func keybytesToHex(str []byte) []byte {
l := len(str)*2 + 1
var nibbles = make([]byte, l)
for i, b := range str {
nibbles[i*2] = b / 16
nibbles[i*2+1] = b % 16
}
nibbles[l-1] = 16
return nibbles
}
// hexToKeybytes turns hex nibbles into key bytes.
// This can only be used for keys of even length.
func hexToKeybytes(hex []byte) []byte {
if hasTerm(hex) {
hex = hex[:len(hex)-1]
}
if len(hex)&1 != 0 {
panic("can't convert hex key of odd length")
}
key := make([]byte, (len(hex)+1)/2) // TODO 對於一個已經判斷爲偶數的len(hex)在整除2的同時加1,爲無效的+1邏輯
decodeNibbles(hex, key)
return key
}
func decodeNibbles(nibbles []byte, bytes []byte) {
for bi, ni := 0, 0; ni < len(nibbles); bi, ni = bi+1, ni+2 {
bytes[bi] = nibbles[ni]<<4 | nibbles[ni+1]
}
}
// prefixLen returns the length of the common prefix of a and b.
func prefixLen(a, b []byte) int {
var i, length = 0, len(a)
if len(b) < length {
length = len(b)
}
for ; i < length; i++ {
if a[i] != b[i] {
break
}
}
return i
}
// hasTerm returns whether a hex key has the terminator flag.
func hasTerm(s []byte) bool {
return len(s) > 0 && s[len(s)-1] == 16
}
### 數據結構
node的結構,能夠看到node分爲4種類型, fullNode對應了黃皮書裏面的分支節點,shortNode對應了黃皮書裏面的擴展節點和葉子節點(經過shortNode.Val的類型來對應究竟是葉子節點(valueNode)仍是分支節點(fullNode))
type node interface {
fstring(string) string
cache() (hashNode, bool)
canUnload(cachegen, cachelimit uint16) bool
}
type (
fullNode struct {
Children [17]node // Actual trie node data to encode/decode (needs custom encoder)
flags nodeFlag
}
shortNode struct {
Key []byte
Val node
flags nodeFlag
}
hashNode []byte
valueNode []byte
)
trie的結構, root包含了當前的root節點, db是後端的KV存儲,trie的結構最終都是須要經過KV的形式存儲到數據庫裏面去,而後啓動的時候是須要從數據庫裏面加載的。 originalRoot 啓動加載的時候的hash值,經過這個hash值能夠在數據庫裏面恢復出整顆的trie樹。cachegen字段指示了當前Trie樹的cache時代,每次調用Commit操做的時候,會增長Trie樹的cache時代。 cache時代會被附加在node節點上面,若是當前的cache時代 - cachelimit參數 大於node的cache時代,那麼node會從cache裏面卸載,以便節約內存。 其實這就是緩存更新的LRU算法, 若是一個緩存在多久沒有被使用,那麼就從緩存裏面移除,以節約內存空間。
// Trie is a Merkle Patricia Trie.
// The zero value is an empty trie with no database.
// Use New to create a trie that sits on top of a database.
//
// Trie is not safe for concurrent use.
type Trie struct {
root node
db Database
originalRoot common.Hash
// Cache generation values.
// cachegen increases by one with each commit operation.
// new nodes are tagged with the current generation and unloaded
// when their generation is older than than cachegen-cachelimit.
cachegen, cachelimit uint16
}
###Trie樹的插入,查找和刪除
Trie樹的初始化調用New函數,函數接受一個hash值和一個Database參數,若是hash值不是空值的化,就說明是從數據庫加載一個已經存在的Trie樹, 就調用trei.resolveHash方法來加載整顆Trie樹,這個方法後續會介紹。 若是root是空,那麼就新建一顆Trie樹返回。
func New(root common.Hash, db Database) (*Trie, error) {
trie := &Trie{db: db, originalRoot: root}
if (root != common.Hash{}) && root != emptyRoot {
if db == nil {
panic("trie.New: cannot use existing root without a database")
}
rootnode, err := trie.resolveHash(root[:], nil)
if err != nil {
return nil, err
}
trie.root = rootnode
}
return trie, nil
}
Trie樹的插入,這是一個遞歸調用的方法,從根節點開始,一直往下找,直到找到能夠插入的點,進行插入操做。參數node是當前插入的節點, prefix是當前已經處理完的部分key, key是尚未處理玩的部分key, 完整的key = prefix + key。 value是須要插入的值。 返回值bool是操做是否改變了Trie樹(dirty),node是插入完成後的子樹的根節點, error是錯誤信息。
- 若是節點類型是nil(一顆全新的Trie樹的節點就是nil的),這個時候整顆樹是空的,直接返回shortNode{key, value, t.newFlag()}, 這個時候整顆樹的跟就含有了一個shortNode節點。
- 若是當前的根節點類型是shortNode(也就是葉子節點),首先計算公共前綴,若是公共前綴就等於key,那麼說明這兩個key是同樣的,若是value也同樣的(dirty == false),那麼返回錯誤。 若是沒有錯誤就更新shortNode的值而後返回。若是公共前綴不徹底匹配,那麼就須要把公共前綴提取出來造成一個獨立的節點(擴展節點),擴展節點後面鏈接一個branch節點,branch節點後面看狀況鏈接兩個short節點。首先構建一個branch節點(branch := &fullNode{flags: t.newFlag()}),而後再branch節點的Children位置調用t.insert插入剩下的兩個short節點。這裏有個小細節,key的編碼是HEX encoding,並且末尾帶了一個終結符。考慮咱們的根節點的key是abc0x16,咱們插入的節點的key是ab0x16。下面的branch.Children[key[matchlen]]才能夠正常運行,0x16恰好指向了branch節點的第17個孩子。若是匹配的長度是0,那麼直接返回這個branch節點,不然返回shortNode節點做爲前綴節點。
- 若是當前的節點是fullNode(也就是branch節點),那麼直接往對應的孩子節點調用insert方法,而後把對應的孩子節點只想新生成的節點。
- 若是當前節點是hashNode, hashNode的意思是當前節點尚未加載到內存裏面來,仍是存放在數據庫裏面,那麼首先調用 t.resolveHash(n, prefix)來加載到內存,而後對加載出來的節點調用insert方法來進行插入。
插入代碼
func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) {
if len(key) == 0 {
if v, ok := n.(valueNode); ok {
return !bytes.Equal(v, value.(valueNode)), value, nil
}
return true, value, nil
}
switch n := n.(type) {
case *shortNode:
matchlen := prefixLen(key, n.Key)
// If the whole key matches, keep this short node as is
// and only update the value.
if matchlen == len(n.Key) {
dirty, nn, err := t.insert(n.Val, append(prefix, key[:matchlen]...), key[matchlen:], value)
if !dirty || err != nil {
return false, n, err
}
return true, &shortNode{n.Key, nn, t.newFlag()}, nil
}
// Otherwise branch out at the index where they differ.
branch := &fullNode{flags: t.newFlag()}
var err error
_, branch.Children[n.Key[matchlen]], err = t.insert(nil, append(prefix, n.Key[:matchlen+1]...), n.Key[matchlen+1:], n.Val)
if err != nil {
return false, nil, err
}
_, branch.Children[key[matchlen]], err = t.insert(nil, append(prefix, key[:matchlen+1]...), key[matchlen+1:], value)
if err != nil {
return false, nil, err
}
// Replace this shortNode with the branch if it occurs at index 0.
if matchlen == 0 {
return true, branch, nil
}
// Otherwise, replace it with a short node leading up to the branch.
return true, &shortNode{key[:matchlen], branch, t.newFlag()}, nil
case *fullNode:
dirty, nn, err := t.insert(n.Children[key[0]], append(prefix, key[0]), key[1:], value)
if !dirty || err != nil {
return false, n, err
}
n = n.copy()
n.flags = t.newFlag()
n.Children[key[0]] = nn
return true, n, nil
case nil:
return true, &shortNode{key, value, t.newFlag()}, nil
case hashNode:
// We've hit a part of the trie that isn't loaded yet. Load
// the node and insert into it. This leaves all child nodes on
// the path to the value in the trie.
rn, err := t.resolveHash(n, prefix)
if err != nil {
return false, nil, err
}
dirty, nn, err := t.insert(rn, prefix, key, value)
if !dirty || err != nil {
return false, rn, err
}
return true, nn, nil
default:
panic(fmt.Sprintf("%T: invalid node: %v", n, n))
}
}
Trie樹的Get方法,基本上就是很簡單的遍歷Trie樹,來獲取Key的信息。
func (t *Trie) tryGet(origNode node, key []byte, pos int) (value []byte, newnode node, didResolve bool, err error) {
switch n := (origNode).(type) {
case nil:
return nil, nil, false, nil
case valueNode:
return n, n, false, nil
case *shortNode:
if len(key)-pos < len(n.Key) || !bytes.Equal(n.Key, key[pos:pos+len(n.Key)]) {
// key not found in trie
return nil, n, false, nil
}
value, newnode, didResolve, err = t.tryGet(n.Val, key, pos+len(n.Key))
if err == nil && didResolve {
n = n.copy()
n.Val = newnode
n.flags.gen = t.cachegen
}
return value, n, didResolve, err
case *fullNode:
value, newnode, didResolve, err = t.tryGet(n.Children[key[pos]], key, pos+1)
if err == nil && didResolve {
n = n.copy()
n.flags.gen = t.cachegen
n.Children[key[pos]] = newnode
}
return value, n, didResolve, err
case hashNode:
child, err := t.resolveHash(n, key[:pos])
if err != nil {
return nil, n, true, err
}
value, newnode, _, err := t.tryGet(child, key, pos)
return value, newnode, true, err
default:
panic(fmt.Sprintf("%T: invalid node: %v", origNode, origNode))
}
}
Trie樹的Delete方法,暫時不介紹,代碼根插入比較相似
### Trie樹的序列化和反序列化
序列化主要是指把內存表示的數據存放到數據庫裏面, 反序列化是指把數據庫裏面的Trie數據加載成內存表示的數據。 序列化的目的主要是方便存儲,減小存儲大小等。 反序列化的目的是把存儲的數據加載到內存,方便Trie樹的插入,查詢,修改等需求。
Trie的序列化主要才做用了前面介紹的Compat Encoding和 RLP編碼格式。 序列化的結構在黃皮書裏面有詳細的介紹。
![ image]( picture/trie_8.png)
![ image]( picture/trie_9.png)
![ image]( picture/trie_10.png)
Trie樹的使用方法在trie_test.go裏面有比較詳細的參考。 這裏我列出一個簡單的使用流程。首先建立一個空的Trie樹,而後插入一些數據,最後調用trie.Commit()方法進行序列化並獲得一個hash值(root), 也就是上圖中的KEC(c(J,0))或者是TRIE(J)。
func TestInsert(t *testing.T) {
trie := newEmpty()
updateString(trie, "doe", "reindeer")
updateString(trie, "dog", "puppy")
updateString(trie, "do", "cat")
root, err := trie.Commit()
}
下面咱們來分析下Commit()的主要流程。 通過一系列的調用,最終調用了hasher.go的hash方法。
func (t *Trie) Commit() (root common.Hash, err error) {
if t.db == nil {
panic("Commit called on trie with nil database")
}
return t.CommitTo(t.db)
}
// CommitTo writes all nodes to the given database.
// Nodes are stored with their sha3 hash as the key.
//
// Committing flushes nodes from memory. Subsequent Get calls will
// load nodes from the trie's database. Calling code must ensure that
// the changes made to db are written back to the trie's attached
// database before using the trie.
func (t *Trie) CommitTo(db DatabaseWriter) (root common.Hash, err error) {
hash, cached, err := t.hashRoot(db)
if err != nil {
return (common.Hash{}), err
}
t.root = cached
t.cachegen++
return common.BytesToHash(hash.(hashNode)), nil
}
func (t *Trie) hashRoot(db DatabaseWriter) (node, node, error) {
if t.root == nil {
return hashNode(emptyRoot.Bytes()), nil, nil
}
h := newHasher(t.cachegen, t.cachelimit)
defer returnHasherToPool(h)
return h.hash(t.root, db, true)
}
下面咱們簡單介紹下hash方法,hash方法主要作了兩個操做。 一個是保留原有的樹形結構,並用cache變量中, 另外一個是計算原有樹形結構的hash並把hash值存放到cache變量中保存下來。
計算原有hash值的主要流程是首先調用h.hashChildren(n,db)把全部的子節點的hash值求出來,把原有的子節點替換成子節點的hash值。 這是一個遞歸調用的過程,會從樹葉依次往上計算直到樹根。而後調用store方法計算當前節點的hash值,並把當前節點的hash值放入cache節點,設置dirty參數爲false(新建立的節點的dirty值是爲true的),而後返回。
返回值說明, cache變量包含了原有的node節點,而且包含了node節點的hash值。 hash變量返回了當前節點的hash值(這個值實際上是根據node和node的全部子節點計算出來的)。
有一個小細節: 根節點調用hash函數的時候, force參數是爲true的,其餘的子節點調用的時候force參數是爲false的。 force參數的用途是當||c(J,i)||<32的時候也對c(J,i)進行hash計算,這樣保證不管如何也會對根節點進行Hash計算。
// hash collapses a node down into a hash node, also returning a copy of the
// original node initialized with the computed hash to replace the original one.
func (h *hasher) hash(n node, db DatabaseWriter, force bool) (node, node, error) {
// If we're not storing the node, just hashing, use available cached data
if hash, dirty := n.cache(); hash != nil {
if db == nil {
return hash, n, nil
}
if n.canUnload(h.cachegen, h.cachelimit) {
// Unload the node from cache. All of its subnodes will have a lower or equal
// cache generation number.
cacheUnloadCounter.Inc(1)
return hash, hash, nil
}
if !dirty {
return hash, n, nil
}
}
// Trie not processed yet or needs storage, walk the children
collapsed, cached, err := h.hashChildren(n, db)
if err != nil {
return hashNode{}, n, err
}
hashed, err := h.store(collapsed, db, force)
if err != nil {
return hashNode{}, n, err
}
// Cache the hash of the node for later reuse and remove
// the dirty flag in commit mode. It's fine to assign these values directly
// without copying the node first because hashChildren copies it.
cachedHash, _ := hashed.(hashNode)
switch cn := cached.(type) {
case *shortNode:
cn.flags.hash = cachedHash
if db != nil {
cn.flags.dirty = false
}
case *fullNode:
cn.flags.hash = cachedHash
if db != nil {
cn.flags.dirty = false
}
}
return hashed, cached, nil
}
hashChildren方法,這個方法把全部的子節點替換成他們的hash,能夠看到cache變量接管了原來的Trie樹的完整結構,collapsed變量把子節點替換成子節點的hash值。
- 若是當前節點是shortNode, 首先把collapsed.Key從Hex Encoding 替換成 Compact Encoding, 而後遞歸調用hash方法計算子節點的hash和cache,這樣就把子節點替換成了子節點的hash值,
- 若是當前節點是fullNode, 那麼遍歷每一個子節點,把子節點替換成子節點的Hash值,
- 不然的化這個節點沒有children。直接返回。
代碼
// hashChildren replaces the children of a node with their hashes if the encoded
// size of the child is larger than a hash, returning the collapsed node as well
// as a replacement for the original node with the child hashes cached in.
func (h *hasher) hashChildren(original node, db DatabaseWriter) (node, node, error) {
var err error
switch n := original.(type) {
case *shortNode:
// Hash the short node's child, caching the newly hashed subtree
collapsed, cached := n.copy(), n.copy()
collapsed.Key = hexToCompact(n.Key)
cached.Key = common.CopyBytes(n.Key)
if _, ok := n.Val.(valueNode); !ok {
collapsed.Val, cached.Val, err = h.hash(n.Val, db, false)
if err != nil {
return original, original, err
}
}
if collapsed.Val == nil {
collapsed.Val = valueNode(nil) // Ensure that nil children are encoded as empty strings.
}
return collapsed, cached, nil
case *fullNode:
// Hash the full node's children, caching the newly hashed subtrees
collapsed, cached := n.copy(), n.copy()
for i := 0; i < 16; i++ {
if n.Children[i] != nil {
collapsed.Children[i], cached.Children[i], err = h.hash(n.Children[i], db, false)
if err != nil {
return original, original, err
}
} else {
collapsed.Children[i] = valueNode(nil) // Ensure that nil children are encoded as empty strings.
}
}
cached.Children[16] = n.Children[16]
if collapsed.Children[16] == nil {
collapsed.Children[16] = valueNode(nil)
}
return collapsed, cached, nil
default:
// Value and hash nodes don't have children so they're left as were
return n, original, nil
}
}
store方法,若是一個node的全部子節點都替換成了子節點的hash值,那麼直接調用rlp.Encode方法對這個節點進行編碼,若是編碼後的值小於32,而且這個節點不是根節點,那麼就把他們直接存儲在他們的父節點裏面,否者調用h.sha.Write方法進行hash計算, 而後把hash值和編碼後的數據存儲到數據庫裏面,而後返回hash值。
能夠看到每一個值大於32的節點的值和hash都存儲到了數據庫裏面,
func (h *hasher) store(n node, db DatabaseWriter, force bool) (node, error) {
// Don't store hashes or empty nodes.
if _, isHash := n.(hashNode); n == nil || isHash {
return n, nil
}
// Generate the RLP encoding of the node
h.tmp.Reset()
if err := rlp.Encode(h.tmp, n); err != nil {
panic("encode error: " + err.Error())
}
if h.tmp.Len() < 32 && !force {
return n, nil // Nodes smaller than 32 bytes are stored inside their parent
}
// Larger nodes are replaced by their hash and stored in the database.
hash, _ := n.cache()
if hash == nil {
h.sha.Reset()
h.sha.Write(h.tmp.Bytes())
hash = hashNode(h.sha.Sum(nil))
}
if db != nil {
return hash, db.Put(hash, h.tmp.Bytes())
}
return hash, nil
}
Trie的反序列化過程。還記得以前建立Trie樹的流程麼。 若是參數root的hash值不爲空,那麼就會調用rootnode, err := trie.resolveHash(root[:], nil) 方法來獲得rootnode節點。 首先從數據庫裏面經過hash值獲取節點的RLP編碼後的內容。 而後調用decodeNode來解析內容。
func (t *Trie) resolveHash(n hashNode, prefix []byte) (node, error) {
cacheMissCounter.Inc(1)
enc, err := t.db.Get(n)
if err != nil || enc == nil {
return nil, &MissingNodeError{NodeHash: common.BytesToHash(n), Path: prefix}
}
dec := mustDecodeNode(n, enc, t.cachegen)
return dec, nil
}
func mustDecodeNode(hash, buf []byte, cachegen uint16) node {
n, err := decodeNode(hash, buf, cachegen)
if err != nil {
panic(fmt.Sprintf("node %x: %v", hash, err))
}
return n
}
decodeNode方法,這個方法根據rlp的list的長度來判斷這個編碼到底屬於什麼節點,若是是2個字段那麼就是shortNode節點,若是是17個字段,那麼就是fullNode,而後分別調用各自的解析函數。
// decodeNode parses the RLP encoding of a trie node.
func decodeNode(hash, buf []byte, cachegen uint16) (node, error) {
if len(buf) == 0 {
return nil, io.ErrUnexpectedEOF
}
elems, _, err := rlp.SplitList(buf)
if err != nil {
return nil, fmt.Errorf("decode error: %v", err)
}
switch c, _ := rlp.CountValues(elems); c {
case 2:
n, err := decodeShort(hash, buf, elems, cachegen)
return n, wrapError(err, "short")
case 17:
n, err := decodeFull(hash, buf, elems, cachegen)
return n, wrapError(err, "full")
default:
return nil, fmt.Errorf("invalid number of list elements: %v", c)
}
}
decodeShort方法,經過key是否有終結符號來判斷究竟是葉子節點仍是中間節點。若是有終結符那麼就是葉子結點,經過SplitString方法解析出來val而後生成一個shortNode。 不過沒有終結符,那麼說明是擴展節點, 經過decodeRef來解析剩下的節點,而後生成一個shortNode。
func decodeShort(hash, buf, elems []byte, cachegen uint16) (node, error) {
kbuf, rest, err := rlp.SplitString(elems)
if err != nil {
return nil, err
}
flag := nodeFlag{hash: hash, gen: cachegen}
key := compactToHex(kbuf)
if hasTerm(key) {
// value node
val, _, err := rlp.SplitString(rest)
if err != nil {
return nil, fmt.Errorf("invalid value node: %v", err)
}
return &shortNode{key, append(valueNode{}, val...), flag}, nil
}
r, _, err := decodeRef(rest, cachegen)
if err != nil {
return nil, wrapError(err, "val")
}
return &shortNode{key, r, flag}, nil
}
decodeRef方法根據數據類型進行解析,若是類型是list,那麼有多是內容<32的值,那麼調用decodeNode進行解析。 若是是空節點,那麼返回空,若是是hash值,那麼構造一個hashNode進行返回,注意的是這裏沒有繼續進行解析,若是須要繼續解析這個hashNode,那麼須要繼續調用resolveHash方法。 到這裏decodeShort方法就調用完成了。
func decodeRef(buf []byte, cachegen uint16) (node, []byte, error) {
kind, val, rest, err := rlp.Split(buf)
if err != nil {
return nil, buf, err
}
switch {
case kind == rlp.List:
// 'embedded' node reference. The encoding must be smaller
// than a hash in order to be valid.
if size := len(buf) - len(rest); size > hashLen {
err := fmt.Errorf("oversized embedded node (size is %d bytes, want size < %d)", size, hashLen)
return nil, buf, err
}
n, err := decodeNode(nil, buf, cachegen)
return n, rest, err
case kind == rlp.String && len(val) == 0:
// empty node
return nil, rest, nil
case kind == rlp.String && len(val) == 32:
return append(hashNode{}, val...), rest, nil
default:
return nil, nil, fmt.Errorf("invalid RLP string size %d (want 0 or 32)", len(val))
}
}
decodeFull方法。根decodeShort方法的流程差很少。
func decodeFull(hash, buf, elems []byte, cachegen uint16) (*fullNode, error) {
n := &fullNode{flags: nodeFlag{hash: hash, gen: cachegen}}
for i := 0; i < 16; i++ {
cld, rest, err := decodeRef(elems, cachegen)
if err != nil {
return n, wrapError(err, fmt.Sprintf("[%d]", i))
}
n.Children[i], elems = cld, rest
}
val, _, err := rlp.SplitString(elems)
if err != nil {
return n, err
}
if len(val) > 0 {
n.Children[16] = append(valueNode{}, val...)
}
return n, nil
}
### Trie樹的cache管理
Trie樹的cache管理。 還記得Trie樹的結構裏面有兩個參數, 一個是cachegen,一個是cachelimit。這兩個參數就是cache控制的參數。 Trie樹每一次調用Commit方法,會致使當前的cachegen增長1。
func (t *Trie) CommitTo(db DatabaseWriter) (root common.Hash, err error) {
hash, cached, err := t.hashRoot(db)
if err != nil {
return (common.Hash{}), err
}
t.root = cached
t.cachegen++
return common.BytesToHash(hash.(hashNode)), nil
}
而後在Trie樹插入的時候,會把當前的cachegen存放到節點中。
func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) {
....
return true, &shortNode{n.Key, nn, t.newFlag()}, nil
}
// newFlag returns the cache flag value for a newly created node.
func (t *Trie) newFlag() nodeFlag {
return nodeFlag{dirty: true, gen: t.cachegen}
}
若是 trie.cachegen - node.cachegen > cachelimit,就能夠把節點從內存裏面卸載掉。 也就是說節點通過幾回Commit,都沒有修改,那麼就把節點從內存裏面卸載,以便節約內存給其餘節點使用。
卸載過程在咱們的 hasher.hash方法中, 這個方法是在commit的時候調用。若是方法的canUnload方法調用返回真,那麼就卸載節點,觀察他的返回值,只返回了hash節點,而沒有返回node節點,這樣節點就沒有引用,不久就會被gc清除掉。 節點被卸載掉以後,會用一個hashNode節點來表示這個節點以及其子節點。 若是後續須要使用,能夠經過方法把這個節點加載到內存裏面來。
func (h *hasher) hash(n node, db DatabaseWriter, force bool) (node, node, error) {
if hash, dirty := n.cache(); hash != nil {
if db == nil {
return hash, n, nil
}
if n.canUnload(h.cachegen, h.cachelimit) {
// Unload the node from cache. All of its subnodes will have a lower or equal
// cache generation number.
cacheUnloadCounter.Inc(1)
return hash, hash, nil
}
if !dirty {
return hash, n, nil
}
}
canUnload方法是一個接口,不一樣的node調用不一樣的方法。
// canUnload tells whether a node can be unloaded.
func (n *nodeFlag) canUnload(cachegen, cachelimit uint16) bool {
return !n.dirty && cachegen-n.gen >= cachelimit
}
func (n *fullNode) canUnload(gen, limit uint16) bool { return n.flags.canUnload(gen, limit) }
func (n *shortNode) canUnload(gen, limit uint16) bool { return n.flags.canUnload(gen, limit) }
func (n hashNode) canUnload(uint16, uint16) bool { return false }
func (n valueNode) canUnload(uint16, uint16) bool { return false }
func (n *fullNode) cache() (hashNode, bool) { return n.flags.hash, n.flags.dirty }
func (n *shortNode) cache() (hashNode, bool) { return n.flags.hash, n.flags.dirty }
func (n hashNode) cache() (hashNode, bool) { return nil, true }
func (n valueNode) cache() (hashNode, bool) { return nil, true }
### proof.go Trie樹的默克爾證實
主要提供兩個方法,Prove方法獲取指定Key的proof證實, proof證實是從根節點到葉子節點的全部節點的hash值列表。 VerifyProof方法,接受一個roothash值和proof證實和key來驗證key是否存在。
Prove方法,從根節點開始。把通過的節點的hash值一個一個存入到list中。而後返回。
// Prove constructs a merkle proof for key. The result contains all
// encoded nodes on the path to the value at key. The value itself is
// also included in the last node and can be retrieved by verifying
// the proof.
//
// If the trie does not contain a value for key, the returned proof
// contains all nodes of the longest existing prefix of the key
// (at least the root node), ending with the node that proves the
// absence of the key.
func (t *Trie) Prove(key []byte) []rlp.RawValue {
// Collect all nodes on the path to key.
key = keybytesToHex(key)
nodes := []node{}
tn := t.root
for len(key) > 0 && tn != nil {
switch n := tn.(type) {
case *shortNode:
if len(key) < len(n.Key) || !bytes.Equal(n.Key, key[:len(n.Key)]) {
// The trie doesn't contain the key.
tn = nil
} else {
tn = n.Val
key = key[len(n.Key):]
}
nodes = append(nodes, n)
case *fullNode:
tn = n.Children[key[0]]
key = key[1:]
nodes = append(nodes, n)
case hashNode:
var err error
tn, err = t.resolveHash(n, nil)
if err != nil {
log.Error(fmt.Sprintf("Unhandled trie error: %v", err))
return nil
}
default:
panic(fmt.Sprintf("%T: invalid node: %v", tn, tn))
}
}
hasher := newHasher(0, 0)
proof := make([]rlp.RawValue, 0, len(nodes))
for i, n := range nodes {
// Don't bother checking for errors here since hasher panics
// if encoding doesn't work and we're not writing to any database.
n, _, _ = hasher.hashChildren(n, nil)
hn, _ := hasher.store(n, nil, false)
if _, ok := hn.(hashNode); ok || i == 0 {
// If the node's database encoding is a hash (or is the
// root node), it becomes a proof element.
enc, _ := rlp.EncodeToBytes(n)
proof = append(proof, enc)
}
}
return proof
}
VerifyProof方法,接收一個rootHash參數,key參數,和proof數組, 來一個一個驗證是否可以和數據庫裏面的可以對應上。
// VerifyProof checks merkle proofs. The given proof must contain the
// value for key in a trie with the given root hash. VerifyProof
// returns an error if the proof contains invalid trie nodes or the
// wrong value.
func VerifyProof(rootHash common.Hash, key []byte, proof []rlp.RawValue) (value []byte, err error) {
key = keybytesToHex(key)
sha := sha3.NewKeccak256()
wantHash := rootHash.Bytes()
for i, buf := range proof {
sha.Reset()
sha.Write(buf)
if !bytes.Equal(sha.Sum(nil), wantHash) {
return nil, fmt.Errorf("bad proof node %d: hash mismatch", i)
}
n, err := decodeNode(wantHash, buf, 0)
if err != nil {
return nil, fmt.Errorf("bad proof node %d: %v", i, err)
}
keyrest, cld := get(n, key)
switch cld := cld.(type) {
case nil:
if i != len(proof)-1 {
return nil, fmt.Errorf("key mismatch at proof node %d", i)
} else {
// The trie doesn't contain the key.
return nil, nil
}
case hashNode:
key = keyrest
wantHash = cld
case valueNode:
if i != len(proof)-1 {
return nil, errors.New("additional nodes at end of proof")
}
return cld, nil
}
}
return nil, errors.New("unexpected end of proof")
}
func get(tn node, key []byte) ([]byte, node) {
for {
switch n := tn.(type) {
case *shortNode:
if len(key) < len(n.Key) || !bytes.Equal(n.Key, key[:len(n.Key)]) {
return nil, nil
}
tn = n.Val
key = key[len(n.Key):]
case *fullNode:
tn = n.Children[key[0]]
key = key[1:]
case hashNode:
return key, n
case nil:
return key, nil
case valueNode:
return nil, n
default:
panic(fmt.Sprintf("%T: invalid node: %v", tn, tn))
}
}
}
### security_trie.go 加密的Trie
爲了不刻意使用很長的key致使訪問時間的增長, security_trie包裝了一下trie樹, 全部的key都轉換成keccak256算法計算的hash值。同時在數據庫裏面存儲hash值對應的原始的key。
type SecureTrie struct {
trie Trie //原始的Trie樹
hashKeyBuf [secureKeyLength]byte //計算hash值的buf
secKeyBuf [200]byte //hash值對應的key存儲的時候的數據庫前綴
secKeyCache map[string][]byte //記錄hash值和對應的key的映射
secKeyCacheOwner *SecureTrie // Pointer to self, replace the key cache on mismatch
}
func NewSecure(root common.Hash, db Database, cachelimit uint16) (*SecureTrie, error) {
if db == nil {
panic("NewSecure called with nil database")
}
trie, err := New(root, db)
if err != nil {
return nil, err
}
trie.SetCacheLimit(cachelimit)
return &SecureTrie{trie: *trie}, nil
}
// Get returns the value for key stored in the trie.
// The value bytes must not be modified by the caller.
func (t *SecureTrie) Get(key []byte) []byte {
res, err := t.TryGet(key)
if err != nil {
log.Error(fmt.Sprintf("Unhandled trie error: %v", err))
}
return res
}
// TryGet returns the value for key stored in the trie.
// The value bytes must not be modified by the caller.
// If a node was not found in the database, a MissingNodeError is returned.
func (t *SecureTrie) TryGet(key []byte) ([]byte, error) {
return t.trie.TryGet(t.hashKey(key))
}
func (t *SecureTrie) CommitTo(db DatabaseWriter) (root common.Hash, err error) {
if len(t.getSecKeyCache()) > 0 {
for hk, key := range t.secKeyCache {
if err := db.Put(t.secKey([]byte(hk)), key); err != nil {
return common.Hash{}, err
}
}
t.secKeyCache = make(map[string][]byte)
}
return t.trie.CommitTo(db)
}