交易池:txpool源碼分析

交易池:txpool源碼分析

交易池的源碼位於:core/tx_pool.go文件網絡

txpool交易池由兩部分構成分別是pending和queued組成。主要適用於存放當前提交等待被區塊確認提交的交易,本地交易和網絡遠程交易都有數據結構

一、pending:等待執行的交易會被放在pending隊列中oop

二、queued:提交可是不可以執行的交易,放在queue中等待執行源碼分析

經過閱讀tx_pool_test.go這個txpool的測試文件源碼能夠發現txpool主要功能以下:

一、檢查交易的信息數據是否合法,包括Gas,餘額是否不足,Nonce大小等等區塊鏈

二、檢查時間狀態,將nonce太高的交易放在queue隊列,將能夠執行的交易放在 pending隊列測試

三、在資源有限的狀況下(例如當前池滿了,或者網絡擁堵),會優先執行GasPrice高的交易ui

四、若是當前交易的額度大於當前帳戶的額度,交易會被刪除spa

五、對於相同的account對應的相同nonce的交易只會保存GasPrice高的哪一個交易日誌

六、本地交易會使用journal的功能將信息存在本地磁盤code

七、若是account沒有餘額了,那麼對應queue隊列和pending隊列中的交易會被刪除

txpool的數據結構

type TxPool struct {
    //配置信息
   config       TxPoolConfig
    //鏈配置
   chainconfig  *params.ChainConfig
    //當前的鏈
   chain        blockChain
    //最低的gas價格
   gasPrice     *big.Int
    //經過txFedd訂閱TxPool的消息
   txFeed       event.Feed
    //提供了同時取消多個訂閱的功能
   scope        event.SubscriptionScope
    //當有了新的區塊的產生會收到消息,訂閱區塊頭消息
   chainHeadCh  chan ChainHeadEvent
    //區塊頭消息訂閱器
   chainHeadSub event.Subscription
    //對事物進行簽名處理
   signer       types.Signer
    //讀寫互斥鎖
   mu           sync.RWMutex
	//當前區塊鏈頭部的狀態
   currentState  *state.StateDB
    //掛起狀態跟蹤虛擬nonces
   pendingState  *state.ManagedState 
    // 目前交易的費用上限
   currentMaxGas uint64              
	//一套豁免驅逐規則的本地交易
   locals  *accountSet 
    //本地事務日誌備份到磁盤
   journal *txJournal 
	//等待隊列
   pending map[common.Address]*txList  
    //排隊但不可處理的事務
   queue   map[common.Address]*txList   
    //每一個已知賬戶的最後一次心跳
   beats   map[common.Address]time.Time 
    //全部容許查詢的事務
   all     *txLookup   
    //全部按價格排序的交易
   priced  *txPricedList                
	//關閉同步
   wg sync.WaitGroup 
	//家園版本??
   homestead bool
}
複製代碼

TxPool的配置信息包括一下

type TxPoolConfig struct {
    // 默認狀況下應視爲本地的地址
   Locals    []common.Address 
    // 是否應該禁用本地事務處理
   NoLocals  bool  
    // 本地事務日誌,以便在節點從新啓動時存活
   Journal   string  
    // 從新生成本地事務日誌的時間間隔
   Rejournal time.Duration 
    // 最低gas價格,以接受入池
   PriceLimit uint64
    // 更換//現有交易的最低價格增幅(一次) 
   PriceBump  uint64 
	//每一個賬戶保證的可執行事務槽數
   AccountSlots uint64 
    //全部賬戶的可執行事務槽的最大數量
   GlobalSlots  uint64 
    //每一個賬戶容許的非可執行事務槽的最大數量
   AccountQueue uint64 
    //全部賬戶的非可執行事務槽的最大數量
   GlobalQueue  uint64
	//非可執行事務的最大排隊時間
   Lifetime time.Duration 
}
複製代碼

NewTxPool() 構建

func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
   //對輸入進行消毒,確保不會設定易受影響的自然氣價格
   // sanitize檢查所提供的用戶配置,並更改任何不合理或不可行的配置
    config = (&config).sanitize()
	//建立帶有初始設置的交易池
   // Create the transaction pool with its initial settings
   pool := &TxPool{
      config:      config,
      chainconfig: chainconfig,
      chain:       chain,
      signer:      types.NewEIP155Signer(chainconfig.ChainID),
      pending:     make(map[common.Address]*txList),
      queue:       make(map[common.Address]*txList),
      beats:       make(map[common.Address]time.Time),
      all:         newTxLookup(),
      chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
      gasPrice:    new(big.Int).SetUint64(config.PriceLimit),
   }
   pool.locals = newAccountSet(pool.signer)
   for _, addr := range config.Locals {
      log.Info("Setting new local account", "address", addr)
      pool.locals.add(addr)
   }
   pool.priced = newTxPricedList(pool.all)
   pool.reset(nil, chain.CurrentBlock().Header())
	////若是啓用了本地事務和日誌記錄,則從磁盤加載
   // If local transactions and journaling is enabled, load from disk
   if !config.NoLocals && config.Journal != "" {
      pool.journal = newTxJournal(config.Journal)

      if err := pool.journal.load(pool.AddLocals); err != nil {
         log.Warn("Failed to load transaction journal", "err", err)
      }
      if err := pool.journal.rotate(pool.local()); err != nil {
         log.Warn("Failed to rotate transaction journal", "err", err)
      }
   }
   // 從區塊鏈訂閱事件
   pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)

   // 啓動事件循環並返回
   pool.wg.Add(1)
   go pool.loop()

   return pool
}
複製代碼

add() 方法

驗證交易並將其插入到future queue. 若是這個交易是替換了當前存在的某個交易,那麼會返回以前的那個交易,這樣外部就不用調用promote方法. 若是某個新增長的交易被標記爲local, 那麼它的發送帳戶會進入白名單,這個帳戶的關聯的交易將不會由於價格的限制或者其餘的一些限制被刪除

func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
    //若是這個交易已經知道,就丟棄他
   hash := tx.Hash()
   if pool.all.Get(hash) != nil {
      log.Trace("Discarding already known transaction", "hash", hash)
      return false, fmt.Errorf("known transaction: %x", hash)
   }
   // 若是交易不能經過基本數據驗證,就丟棄它
   if err := pool.validateTx(tx, local); err != nil {
      log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
      invalidTxCounter.Inc(1)
      return false, err
   }
   // 若是交易池已經滿了,就丟棄交易費用低的交易
   if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
      // 若是新的交易費用交易太低就不接受
      if !local && pool.priced.Underpriced(tx, pool.locals) {
         log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
         underpricedTxCounter.Inc(1)
         return false, ErrUnderpriced
      }
      // 若是新的交易比舊的交易號好,就添加新的交易
      drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
      for _, tx := range drop {
         log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
         underpricedTxCounter.Inc(1)
         pool.removeTx(tx.Hash(), false)
      }
   }
   // 若是交易正在替換已經掛起的交易,請直接執行
   from, _ := types.Sender(pool.signer, tx) // already validated
   if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
      // 一旦已經掛起,檢查是否知足要求的價格上漲
      inserted, old := list.Add(tx, pool.config.PriceBump)
      if !inserted {
         pendingDiscardCounter.Inc(1)
         return false, ErrReplaceUnderpriced
      }
      // 新交易更好,替換掉舊交易
      if old != nil {
         pool.all.Remove(old.Hash())
         pool.priced.Removed()
         pendingReplaceCounter.Inc(1)
      }
      pool.all.Add(tx)
      pool.priced.Put(tx)
      pool.journalTx(from, tx)

      log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())

      // 咱們直接注入了一個新的交易,通知子系統
      go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})

      return old != nil, nil
   }
   // New transaction isn't replacing a pending one, push into queue
   replace, err := pool.enqueueTx(hash, tx)
   if err != nil {
      return false, err
   }
   // 標記本地地址並記錄本地交易
   if local {
      if !pool.locals.contains(from) {
         log.Info("Setting new local account", "address", from)
         pool.locals.add(from)
      }
   }
   pool.journalTx(from, tx)

   log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
   return replace, nil
}
複製代碼

validateTx() 方法

使用一致性規則來檢查一個交易是否有效,並採用本地節點的一些啓發式的限制

func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
   // 拒絕超過32KB的事務,以防止DOSS攻擊
   if tx.Size() > 32*1024 {
      return ErrOversizedData
   }
   // 交易不能是負的。這可能永遠不會發生使用RLP解碼
   // 但若是使用RPC建立交易,則可能發生交易。
   if tx.Value().Sign() < 0 {
      return ErrNegativeValue
   }
   // 確保交易使用的Gas不超過當前塊限制的GasLimit
   if pool.currentMaxGas < tx.Gas() {
      return ErrGasLimit
   }
   // 確保交易簽名正確
   from, err := types.Sender(pool.signer, tx)
   if err != nil {
      return ErrInvalidSender
   }
   // 在咱們本身的最低接受Gas價格下放棄非本地交易
   local = local || pool.locals.contains(from) 
    //即便交易從網絡到達,賬戶也多是本地的
   if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
      return ErrUnderpriced
   }
   // 確保交易符合即時的nonce 也就是當前的交易的nonce必需要等與當前帳戶的 //nonce
   if pool.currentState.GetNonce(from) > tx.Nonce() {
      return ErrNonceTooLow
   }
   // 發起交易的一方應該有足夠的資金來支付費用,判斷餘額是否足夠
   // cost == V + GP * GL
   if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
      return ErrInsufficientFunds
   }
   intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
   if err != nil {
      return err
   }
   if tx.Gas() < intrGas {
      return ErrIntrinsicGas
   }
   return nil
}
複製代碼

loop() 方法

是txPool的一個goroutine.也是主要的事件循環.等待和響應外部區塊鏈事件以及各類報告和交易驅逐事件

func (pool *TxPool) loop() {
    //等待組計數器減一
   defer pool.wg.Done()

   // 啓動統計報表和交易退出提示符
   var prevPending, prevQueued, prevStales int
	
   report := time.NewTicker(statsReportInterval)
   defer report.Stop()

   evict := time.NewTicker(evictionInterval)
   defer evict.Stop()

   journal := time.NewTicker(pool.config.Rejournal)
   defer journal.Stop()

   // 跟蹤交易重組的前一個頭標頭
   head := pool.chain.CurrentBlock()

   // 不斷等待和應對各類事件
   for {
      select {
      // 處理鏈頭事件
      case ev := <-pool.chainHeadCh:
         if ev.Block != nil {
            pool.mu.Lock()
            if pool.chainconfig.IsHomestead(ev.Block.Number()) {
               pool.homestead = true
            }
            pool.reset(head.Header(), ev.Block.Header())
            head = ev.Block

            pool.mu.Unlock()
         }
      //因爲系統中止而取消訂閱
      case <-pool.chainHeadSub.Err():
         return

      // 處理統計報表刻度
      case <-report.C:
         pool.mu.RLock()
         pending, queued := pool.stats()
         stales := pool.priced.stales
         pool.mu.RUnlock()

         if pending != prevPending || queued != prevQueued || stales != prevStales {
            log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
            prevPending, prevQueued, prevStales = pending, queued, stales
         }

      // 處理非活動賬戶事務退出
      case <-evict.C:
         pool.mu.Lock()
         for addr := range pool.queue {
            // 從退出機制中跳過本地事務
            if pool.locals.contains(addr) {
               continue
            }
            // 任什麼時候間足夠長的非本地交易信息都應該被清除
            if time.Since(pool.beats[addr]) > pool.config.Lifetime {
               for _, tx := range pool.queue[addr].Flatten() {
                  pool.removeTx(tx.Hash(), true)
               }
            }
         }
         pool.mu.Unlock()

      // 處理本地交易日誌的輪換
      case <-journal.C:
         if pool.journal != nil {
            pool.mu.Lock()
            if err := pool.journal.rotate(pool.local()); err != nil {
               log.Warn("Failed to rotate local tx journal", "err", err)
            }
            pool.mu.Unlock()
         }
      }
   }
}
複製代碼

promoteTx() 方法

把某個交易加入到pending 隊列. 這個方法假設已經獲取到了鎖

func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool {
   // 嘗試將交易插入到等待掛起隊列中
   if pool.pending[addr] == nil {
      pool.pending[addr] = newTxList(true)
   }
   list := pool.pending[addr]

   inserted, old := list.Add(tx, pool.config.PriceBump)
   if !inserted {
      // 若是舊的交易更好就丟棄這個交易
      pool.all.Remove(hash)
      pool.priced.Removed()

      pendingDiscardCounter.Inc(1)
      return false
   }
   // 若是新的交易更好就丟棄之前的任何事務並標記此事務
   if old != nil {
      pool.all.Remove(old.Hash())
      pool.priced.Removed()

      pendingReplaceCounter.Inc(1)
   }
   // Failsafe to work around direct pending inserts (tests)
   if pool.all.Get(hash) == nil {
      pool.all.Add(tx)
      pool.priced.Put(tx)
   }
   // Set the potentially new pending nonce and notify any subsystems of the new tx
   pool.beats[addr] = time.Now()
   pool.pendingState.SetNonce(addr, tx.Nonce()+1)

   return true
}
複製代碼
相關文章
相關標籤/搜索