在使用Fabric建立通道的時候,一般咱們執行一條命令完成,這篇文章就解析一下執行這條命令後Fabric源碼中執行的流程。html
peer channel create -o orderer.example.com:7050 -c mychannel -f ./channel-artifacts/channel.tx --tls true --cafile $ORDERER_CA
整個流程的切入點在fabric/peer/main.go
文件中的main()
方法 (本文中使用的是Fabric1.4版本,不一樣版本中內容可能不一樣)。這個方法中也定義了Peer節點能夠執行的命令,有關於版本的:version.Cmd()
,關於節點狀態的:node.Cmd()
,關於鏈碼的:chaincode.Cmd(nil)
,關於客戶端日誌的:clilogging.Cmd(nil)
,最後一個就是關於通道的:channel.Cmd(nil)
。因此咱們就從這裏入手,看一下建立通道的總體流程是什麼樣的。
點進行後,轉到了peer/channel/channel.go
文件中第49行,其中定義了Peer節點能夠執行的對通道進行操做的相關命令:node
func Cmd(cf *ChannelCmdFactory) *cobra.Command { AddFlags(channelCmd) #建立通道 channelCmd.AddCommand(createCmd(cf)) #從通道獲取區塊 channelCmd.AddCommand(fetchCmd(cf)) #加入通道 channelCmd.AddCommand(joinCmd(cf)) #列出當前節點所加入的通道 channelCmd.AddCommand(listCmd(cf)) #簽名並更新通道配置信息 channelCmd.AddCommand(updateCmd(cf)) #只對通道配置信息進行簽名 channelCmd.AddCommand(signconfigtxCmd(cf)) #獲取通道信息 channelCmd.AddCommand(getinfoCmd(cf)) return channelCmd }
具體的Peer節點命令使用方法能夠參考Fabric官方文檔,這裏不在一一解釋。
咱們看一下createCmd(cf)
方法,該方法轉到了peer/channel/create.go
文件中第51行,看文件名字就知道和建立通道相關了。git
func createCmd(cf *ChannelCmdFactory) *cobra.Command { createCmd := &cobra.Command{ Use: "create", #使用create關鍵詞建立通道 Short: "Create a channel", Long: "Create a channel and write the genesis block to a file.", #建立通道並將創世區塊寫入文件 RunE: func(cmd *cobra.Command, args []string) error { #這一行命令就是對通道進行建立了,點進行看一下 return create(cmd, args, cf) }, } ... }
create(cmd, args, cf)
方法在本文件中第227行:github
func create(cmd *cobra.Command, args []string, cf *ChannelCmdFactory) error { // the global chainID filled by the "-c" command #官方註釋用-c來代表通道ID if channelID == common.UndefinedParamValue { #UndefinedParamValue="",若是通道ID等於空 return errors.New("must supply channel ID") } // Parsing of the command line is done so silence cmd usage cmd.SilenceUsage = true var err error if cf == nil { #若是ChannelCmdFactory爲空,則初始化一個 cf, err = InitCmdFactory(EndorserNotRequired, PeerDeliverNotRequired, OrdererRequired) if err != nil { return err } } #最後將ChannelCmdFactory傳入該方法,進行通道的建立 return executeCreate(cf) }
首先看一下InitCmdFactory()作了哪些工做,在peer/channel/channel.go
文件中第126行:json
func InitCmdFactory(isEndorserRequired, isPeerDeliverRequired, isOrdererRequired bool) (*ChannelCmdFactory, error) { #這裏的意思就是隻能有一個交付源,要麼是Peer要麼是Orderer if isPeerDeliverRequired && isOrdererRequired { return nil, errors.New("ERROR - only a single deliver source is currently supported") } var err error #初始化ChannelCmdFactory,看一下該結構體的內容 cf := &ChannelCmdFactory{}
直接拿到這裏來好了:網絡
type ChannelCmdFactory struct { #用於背書的客戶端 EndorserClient pb.EndorserClient #簽名者 Signer msp.SigningIdentity #用於廣播的客戶端 BroadcastClient common.BroadcastClient #用於交付的客戶端 DeliverClient deliverClientIntf #建立用於廣播的客戶端的工廠 BroadcastFactory BroadcastClientFactory }
再往下看:app
#獲取默認的簽名者,一般是Peer節點 cf.Signer, err = common.GetDefaultSignerFnc() if err != nil { return nil, errors.WithMessage(err, "error getting default signer") } cf.BroadcastFactory = func() (common.BroadcastClient, error) { #根據ChannelCmdFactory結構體中的BroadcastFactory獲取BroadcastClient return common.GetBroadcastClientFnc() } // for join and list, we need the endorser as well #咱們這裏是完成對通道的建立,因此只使用了最後一個isOrdererRequired if isEndorserRequired { #建立一個用於背書的客戶端 cf.EndorserClient, err = common.GetEndorserClientFnc(common.UndefinedParamValue, common.UndefinedParamValue) if err != nil { return nil, errors.WithMessage(err, "error getting endorser client for channel") } } // for fetching blocks from a peer if isPeerDeliverRequired { #從Peer節點建立一個用於交付的客戶端 cf.DeliverClient, err = common.NewDeliverClientForPeer(channelID, bestEffort) if err != nil { return nil, errors.WithMessage(err, "error getting deliver client for channel") } } // for create and fetch, we need the orderer as well if isOrdererRequired { if len(strings.Split(common.OrderingEndpoint, ":")) != 2 { return nil, errors.Errorf("ordering service endpoint %s is not valid or missing", common.OrderingEndpoint) } #從Order節點建立一個一個用於交付的客戶端 cf.DeliverClient, err = common.NewDeliverClientForOrderer(channelID, bestEffort) if err != nil { return nil, err } } logger.Infof("Endorser and orderer connections initialized") return cf, nil }
返回create()
方法:fetch
#到了最後一行代碼,傳入以前建立的ChannelCmdFactory,開始進行通道的建立 return executeCreate(cf)
該方法在peer/channel/create.go
文件中的第174行:ui
#方法比較清晰,一共完成了如下幾個步驟 func executeCreate(cf *ChannelCmdFactory) error { #發送建立通道的Transaction到Order節點 err := sendCreateChainTransaction(cf) if err != nil { return err } #獲取該通道內的創世區塊(該過程在Order節點共識完成以後) block, err := getGenesisBlock(cf) if err != nil { return err } #序列化區塊信息 b, err := proto.Marshal(block) if err != nil { return err } file := channelID + ".block" if outputBlock != common.UndefinedParamValue { file = outputBlock } #將區塊信息寫入本地文件中 err = ioutil.WriteFile(file, b, 0644) if err != nil { return err } return nil }
首先咱們看一下sendCreateChainTransaction()
這個方法,又回到了peer/channel/create.go
文件中,在第144行:this
func sendCreateChainTransaction(cf *ChannelCmdFactory) error { var err error #定義了一個Envelope結構體 var chCrtEnv *cb.Envelope
Envelope
結構體:
type Envelope struct { #主要就是保存被序列化的有效載荷 Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` #由建立者進行的簽名信息 Signature []byte `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
回到sendCreateChainTransaction()
這個方法,繼續往下:
if channelTxFile != "" { #若是指定了channelTxFile,則使用指定的文件建立通道,這個方法很簡單,從文件中讀取數據,反序列化後返回chCrtEnv.對於咱們啓動Fabric網絡以前曾建立過一個channel.tx文件,指的就是這個 if chCrtEnv, err = createChannelFromConfigTx(channelTxFile); err != nil { return err } } else { #若是沒有指定,則使用默認的配置建立通道,看一下這個方法,在71行 if chCrtEnv, err = createChannelFromDefaults(cf); err != nil { return err } } -------------------------------createChannelFromDefaults()------- func createChannelFromDefaults(cf *ChannelCmdFactory) (*cb.Envelope, error) { #主要就這一個方法,點進去 chCrtEnv, err := encoder.MakeChannelCreationTransaction(channelID, localsigner.NewSigner(), genesisconfig.Load(genesisconfig.SampleSingleMSPChannelProfile)) if err != nil { return nil, err } return chCrtEnv, nil }
MakeChannelCreationTransaction()
方法傳入了通道的ID,並建立了一個簽名者,以及默認的配置文件,方法在common/tools/configtxgen/encoder/encoder.go
文件中第502行:
func MakeChannelCreationTransaction(channelID string, signer crypto.LocalSigner, conf *genesisconfig.Profile) (*cb.Envelope, error) { #從名字能夠看到是使用了默認的配置模板,對各類策略進行配置,裏面就再也不細看了 template, err := DefaultConfigTemplate(conf) if err != nil { return nil, errors.WithMessage(err, "could not generate default config template") } #看一下這個方法,從模板中建立一個用於建立通道的Transaction return MakeChannelCreationTransactionFromTemplate(channelID, signer, conf, template) }
MakeChannelCreationTransactionFromTemplate()
方法在第530行:
func MakeChannelCreationTransactionFromTemplate(channelID string, signer crypto.LocalSigner, conf *genesisconfig.Profile, template *cb.ConfigGroup) (*cb.Envelope, error) { newChannelConfigUpdate, err := NewChannelCreateConfigUpdate(channelID, conf, template) ... #建立一個用於配置更新的結構體 newConfigUpdateEnv := &cb.ConfigUpdateEnvelope{ ConfigUpdate: utils.MarshalOrPanic(newChannelConfigUpdate), } if signer != nil { #若是簽名者不爲空,建立簽名Header sigHeader, err := signer.NewSignatureHeader() ... newConfigUpdateEnv.Signatures = []*cb.ConfigSignature{{ SignatureHeader: utils.MarshalOrPanic(sigHeader), }} ... #進行簽名 newConfigUpdateEnv.Signatures[0].Signature, err = signer.Sign(util.ConcatenateBytes(newConfigUpdateEnv.Signatures[0].SignatureHeader, newConfigUpdateEnv.ConfigUpdate)) ... } #建立被簽名的Envelope,而後一直返回到最外面的方法 return utils.CreateSignedEnvelope(cb.HeaderType_CONFIG_UPDATE, channelID, signer, newConfigUpdateEnv, msgVersion, epoch) }
到這裏,用於建立通道的Envelope已經建立好了,sendCreateChainTransaction()
繼續往下看:
... #該方法主要是對剛剛建立的Envelope進行驗證 if chCrtEnv, err = sanityCheckAndSignConfigTx(chCrtEnv); err != nil { return err } var broadcastClient common.BroadcastClient #驗證完成後,建立一個用於廣播信息的客戶端 broadcastClient, err = cf.BroadcastFactory() if err != nil { return errors.WithMessage(err, "error getting broadcast client") } defer broadcastClient.Close() #將建立通道的Envelope信息廣播出去 err = broadcastClient.Send(chCrtEnv) return err }
到這裏,sendCreateChainTransaction()
方法結束了,總結一下該方法所作的工做:
至於獲取創世區塊以及將文件保存到本地再也不說明,接下來咱們看一下Peer節點將建立通道的Envelope廣播出去後,Order節點作了什麼。
方法在/order/common/server/server.go
中第141行:
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error { ... #主要在這一行代碼,Handle方法對接收到的信息進行處理 return s.bh.Handle(&broadcastMsgTracer{ AtomicBroadcast_BroadcastServer: srv, msgTracer: msgTracer{ debug: s.debug, function: "Broadcast", }, }) }
對於Handler()
方法,在/order/common/broadcast/broadcast.go
文件中第66行:
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error { #首先獲取消息的源地址 addr := util.ExtractRemoteAddress(srv.Context()) ... for { #接收消息 msg, err := srv.Recv() ... #處理接收到的消息,咱們看一下這個方法 resp := bh.ProcessMessage(msg, addr) #最後將響應信息廣播出去 err = srv.Send(resp) ... } }
ProcessMessage(msg, addr)
方法的傳入參數爲接收到的消息以及消息的源地址,該方法比較重要,是Order節點對消息進行處理的主方法。在第136行:
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) { #這個結構體應該理解爲記錄器,記錄消息的相關信息 tracker := &MetricsTracker{ ChannelID: "unknown", TxType: "unknown", Metrics: bh.Metrics, } defer func() { // This looks a little unnecessary, but if done directly as // a defer, resp gets the (always nil) current state of resp // and not the return value tracker.Record(resp) }() #記錄處理消息的開始時間 tracker.BeginValidate() #該方法獲取接收到的消息的Header,並判斷是否爲配置信息 chdr, isConfig, processor, err := bh.SupportRegistrar.BroadcastChannelSupport(msg) ... #因爲以前Peer節點發送的爲建立通道的信息,因此消息類型爲配置信息 if !isConfig { ... #而對於普通的交易信息的處理方法這裏就再也不看了,主要關注於建立通道的消息的處理 } else { // isConfig logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr) #到了這裏,對配置更新消息進行處理,主要方法,點進行看一下 config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
ProcessConfigUpdateMsg(msg)
方法在orderer/common/msgprocessor/systemchannel.go
文件中第84行:
#這個地方有些不懂,爲何會調用systemchannel.ProcessConfigUpdateMsg()而不是standardchannel.ProcessConfigUpdateMsg()方法?是由於這個結構體的緣由? ===========================SystemChannel======================= type SystemChannel struct { *StandardChannel templator ChannelConfigTemplator } ===========================SystemChannel======================= func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) { #首先從消息體中獲取通道ID channelID, err := utils.ChannelID(envConfigUpdate) ... #判斷獲取到的通道ID是否爲已經存在的用戶通道ID,若是是的話轉到StandardChannel中的ProcessConfigUpdateMsg()方法進行處理 if channelID == s.support.ChainID() { return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate) } ... #因爲以前由Peer節點發送的爲建立通道的Tx,因此對於通道ID是不存在的,所以到了這個方法,點進行看一下 bundle, err := s.templator.NewChannelConfig(envConfigUpdate)
NewChannelConfig()
方法在第215行,比較重要的方法,完成通道的配置:
func (dt *DefaultTemplator) NewChannelConfig(envConfigUpdate *cb.Envelope) (channelconfig.Resources, error) { #首先反序列化有效載荷 configUpdatePayload, err := utils.UnmarshalPayload(envConfigUpdate.Payload) ... #反序列化配置更新信息Envelope configUpdateEnv, err := configtx.UnmarshalConfigUpdateEnvelope(configUpdatePayload.Data)s ... #獲取通道頭信息 channelHeader, err := utils.UnmarshalChannelHeader(configUpdatePayload.Header.ChannelHeader) ... #反序列化配置更新信息 configUpdate, err := configtx.UnmarshalConfigUpdate(configUpdateEnv.ConfigUpdate) ... #如下具體的再也不說了,就是根據以前定義的各項策略對通道進行配置,具體的策略能夠看configtx.yaml文件 consortiumConfigValue, ok := configUpdate.WriteSet.Values[channelconfig.ConsortiumKey] ... consortium := &cb.Consortium{} err = proto.Unmarshal(consortiumConfigValue.Value, consortium) ... applicationGroup := cb.NewConfigGroup() consortiumsConfig, ok := dt.support.ConsortiumsConfig() ... consortiumConf, ok := consortiumsConfig.Consortiums()[consortium.Name] ... applicationGroup.Policies[channelconfig.ChannelCreationPolicyKey] = &cb.ConfigPolicy{ Policy: consortiumConf.ChannelCreationPolicy(), } applicationGroup.ModPolicy = channelconfig.ChannelCreationPolicyKey #獲取當前系統通道配置信息 systemChannelGroup := dt.support.ConfigtxValidator().ConfigProto().ChannelGroup if len(systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups) > 0 && len(configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Groups) == 0 { return nil, fmt.Errorf("Proposed configuration has no application group members, but consortium contains members") } if len(systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups) > 0 { for orgName := range configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Groups { consortiumGroup, ok := systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups[orgName] if !ok { return nil, fmt.Errorf("Attempted to include a member which is not in the consortium") } applicationGroup.Groups[orgName] = proto.Clone(consortiumGroup).(*cb.ConfigGroup) } } channelGroup := cb.NewConfigGroup() #將系統通道配置信息複製 for key, value := range systemChannelGroup.Values { channelGroup.Values[key] = proto.Clone(value).(*cb.ConfigValue) if key == channelconfig.ConsortiumKey { // Do not set the consortium name, we do this later continue } } for key, policy := range systemChannelGroup.Policies { channelGroup.Policies[key] = proto.Clone(policy).(*cb.ConfigPolicy) } #新的配置信息中Order組配置使用系統通道的配置,同時將定義的application組配置賦值到新的配置信息 channelGroup.Groups[channelconfig.OrdererGroupKey] = proto.Clone(systemChannelGroup.Groups[channelconfig.OrdererGroupKey]).(*cb.ConfigGroup) channelGroup.Groups[channelconfig.ApplicationGroupKey] = applicationGroup channelGroup.Values[channelconfig.ConsortiumKey] = &cb.ConfigValue{ Value: utils.MarshalOrPanic(channelconfig.ConsortiumValue(consortium.Name).Value()), ModPolicy: channelconfig.AdminsPolicyKey, } if oc, ok := dt.support.OrdererConfig(); ok && oc.Capabilities().PredictableChannelTemplate() { channelGroup.ModPolicy = systemChannelGroup.ModPolicy zeroVersions(channelGroup) } #將建立的新的配置打包爲Bundle bundle, err := channelconfig.NewBundle(channelHeader.ChannelId, &cb.Config{ ChannelGroup: channelGroup, }) ... return bundle, nil }
接下來咱們回到ProcessConfigUpdateMsg()
方法:
... #建立一個配置驗證器對該方法的傳入參數進行驗證操做 newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate) ... #建立一個簽名的Envelope,這次爲Header類型爲HeaderType_CONFIG進行簽名 newChannelEnvConfig, err := utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch) #建立一個簽名的Transaction,這次爲Header類型爲HeaderType_ORDERER_TRANSACTION進行簽名 wrappedOrdererTransaction, err := utils.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChainID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch) ... #過濾器進行過濾,主要檢查是否建立的Transaction過大,以及簽名檢查,確保Order節點使用正確的證書進行簽名 err = s.StandardChannel.filters.Apply(wrappedOrdererTransaction) ... #將Transaction返回 return wrappedOrdererTransaction, s.support.Sequence(), nil }
到這裏,消息處理完畢,返回到ProcessMessage()
方法:
config, configSeq, err := processor.ProcessConfigUpdateMsg(msg) ... #記錄消息處理完畢時間 tracker.EndValidate() #開始進行入隊操做 tracker.BeginEnqueue() #waitReady()是一個阻塞方法,等待入隊完成或出現異常 if err = processor.WaitReady(); err != nil { logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err) return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()} } #共識方法,具體看定義的Fabric網絡使用了哪一種共識 err = processor.Configure(config, configSeq) ... #最後返回操做成功的響應 return &ab.BroadcastResponse{Status: cb.Status_SUCCESS} }
到這裏,由客戶端發送的建立通道的Transaction
就結束了。總共分爲兩個部分,一個是Peer
節點對建立通道的Envelope
進行建立的過程,一個是Order
節點接收到該Envelope
進行配置的過程,最後總結一下總體流程:
Peer
節點方:
Transaction
channel.tx
文件,若是有的話直接從文件中讀取已配置好的信息,通常都會存在Envelope
進行相關檢查包括各項數據是否爲空,建立的通道是否已經存在,配置信息是否正確,以及進行簽名,封裝爲HeaderType
爲CONFIG_UPDATE
的Envelope
。Envelope
廣播出去。Order
節點方:
Envelope
信息,進行相關驗證並判斷是否爲配置信息Envelope
中讀取各項策略配置Header
類型爲CONFIG
的Envelope進行簽名Header
類型爲ORDERER_TRANSACTION
的Envelope
進行簽名生成Transaction
Transaction
進行過濾,主要是Tx大小,Order
節點的證書信息是否正確整個建立通道的過程也是比較長的,能力有限,因此有些地方並無分析太清晰。不過總體仍是能夠把握住的。
最後附上參考文檔:傳送門
以及Fabric源碼地址:傳送門