The orderer exposes two service interfaces: Broadcast and Deliver. Orderer and peer nodes are connected over gRPC. The orderer takes all the transactions that peers send via Broadcast (in Envelope format, e.g. the data produced after a peer deployment), packs them into blocks according to the configured batch size, and writes those blocks into its own ledger. Each peer's gossip service then consumes that ledger through Deliver to synchronize its own local ledger.
Let's start with the main function.
```go
// Main is the entry point of orderer process
func Main() {
	fullCmd := kingpin.MustParse(app.Parse(os.Args[1:]))

	// "version" command
	if fullCmd == version.FullCommand() {
		fmt.Println(metadata.GetVersionInfo())
		return
	}

	conf, err := localconfig.Load()
	if err != nil {
		logger.Error("failed to parse config: ", err)
		os.Exit(1)
	}
	initializeLogging()
	initializeLocalMsp(conf)
	prettyPrintStruct(conf)
	Start(fullCmd, conf)
}
```
From this we can see that the orderer's command line is built with kingpin, used in a fairly minimal way, with only three commands:
```
start*     Start the orderer node
version    Show version information
benchmark  Run orderer in benchmark mode
```
And as the main function above shows, only version has a dedicated action; a bare orderer defaults to orderer start.
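To illustrate that dispatch behavior, here is a minimal sketch using only the standard library (not the actual kingpin wiring; the version string and messages are placeholders): an empty command line falls through to the default start command, and only version gets a distinct action.

```go
package main

import (
	"fmt"
	"os"
)

// Placeholder; the real orderer reads version info from its metadata package.
const version = "1.4.x (placeholder)"

// dispatch mimics the orderer's command handling: "version" has a dedicated
// action, and an empty command line defaults to "start".
func dispatch(args []string) string {
	cmd := "start" // kingpin marks start as the default command
	if len(args) > 0 {
		cmd = args[0]
	}
	switch cmd {
	case "version":
		return "orderer version " + version
	case "benchmark":
		return "running orderer in benchmark mode"
	default:
		return "starting orderer node"
	}
}

func main() {
	fmt.Println(dispatch(os.Args[1:]))
}
```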
The startup flow is:
What follows focuses mainly on step 4; the earlier steps are essentially the configuration-initialization process.
Let's look at the Start function:
```go
if clusterType && conf.General.GenesisMethod == "file" {
	r.replicateIfNeeded(bootstrapBlock)
}

func (r *Replicator) ReplicateChains() []string {
	var replicatedChains []string
	channels := r.discoverChannels()
	pullHints := r.channelsToPull(channels)
	totalChannelCount := len(pullHints.channelsToPull) + len(pullHints.channelsNotToPull)
	for _, channels := range [][]ChannelGenesisBlock{pullHints.channelsToPull, pullHints.channelsNotToPull} {
		for _, channel := range channels {
			...
			r.appendBlock(gb, ledger, channel.ChannelName)
		}
	}
	for _, channel := range pullHints.channelsToPull {
		err := r.PullChannel(channel.ChannelName)
		...
	}
	// Last, pull the system chain.
	if err := r.PullChannel(r.SystemChannel); err != nil && err != ErrSkipped {
		r.Logger.Panicf("Failed pulling system channel: %v", err)
	}
	return replicatedChains
}
```
```go
// Are we bootstrapping?
if len(lf.ChainIDs()) == 0 {
	initializeBootstrapChannel(genesisBlock, lf)
} else {
	logger.Info("Not bootstrapping because of existing chains")
}
```
```go
registrar := multichannel.NewRegistrar(lf, signer, metricsProvider, callbacks...)
```
```go
// Registrar serves as a point of access and control for the individual channel resources.
type Registrar struct {
	lock sync.RWMutex
	// chain objects for all current channels
	chains map[string]*ChainSupport
	// consenters for the different consensus types
	consenters map[string]consensus.Consenter
	// Factory retrieves or creates new ledgers by chainID
	ledgerFactory blockledger.Factory
	// signing-related
	signer             crypto.LocalSigner
	blockcutterMetrics *blockcutter.Metrics
	// system channel ID
	systemChannelID string
	// system channel's ChainSupport
	systemChannel *ChainSupport
	// channel config template
	templator msgprocessor.ChannelConfigTemplator
	callbacks []channelconfig.BundleActor
}
```
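The core of the Registrar is the two maps above: chainID → ChainSupport and consensus type → consenter, both guarded by the RWMutex. A simplified, self-contained sketch of that access pattern (the type and method names here are stand-ins, not the real multichannel API) looks like this:

```go
package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for the real consensus.Consenter / ChainSupport types.
type Consenter interface{ Type() string }

type soloConsenter struct{}

func (soloConsenter) Type() string { return "solo" }

type ChainSupport struct {
	ChannelID string
	Consenter Consenter
}

// Registrar (simplified): a chain map and a consenter map behind a RWMutex,
// mirroring the shape of multichannel.Registrar.
type Registrar struct {
	lock       sync.RWMutex
	chains     map[string]*ChainSupport
	consenters map[string]Consenter
}

func NewRegistrar() *Registrar {
	return &Registrar{
		chains:     map[string]*ChainSupport{},
		consenters: map[string]Consenter{"solo": soloConsenter{}},
	}
}

// CreateChain looks up the consenter for the channel's consensus type and
// registers the resulting ChainSupport under its chainID.
func (r *Registrar) CreateChain(chainID, consensusType string) error {
	r.lock.Lock()
	defer r.lock.Unlock()
	c, ok := r.consenters[consensusType]
	if !ok {
		return fmt.Errorf("unknown consensus type %q", consensusType)
	}
	r.chains[chainID] = &ChainSupport{ChannelID: chainID, Consenter: c}
	return nil
}

// GetChain is the read path used by the Broadcast/Deliver handlers.
func (r *Registrar) GetChain(chainID string) (*ChainSupport, bool) {
	r.lock.RLock()
	defer r.lock.RUnlock()
	cs, ok := r.chains[chainID]
	return cs, ok
}

func main() {
	r := NewRegistrar()
	_ = r.CreateChain("mychannel", "solo")
	cs, ok := r.GetChain("mychannel")
	fmt.Println(ok, cs.Consenter.Type()) // true solo
}
```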
```go
consenters["solo"] = solo.New()
var kafkaMetrics *kafka.Metrics
consenters["kafka"], kafkaMetrics = kafka.New(conf, metricsProvider, healthChecker, registrar)
go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil)
if isClusterType(bootstrapBlock) {
	initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider)
}
```
```go
chain := newChainSupport(
	r,
	ledgerResources,
	r.consenters,
	r.signer,
	r.blockcutterMetrics,
)
```
```go
for _, chainID := range existingChains {
	...
	chain.start()
	...
}
```
```go
type AtomicBroadcastServer interface {
	Broadcast(AtomicBroadcast_BroadcastServer) error
	Deliver(AtomicBroadcast_DeliverServer) error
}
```

This interface is implemented in orderer/common/server/server.go.
提供支持chain相關操做的資源,既包含帳本自己,也包含了帳本用到的各類工具對象,如分割工具cutter,簽名工具signer,最新配置在chain中的位置信息(lastConfig的值表明當前鏈中最新配置所在的block的編號,lastConfigSeq的值則表明當前鏈中最新配置消息自身的編號)等工具
```go
type ChainSupport struct {
	// ledger-related resources, including ledger reads/writes and config retrieval
	*ledgerResources
	// classifies and processes messages received from clients
	msgprocessor.Processor
	// writes blocks to disk
	*BlockWriter
	// the chain: provides the message-handling methods.
	// This design allows for two primary flows
	// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
	// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
	consensus.Chain
	// receiver for broadcast messages; provides the block-cutting methods
	cutter blockcutter.Receiver
	// signing
	crypto.LocalSigner
	// chains need to know if they are system or standard channel.
	systemChannel bool
}
```
MaxMessageCount specifies the maximum number of messages stored in a block; AbsoluteMaxBytes specifies the hard upper limit on a block's size in bytes; PreferredMaxBytes specifies the preferred maximum size in bytes (while cutting messages into batches, the blockcutter tries to keep each batch close to this value).
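How these knobs interact can be shown with a simplified, self-contained cutter (a sketch of the batching rules, not the real blockcutter package; AbsoluteMaxBytes is enforced upstream on individual messages, so it is unused here): a message larger than PreferredMaxBytes gets a batch of its own, a message that would overflow the preferred size forces a cut first, and reaching MaxMessageCount cuts immediately.

```go
package main

import "fmt"

// Batch-size knobs from the channel config (names follow configtx.yaml).
type BatchSize struct {
	MaxMessageCount   int
	AbsoluteMaxBytes  int // enforced upstream on single-message size; unused here
	PreferredMaxBytes int
}

type cutter struct {
	cfg          BatchSize
	pending      [][]byte
	pendingBytes int
}

// Ordered mirrors, in simplified form, blockcutter.Receiver.Ordered: it
// returns zero or more completed batches plus whether messages remain pending.
func (c *cutter) Ordered(msg []byte) (batches [][][]byte, pending bool) {
	size := len(msg)

	// An oversized message gets a batch of its own.
	if size > c.cfg.PreferredMaxBytes {
		if len(c.pending) > 0 {
			batches = append(batches, c.cut())
		}
		batches = append(batches, [][]byte{msg})
		return batches, false
	}

	// Cut first if appending would overflow the preferred byte limit.
	if c.pendingBytes+size > c.cfg.PreferredMaxBytes {
		batches = append(batches, c.cut())
	}

	c.pending = append(c.pending, msg)
	c.pendingBytes += size

	// Cut as soon as the message-count limit is reached.
	if len(c.pending) >= c.cfg.MaxMessageCount {
		batches = append(batches, c.cut())
	}
	return batches, len(c.pending) > 0
}

func (c *cutter) cut() [][]byte {
	b := c.pending
	c.pending, c.pendingBytes = nil, 0
	return b
}

func main() {
	c := &cutter{cfg: BatchSize{MaxMessageCount: 2, PreferredMaxBytes: 10}}
	for _, m := range [][]byte{[]byte("aaaa"), []byte("bbbb"), []byte("a very large message")} {
		batches, pending := c.Ordered(m)
		fmt.Println(len(batches), pending)
	}
	// prints: 0 true / 1 false / 1 false
}
```

With MaxMessageCount=2 the second small message completes a batch, and the 20-byte message exceeds PreferredMaxBytes=10 and is emitted as a singleton batch.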
Its main job is message handling: passing transaction messages to the block cutter to be cut into blocks and written to the ledger. The behavior differs across consensus mechanisms (covered in detail later together with the consenter module).
```go
chain := newChainSupport(
	r,
	ledgerResources,
	r.consenters,
	r.signer,
	r.blockcutterMetrics,
)
r.chains[chainID] = chain
chain.start()
```
There are three consensus types, solo/kafka/etcdraft, used to serialize (order) the produced messages (i.e. the Envelopes sent in by the various peers).
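The essence of the simplest of these, solo, is a single loop that serializes incoming envelopes into one stream and commits blocks in order. The following is a toy sketch of that shape (all names are illustrative; the real solo consenter also cuts on BatchTimeout and delegates cutting to the blockcutter):

```go
package main

import "fmt"

// orderingLoop is a minimal solo-style ordering loop: envelopes arrive on a
// channel, are serialized into a single stream, cut into fixed-size batches,
// and each batch is "committed" as one block, strictly in order.
func orderingLoop(envelopes <-chan string, batchSize int) [][]string {
	var ledger [][]string // committed blocks, in order
	var batch []string
	for env := range envelopes {
		batch = append(batch, env)
		if len(batch) == batchSize {
			ledger = append(ledger, batch)
			batch = nil
		}
	}
	// Commit the final partial batch (the real orderer cuts it on BatchTimeout).
	if len(batch) > 0 {
		ledger = append(ledger, batch)
	}
	return ledger
}

func main() {
	in := make(chan string)
	go func() {
		for _, tx := range []string{"tx1", "tx2", "tx3", "tx4", "tx5"} {
			in <- tx
		}
		close(in)
	}()
	blocks := orderingLoop(in, 2)
	fmt.Println(len(blocks), blocks[0]) // 3 [tx1 tx2]
}
```

Kafka and etcdraft replace the single in-process stream with an externally ordered log (a Kafka topic) or a Raft-replicated log, but the cut-and-commit shape stays the same.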
Reference:

https://blog.csdn.net/idsuf698987/article/details/78639203