菜鳥系列Fabric源碼學習 — orderer服務啓動

Fabric 1.4 orderer 服務啓動流程

1.提要

orderer提供broadcast和deliver兩個服務接口。orderer節點與各個peer節點經過grpc鏈接,orderer將全部peer節點經過broadcast發來的交易(Envelope格式,好比peer部署後的數據)按照配置的大小依次封裝到一個個block中並寫入orderer本身的帳本中,而後供各個peer節點的gossip服務經過deliver來消費這個帳本中的數據進行自身結點帳本的同步。node

2.初始化過程

先看看main函數。bootstrap

// Main is the entry point of orderer process
func Main() {
    fullCmd := kingpin.MustParse(app.Parse(os.Args[1:]))

    // "version" command
    if fullCmd == version.FullCommand() {
        fmt.Println(metadata.GetVersionInfo())
        return
    }

    conf, err := localconfig.Load()
    if err != nil {
        logger.Error("failed to parse config: ", err)
        os.Exit(1)
    }
    initializeLogging()
    initializeLocalMsp(conf)

    prettyPrintStruct(conf)
    Start(fullCmd, conf)
}

從中可知 orderer服務命令行是經過kingpin來實現的,基本上只是簡單使用了下,也只實現了3個命令:緩存

start*
    Start the orderer node
  version
    Show version information
  benchmark
    Run orderer in benchmark mode

而且從上述main函數可知,僅version有對應操做,而orderer 默認爲orderer start。app

啓動流程爲:ide

  1. 加載配置(orderer.yaml/Defaults/環境變量)
  2. 初始化log(log級別和log格式)
  3. 初始化本地msp
  4. 啓動服務Start()

接下來主要關注第4步。前面基本上是配置初始化第過程。
查看一下start函數:函數

  1. 從配置文件啓動塊路徑獲取配置塊及驗證是否可做爲配置塊(系統通道第一個塊)
  2. 集羣相關初始化配置
  3. 判斷是不是raft共識及使用的是最新配置塊,若是是,則進行下列流程:
    1. 獲取全部應用鏈及其創世區塊塊(discoverChannels)
    2. 根據orderer是否在應用鏈配置塊的raft節點中分類(channelsToPull topull/nottopull)
    3. 建立全部的應用通道帳本
    4. 獲取topull應用通道的帳本(從orderer處獲取)
    5. 獲取系統通道帳本
if clusterType && conf.General.GenesisMethod == "file" {
        r.replicateIfNeeded(bootstrapBlock)
    }
    
func (r *Replicator) ReplicateChains() []string {
    var replicatedChains []string
    channels := r.discoverChannels()
    pullHints := r.channelsToPull(channels)
    totalChannelCount := len(pullHints.channelsToPull) + len(pullHints.channelsNotToPull)

    for _, channels := range [][]ChannelGenesisBlock{pullHints.channelsToPull, pullHints.channelsNotToPull} {
        for _, channel := range channels {
            ...
            r.appendBlock(gb, ledger, channel.ChannelName)
        }
    }

    for _, channel := range pullHints.channelsToPull {
        err := r.PullChannel(channel.ChannelName)
        ...
    }

    // Last, pull the system chain.
    if err := r.PullChannel(r.SystemChannel); err != nil && err != ErrSkipped {
        r.Logger.Panicf("Failed pulling system channel: %v", err)
    }
    return replicatedChains
}
  1. 啓動及初始化必要模塊
    1. 建立系統鏈
    // Are we bootstrapping?
    if len(lf.ChainIDs()) == 0 {
        initializeBootstrapChannel(genesisBlock, lf)
    } else {
        logger.Info("Not bootstrapping because of existing chains")
    }
    1. 多通道初始化(initializeMultichannelRegistrar)
      • 初始化registrar實例
      registrar := multichannel.NewRegistrar(lf, signer, metricsProvider, callbacks...)
      // Registrar serves as a point of access and control for the individual channel resources.
      type Registrar struct {
          lock   sync.RWMutex
          //當前全部通道的chain對象
          chains map[string]*ChainSupport
          //不一樣共識類型的consenter
          consenters         map[string]consensus.Consenter
          //Factory經過chainID檢索或建立新的分類賬
          ledgerFactory      blockledger.Factory
          //簽名相關
          signer             crypto.LocalSigner
          blockcutterMetrics *blockcutter.Metrics
          //系統鏈id
          systemChannelID    string
          //系統鏈chainSupport
          systemChannel      *ChainSupport
          //通道配置模版
          templator          msgprocessor.ChannelConfigTemplator
          callbacks          []channelconfig.BundleActor
      }
      • 初始化共識機制
      consenters["solo"] = solo.New()
      var kafkaMetrics *kafka.Metrics
      consenters["kafka"], kafkaMetrics = kafka.New(conf, metricsProvider, healthChecker, registrar)
      go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil)
      if isClusterType(bootstrapBlock) {
          initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider)
      }
      • 啓動orderer現存的鏈(系統鏈/應用鏈,經過讀取鏈的目錄查看現存鏈),爲每條鏈實例化了ChainSupport對象,而後啓動
      chain := newChainSupport(
             r,
             ledgerResources,
             r.consenters,
             r.signer,
             r.blockcutterMetrics,
         )
      for _, chainID := range existingChains {
              ...
              chain.start()
              ...
      }
    2. 啓動GRPC服務
      server.go中的服務端對象實例server在main.go的main()中由server := NewServer(manager, signer)生成,使用ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)進行了註冊,隨後grpcServer.Start()啓動起來。
      其主要的兩個接口爲:
    type AtomicBroadcastServer interface {
        Broadcast(AtomicBroadcast_BroadcastServer) error
        Deliver(AtomicBroadcast_DeliverServer) error
    }
    其接口的實如今:orderer/common/server/server.go

3.相關模塊介紹

3.1 ChainSupport

提供支持chain相關操做的資源,既包含帳本自己,也包含了帳本用到的各類工具對象,如分割工具cutter,簽名工具signer,最新配置在chain中的位置信息(lastConfig的值表明當前鏈中最新配置所在的block的編號,lastConfigSeq的值則表明當前鏈中最新配置消息自身的編號)等工具

type ChainSupport struct {
    // 帳本相關資源 包括帳本的讀寫及配置的獲取
    *ledgerResources
    // 提供從客戶端獲取消息分類處理接口
    msgprocessor.Processor
    // 將區塊寫入磁盤
    *BlockWriter
    // 鏈 提供對messages對處理方法
    //This design allows for two primary flows
    // 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
    // 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
    consensus.Chain
    // 廣播消息接收器 提供切塊方法   
    cutter blockcutter.Receiver
    //簽名
    crypto.LocalSigner
    // chains need to know if they are system or standard channel.
    systemChannel bool
}

3.2 blockcutter模塊

  • 塊分割工具,用於分割block,具體爲orderer/common/blockcutter/blockcutter.go中定義的receiver。一條一條消息數據在流水線上被傳送到cutter處,cutter按照configtx.yaml中的配置,把一條條消息打包成一批消息,同時返回整理好的這批消息對應的committer集合,至此cutter的任務完成。
MaxMessageCount指定了block中最多存儲的消息數量
AbsoluteMaxBytes指定了block最大的字節數
PreferredMaxBytes指定了一條消息的最優的最大字節數(blockcutter處理消息的過程當中會努力使每一批消息儘可能保持在這個值上)。
  1. 若一個Envelope的數據大小(Payload+簽名)大於PreferredMaxBytes時,不管當前緩存如何,當即Cut;
  2. 若一個Envelope被要求單純存儲在一個block(即該消息對應的committer的Isolated()返回爲true),要當即Cut
  3. 若一個Envelope的大小加上blockcutter已有的消息的大小之和大於PreferredMaxBytes時,要當即Cut;
  4. 若blockcutter當前緩存的消息數量大於MaxMessageCount了,要當即Cut。
  5. 由configtx.yaml中BatchTimeout配置項(默認2s)控制,當時間超過此值,chain啓動的處理消息的流程中主動觸發的Cut。

3.3 chain start()模塊

主要是對消息進行處理,將交易消息傳輸給block cutter切成塊及寫入帳本。不一樣的共識機制操做不一樣。(後續結合consenter模塊一塊兒詳細介紹).net

chain := newChainSupport(
                r,
                ledgerResources,
                r.consenters,
                r.signer,
                r.blockcutterMetrics,
            )
r.chains[chainID] = chain
chain.start()

3.4 consenter模塊:

solo/kafka/etcdraft三種共識類型,用於序列化生產(即各個peer點傳送來的Envelope)出來的消息。命令行

參考:
https://blog.csdn.net/idsuf698987/article/details/78639203code

相關文章
相關標籤/搜索