以太坊源碼分析(36)ethdb源碼分析

go-ethereum全部的數據存儲在levelDB這個Google開源的KeyValue文件數據庫中,整個區塊鏈的全部數據都存儲在一個levelDB的數據庫中,levelDB支持按照文件大小切分文件的功能,因此咱們看到的區塊鏈的數據都是一個一個小文件,其實這些小文件都是一個同一個levelDB實例。這裏簡單的看下levelDB的go封裝代碼。

levelDB官方網站介紹的特色

**特色**

- key和value都是任意長度的字節數組;
- entry(即一條K-V記錄)默認是按照key的字典順序存儲的,固然開發者也能夠重載這個排序函數;
- 提供的基本操做接口:Put()、Delete()、Get()、Batch();
- 支持批量操做以原子操做進行;
- 能夠建立數據全景的snapshot(快照),並容許在快照中查找數據;
- 能夠經過前向(或後向)迭代器遍歷數據(迭代器會隱含的建立一個snapshot);
- 自動使用Snappy壓縮數據;
- 可移植性;

**限制**

- 非關係型數據模型(NoSQL),不支持sql語句,也不支持索引;
- 一次只容許一個進程訪問一個特定的數據庫;
- 沒有內置的C/S架構,但開發者可使用LevelDB庫本身封裝一個server;


源碼所在的目錄在ethereum/ethdb目錄。代碼比較簡單, 分爲下面三個文件

- database.go levelDB的封裝代碼
- memory_database.go         供測試用的基於內存的數據庫,不會持久化爲文件,僅供測試
- interface.go               定義了數據庫的接口
- database_test.go           測試案例

## interface.go
看下面的代碼,基本上定義了KeyValue數據庫的基本操做, Put, Get, Has, Delete等基本操做,levelDB是不支持SQL的,基本能夠理解爲數據結構裏面的Map。

    package ethdb
    const IdealBatchSize = 100 * 1024
    
    // Putter wraps the database write operation supported by both batches and regular databases.
    //Putter接口定義了批量操做和普通操做的寫入接口
    type Putter interface {
        Put(key []byte, value []byte) error
    }
    
    // Database wraps all database operations. All methods are safe for concurrent use.
    //數據庫接口定義了全部的數據庫操做, 全部的方法都是多線程安全的。
    type Database interface {
        Putter
        Get(key []byte) ([]byte, error)
        Has(key []byte) (bool, error)
        Delete(key []byte) error
        Close()
        NewBatch() Batch
    }
    
    // Batch is a write-only database that commits changes to its host database
    // when Write is called. Batch cannot be used concurrently.
    //批量操做接口,不能多線程同時使用,當Write方法被調用的時候,數據庫會提交寫入的更改。
    type Batch interface {
        Putter
        ValueSize() int // amount of data in the batch
        Write() error
    }

## memory_database.g
這個基本上就是封裝了一個內存的Map結構。而後使用了一把鎖來對多線程進行資源的保護。

    type MemDatabase struct {
        db map[string][]byte
        lock sync.RWMutex
    }
    
    func NewMemDatabase() (*MemDatabase, error) {
        return &MemDatabase{
            db: make(map[string][]byte),
        }, nil
    }
    
    func (db *MemDatabase) Put(key []byte, value []byte) error {
        db.lock.Lock()
        defer db.lock.Unlock()
        db.db[string(key)] = common.CopyBytes(value)
        return nil
    }
    func (db *MemDatabase) Has(key []byte) (bool, error) {
        db.lock.RLock()
        defer db.lock.RUnlock()
    
        _, ok := db.db[string(key)]
        return ok, nil
    }

而後是Batch的操做。也比較簡單,一看便明白。
    
    
    type kv struct{ k, v []byte }
    type memBatch struct {
        db *MemDatabase
        writes []kv
        size int
    }
    func (b *memBatch) Put(key, value []byte) error {
        b.writes = append(b.writes, kv{common.CopyBytes(key), common.CopyBytes(value)})
        b.size += len(value)
        return nil
    }
    func (b *memBatch) Write() error {
        b.db.lock.Lock()
        defer b.db.lock.Unlock()
    
        for _, kv := range b.writes {
            b.db.db[string(kv.k)] = kv.v
        }
        return nil
    }


##database.go
這個就是實際ethereum客戶端使用的代碼, 封裝了levelDB的接口。

    
    import (
        "strconv"
        "strings"
        "sync"
        "time"
    
        "github.com/ethereum/go-ethereum/log"
        "github.com/ethereum/go-ethereum/metrics"
        "github.com/syndtr/goleveldb/leveldb"
        "github.com/syndtr/goleveldb/leveldb/errors"
        "github.com/syndtr/goleveldb/leveldb/filter"
        "github.com/syndtr/goleveldb/leveldb/iterator"
        "github.com/syndtr/goleveldb/leveldb/opt"
        gometrics "github.com/rcrowley/go-metrics"
    )

使用了github.com/syndtr/goleveldb/leveldb的leveldb的封裝,因此一些使用的文檔能夠在那裏找到。能夠看到,數據結構主要增長了不少的Mertrics用來記錄數據庫的使用狀況,增長了quitChan用來處理中止時候的一些狀況,這個後面會分析。若是下面代碼可能有疑問的地方應該再Filter: filter.NewBloomFilter(10)這個能夠暫時不用關注,這個是levelDB裏面用來進行性能優化的一個選項,能夠不用理會。

    
    type LDBDatabase struct {
        fn string // filename for reporting
        db *leveldb.DB // LevelDB instance
    
        getTimer gometrics.Timer // Timer for measuring the database get request counts and latencies
        putTimer gometrics.Timer // Timer for measuring the database put request counts and latencies
        ...metrics
    
        quitLock sync.Mutex // Mutex protecting the quit channel access
        quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
    
        log log.Logger // Contextual logger tracking the database path
    }
    
    // NewLDBDatabase returns a LevelDB wrapped object.
    func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) {
        logger := log.New("database", file)
        // Ensure we have some minimal caching and file guarantees
        if cache < 16 {
            cache = 16
        }
        if handles < 16 {
            handles = 16
        }
        logger.Info("Allocated cache and file handles", "cache", cache, "handles", handles)
        // Open the db and recover any potential corruptions
        db, err := leveldb.OpenFile(file, &opt.Options{
            OpenFilesCacheCapacity: handles,
            BlockCacheCapacity: cache / 2 * opt.MiB,
            WriteBuffer: cache / 4 * opt.MiB, // Two of these are used internally
            Filter: filter.NewBloomFilter(10),
        })
        if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
            db, err = leveldb.RecoverFile(file, nil)
        }
        // (Re)check for errors and abort if opening of the db failed
        if err != nil {
            return nil, err
        }
        return &LDBDatabase{
            fn: file,
            db: db,
            log: logger,
        }, nil
    }


再看看下面的Put和Has的代碼,由於github.com/syndtr/goleveldb/leveldb封裝以後的代碼是支持多線程同時訪問的,因此下面這些代碼是不用使用鎖來保護的,這個能夠注意一下。這裏面大部分的代碼都是直接調用leveldb的封裝,因此不詳細介紹了。 有一個比較有意思的地方是Metrics代碼。

    // Put puts the given key / value to the queue
    func (db *LDBDatabase) Put(key []byte, value []byte) error {
        // Measure the database put latency, if requested
        if db.putTimer != nil {
            defer db.putTimer.UpdateSince(time.Now())
        }
        // Generate the data to write to disk, update the meter and write
        //value = rle.Compress(value)
    
        if db.writeMeter != nil {
            db.writeMeter.Mark(int64(len(value)))
        }
        return db.db.Put(key, value, nil)
    }
    
    func (db *LDBDatabase) Has(key []byte) (bool, error) {
        return db.db.Has(key, nil)
    }

###Metrics的處理
以前在建立NewLDBDatabase的時候,並無初始化內部的不少Mertrics,這個時候Mertrics是爲nil的。初始化Mertrics是在Meter方法中。外部傳入了一個prefix參數,而後建立了各類Mertrics(具體如何建立Merter,會後續在Meter專題進行分析),而後建立了quitChan。 最後啓動了一個線程調用了db.meter方法。
    
    // Meter configures the database metrics collectors and
    func (db *LDBDatabase) Meter(prefix string) {
        // Short circuit metering if the metrics system is disabled
        if !metrics.Enabled {
            return
        }
        // Initialize all the metrics collector at the requested prefix
        db.getTimer = metrics.NewTimer(prefix + "user/gets")
        db.putTimer = metrics.NewTimer(prefix + "user/puts")
        db.delTimer = metrics.NewTimer(prefix + "user/dels")
        db.missMeter = metrics.NewMeter(prefix + "user/misses")
        db.readMeter = metrics.NewMeter(prefix + "user/reads")
        db.writeMeter = metrics.NewMeter(prefix + "user/writes")
        db.compTimeMeter = metrics.NewMeter(prefix + "compact/time")
        db.compReadMeter = metrics.NewMeter(prefix + "compact/input")
        db.compWriteMeter = metrics.NewMeter(prefix + "compact/output")
    
        // Create a quit channel for the periodic collector and run it
        db.quitLock.Lock()
        db.quitChan = make(chan chan error)
        db.quitLock.Unlock()
    
        go db.meter(3 * time.Second)
    }

這個方法每3秒鐘獲取一次leveldb內部的計數器,而後把他們公佈到metrics子系統。 這是一個無限循環的方法, 直到quitChan收到了一個退出信號。

    // meter periodically retrieves internal leveldb counters and reports them to
    // the metrics subsystem.
    // This is how a stats table look like (currently):
    //下面的註釋就是咱們調用 db.db.GetProperty("leveldb.stats")返回的字符串,後續的代碼須要解析這個字符串並把信息寫入到Meter中。

    // Compactions
    // Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)
    // -------+------------+---------------+---------------+---------------+---------------
    // 0 | 0 | 0.00000 | 1.27969 | 0.00000 | 12.31098
    // 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294
    // 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884
    // 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000
    
    func (db *LDBDatabase) meter(refresh time.Duration) {
        // Create the counters to store current and previous values
        counters := make([][]float64, 2)
        for i := 0; i < 2; i++ {
            counters[i] = make([]float64, 3)
        }
        // Iterate ad infinitum and collect the stats
        for i := 1; ; i++ {
            // Retrieve the database stats
            stats, err := db.db.GetProperty("leveldb.stats")
            if err != nil {
                db.log.Error("Failed to read database stats", "err", err)
                return
            }
            // Find the compaction table, skip the header
            lines := strings.Split(stats, "\n")
            for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" {
                lines = lines[1:]
            }
            if len(lines) <= 3 {
                db.log.Error("Compaction table not found")
                return
            }
            lines = lines[3:]
    
            // Iterate over all the table rows, and accumulate the entries
            for j := 0; j < len(counters[i%2]); j++ {
                counters[i%2][j] = 0
            }
            for _, line := range lines {
                parts := strings.Split(line, "|")
                if len(parts) != 6 {
                    break
                }
                for idx, counter := range parts[3:] {
                    value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
                    if err != nil {
                        db.log.Error("Compaction entry parsing failed", "err", err)
                        return
                    }
                    counters[i%2][idx] += value
                }
            }
            // Update all the requested meters
            if db.compTimeMeter != nil {
                db.compTimeMeter.Mark(int64((counters[i%2][0] - counters[(i-1)%2][0]) * 1000 * 1000 * 1000))
            }
            if db.compReadMeter != nil {
                db.compReadMeter.Mark(int64((counters[i%2][1] - counters[(i-1)%2][1]) * 1024 * 1024))
            }
            if db.compWriteMeter != nil {
                db.compWriteMeter.Mark(int64((counters[i%2][2] - counters[(i-1)%2][2]) * 1024 * 1024))
            }
            // Sleep a bit, then repeat the stats collection
            select {
            case errc := <-db.quitChan:
                // Quit requesting, stop hammering the database
                errc <- nil
                return
    
            case <-time.After(refresh):
                // Timeout, gather a new set of stats
            }
        }
    }




網址:http://www.qukuailianxueyuan.io/
git



欲領取造幣技術與全套虛擬機資料github

區塊鏈技術交流QQ羣:756146052  備註:CSDNsql

尹成學院微信:備註:CSDN數據庫

相關文章
相關標籤/搜索