看一下Peer
節點的啓動過程,一般在Fabric網絡中,Peer
節點的啓動方式有兩種,經過Docker容器啓動,或者是經過執行命令直接啓動。
通常狀況下,咱們都是執行docker-compose -f docker-*.yaml up
命令經過容器啓動了Peer
節點,而若是直接啓動Peer
節點則是執行了peer node start
這條命令。看起來,這兩種方式所使用的命令毫無關係,但事實上,在Docker容器中啓動Peer
節點也是經過執行了peer node start
這條命令來啓動Peer
節點,只不過是Docker替咱們執行了,這條命令就在以前經過啓動Docker容器的那個文件中寫到。因此說,不管是哪一種方式啓動Peer
節點,都是經過peer node start
這條命令,接下來,咱們就分析一下執行完這條命令後,Peer
節點的啓動過程。
和以前同樣,首先找到切入點,在/fabric/peer/main.go
文件中,第46行:java
mainCmd.AddCommand(node.Cmd())
這裏包含了與對Peer
節點進行相關操做的命令集合,其中就有啓動Peer
節點的命令,咱們點進行看一下:node
func Cmd() *cobra.Command { nodeCmd.AddCommand(startCmd()) nodeCmd.AddCommand(statusCmd()) return nodeCmd }
共有兩條命令:啓動Peer
節點,以及查看節點的狀態,咱們看一下啓動Peer
節點這條命令,首先調用了peer/node/start.go
文件中的startCmd()
,以後轉到了nodeStartCmd
,以及serve(args)
這個方法。其中,serve(args)
這個方法就是本文要說明了主要方法,咱們就從這裏開始分析,在peer/node/start.go
文件中第125行:git
func serve(args []string) error { #首先獲取MSP的類型,msp指的是成員關係服務提供者,至關於許可證 mspType := mgmt.GetLocalMSP().GetType() #若是MSP的類型不是FABRIC,返回錯誤信息 if mspType != msp.FABRIC { panic("Unsupported msp type " + msp.ProviderTypeToString(mspType)) } ... #建立ACL提供者,access control list訪問控制列表 aclProvider := aclmgmt.NewACLProvider( aclmgmt.ResourceGetter(peer.GetStableChannelConfig), ) #平臺註冊,可使用的語言類型,最後一個car不太理解,可能和官方的一個例子有關 pr := platforms.NewRegistry( &golang.Platform{}, &node.Platform{}, &java.Platform{}, &car.Platform{}, )
定義一個用於部署鏈碼的Provider結構體:github
deployedCCInfoProvider := &lscc.DeployedCCInfoProvider{} ==========================DeployedCCInfoProvider========================== type DeployedChaincodeInfoProvider interface { Namespaces() []string #命名空間 UpdatedChaincodes(stateUpdates map[string][]*kvrwset.KVWrite) ([]*ChaincodeLifecycleInfo, error) #保存更新的鏈碼 ChaincodeInfo(chaincodeName string, qe SimpleQueryExecutor) (*DeployedChaincodeInfo, error) #保存鏈碼信息 CollectionInfo(chaincodeName, collectionName string, qe SimpleQueryExecutor) (*common.StaticCollectionConfig, error) } #保存鏈碼數據信息 ==========================DeployedCCInfoProvider==========================
下面是對Peer節點的一些屬性的設置了:golang
identityDeserializerFactory := func(chainID string) msp.IdentityDeserializer { #獲取通道管理者 return mgmt.GetManagerForChain(chainID) } #至關於配置Peer節點的運行環境了,主要就是保存Peer節點的IP地址,端口,證書等相關基本信息 opsSystem := newOperationsSystem() err := opsSystem.Start() if err != nil { return errors.WithMessage(err, "failed to initialize operations subystems") } defer opsSystem.Stop() metricsProvider := opsSystem.Provider #建立觀察者,對Peer節點進行記錄 logObserver := floggingmetrics.NewObserver(metricsProvider) flogging.Global.SetObserver(logObserver) #建立成員關係信息Provider,簡單來講就是保存其餘Peer節點的信息,以便通訊等等 membershipInfoProvider := privdata.NewMembershipInfoProvider(createSelfSignedData(), identityDeserializerFactory) #帳本管理器初始化,主要就是以前所定義的一些屬性 ledgermgmt.Initialize( &ledgermgmt.Initializer{ #與Tx處理相關 CustomTxProcessors: peer.ConfigTxProcessors, #以前定義的所使用的語言 PlatformRegistry: pr, #與鏈碼相關 DeployedChaincodeInfoProvider: deployedCCInfoProvider, #與Peer節點交互相關 MembershipInfoProvider: membershipInfoProvider, #這個不太清楚,與Peer節點的屬性相關? MetricsProvider: metricsProvider, #健康檢查 HealthCheckRegistry: opsSystem, }, ) #判斷是否處於開發模式下 if chaincodeDevMode { logger.Info("Running in chaincode development mode") logger.Info("Disable loading validity system chaincode") viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode) } #裏面有兩個方法,分別是獲取本地地址與獲取當前Peer節點實例地址,將地址進行緩存 if err := peer.CacheConfiguration(); err != nil { return err } #獲取當前Peer節點實例地址,若是沒有進行緩存,則會執行上一步的CacheConfiguration()方法 peerEndpoint, err := peer.GetPeerEndpoint() if err != nil { err = fmt.Errorf("Failed to get Peer Endpoint: %s", err) return err } #簡單的字符串操做,獲取Host peerHost, _, err := net.SplitHostPort(peerEndpoint.Address) if err != nil { return fmt.Errorf("peer address is not in the format of host:port: %v", err) } #獲取監聽地址,該屬性在opsSystem中定義過 listenAddr := viper.GetString("peer.listenAddress") #返回當前Peer節點的gRPC服務器配置,該方法主要就是設置TLS與心跳信息,在/core/peer/config.go文件中第128行。 serverConfig, err := peer.GetServerConfig() if err != nil { logger.Fatalf("Error loading secure config for peer (%s)", err) } #設置gRPC最大併發 grpcMaxConcurrency=2500 throttle := comm.NewThrottle(grpcMaxConcurrency) #設置日誌信息 serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer") serverConfig.MetricsProvider = metricsProvider #設置攔截器,再也不細說 serverConfig.UnaryInterceptors = append( serverConfig.UnaryInterceptors, grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)), grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()), throttle.UnaryServerIntercptor, ) serverConfig.StreamInterceptors = append( serverConfig.StreamInterceptors, grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)), grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()), throttle.StreamServerInterceptor, )
到這裏建立了Peer節點的gRPC服務器,將以前的監聽地址與服務器配置傳了進去:docker
peerServer, err := peer.NewPeerServer(listenAddr, serverConfig) if err != nil { logger.Fatalf("Failed to create peer server (%s)", err) }
關於權限的一些配置:json
#TLS的相關設置 if serverConfig.SecOpts.UseTLS { logger.Info("Starting peer with TLS enabled") // set up credential support cs := comm.GetCredentialSupport() roots, err := peer.GetServerRootCAs() if err != nil { logger.Fatalf("Failed to set TLS server root CAs: %s", err) } cs.ServerRootCAs = roots // set the cert to use if client auth is requested by remote endpoints clientCert, err := peer.GetClientCertificate() if err != nil { logger.Fatalf("Failed to set TLS client certificate: %s", err) } comm.GetCredentialSupport().SetClientCertificate(clientCert) } mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert #策略檢查Provider,看傳入的參數就比較清楚了,Envelope,通道ID,環境變量 policyCheckerProvider := func(resourceName string) deliver.PolicyCheckerFunc { return func(env *cb.Envelope, channelID string) error { return aclProvider.CheckACL(resourceName, channelID, env) } }
建立了另外一個服務器,與上面的權限設置相關,用於交付與過濾區塊的事件服務器:緩存
abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider) #將以前建立的gRPC服務器與用於交付與過濾區塊的事件服務器註冊到這裏 pb.RegisterDeliverServer(peerServer.Server(), abServer)
接下來是與鏈碼相關的操做:服務器
#啓動與鏈碼相關的服務器,看傳入的值 Peer節點的主機名,訪問控制列表Provider,pr是以前提到與語言相關的,以及以前的運行環境 #主要完成三個操做:1.設置本地鏈碼安裝路徑,2.建立自簽名CA,3,啓動鏈碼gRPC監聽服務,該方法在本文件中第709行 chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem) logger.Debugf("Running peer") #啓動管理員服務,這個不太懂幹嗎的 startAdminServer(listenAddr, peerServer.Server(), metricsProvider) privDataDist := func(channel string, txID string, privateData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error { #看這個方法是分發私有數據到其餘節點 return service.GetGossipService().DistributePrivateData(channel, txID, privateData, blkHt) } ========================TxPvtReadWriteSetWithConfigInfo========================== #看這裏,主要是私有的讀寫集以及配置信息 type TxPvtReadWriteSetWithConfigInfo struct { EndorsedAt uint64 `protobuf:"varint,1,opt,name=endorsed_at,json=endorsedAt,proto3" json:"endorsed_at,omitempty"` PvtRwset *rwset.TxPvtReadWriteSet `protobuf:"bytes,2,opt,name=pvt_rwset,json=pvtRwset,proto3" json:"pvt_rwset,omitempty"` CollectionConfigs map[string]*common.CollectionConfigPackage `protobuf:"bytes,3,rep,name=collection_configs,json=collectionConfigs,proto3" json:"collection_configs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } ============================TxPvtReadWriteSetWithConfigInfo========================== #獲取本地的已簽名的身份信息,主要是看當前節點具備的功能,好比背書,驗證 signingIdentity := mgmt.GetLocalSigningIdentityOrPanic() serializedIdentity, err := signingIdentity.Serialize() if err != nil { logger.Panicf("Failed serializing self identity: %v", err) } # libConf := library.Config{} ================================Config============================= type Config struct { #權限過濾 AuthFilters []*HandlerConfig `mapstructure:"authFilters" yaml:"authFilters"` #這個不清楚 Decorators []*HandlerConfig `mapstructure:"decorators" yaml:"decorators"` #背書 Endorsers PluginMapping `mapstructure:"endorsers" yaml:"endorsers"` #驗證 Validators PluginMapping `mapstructure:"validators" yaml:"validators"` } ==================================Config============================= if err = viperutil.EnhancedExactUnmarshalKey("peer.handlers", &libConf); err != nil { return errors.WithMessage(err, "could not load YAML config") } #建立一個Registry實例,將上面的配置註冊到這裏 reg := library.InitRegistry(libConf) #這一部分是背書操做的相關設置,不貼出來了 ... #設置完以後註冊背書服務 pb.RegisterEndorserServer(peerServer.Server(), auth) #建立通道策略管理者,好比哪些節點或用戶具備可讀,可寫,可操做的權限,都是由它管理 policyMgr := peer.NewChannelPolicyManagerGetter() #建立用於廣播的服務,就是區塊鏈中用於向其餘節點發送消息的服務 err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
到這裏,鏈碼的相關配置已經差很少了,到了部署系統鏈碼的地方了:網絡
#這一行代碼就是將系統鏈碼部署上去 sccp.DeploySysCCs("", ccp) logger.Infof("Deployed system chaincodes") installedCCs := func() ([]ccdef.InstalledChaincode, error) { #查看已經安裝的鏈碼 return packageProvider.ListInstalledChaincodes() } #與鏈碼的生命週期相關 lifecycle, err := cc.NewLifeCycle(cc.Enumerate(installedCCs)) if err != nil { logger.Panicf("Failed creating lifecycle: +%v", err) } #處理鏈碼的元數據更新,由其餘節點廣播 onUpdate := cc.HandleMetadataUpdate(func(channel string, chaincodes ccdef.MetadataSet) { service.GetGossipService().UpdateChaincodes(chaincodes.AsChaincodes(), gossipcommon.ChainID(channel)) }) #添加監聽器監聽鏈碼元數據更新 lifecycle.AddListener(onUpdate)
這一部分是與通道的初始化相關的內容:
peer.Initialize(func(cid string) { logger.Debugf("Deploying system CC, for channel <%s>", cid) sccp.DeploySysCCs(cid, ccp) #獲取通道的描述信息,就是通道的基本屬性 sub, err := lifecycle.NewChannelSubscription(cid, cc.QueryCreatorFunc(func() (cc.Query, error) { #根據通道ID獲取帳本的查詢執行器 return peer.GetLedger(cid).NewQueryExecutor() })) if err != nil { logger.Panicf("Failed subscribing to chaincode lifecycle updates") } #爲通道註冊監聽器 cceventmgmt.GetMgr().Register(cid, sub) }, ccp, sccp, txvalidator.MapBasedPluginMapper(validationPluginsByName), pr, deployedCCInfoProvider, membershipInfoProvider, metricsProvider) #當前節點狀態改變後是否能夠被發現 if viper.GetBool("peer.discovery.enabled") { registerDiscoveryService(peerServer, policyMgr, lifecycle) } #獲取Peer節點加入的網絡ID networkID := viper.GetString("peer.networkId") logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address) #查看是否已經定義了配置文件 profileEnabled := viper.GetBool("peer.profile.enabled") profileListenAddress := viper.GetString("peer.profile.listenAddress") #建立進程啓動gRPC服務器 serve := make(chan error) go func() { var grpcErr error if grpcErr = peerServer.Start(); grpcErr != nil { grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr) } else { logger.Info("peer server exited") } serve <- grpcErr }() #若是已經定義了配置文件,則啓動監聽服務 if profileEnabled { go func() { logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress) if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil { logger.Errorf("Error starting profiler: %s", profileErr) } }() } #開始處理接收到的消息了 go handleSignals(addPlatformSignals(map[os.Signal]func(){ syscall.SIGINT: func() { serve <- nil }, syscall.SIGTERM: func() { serve <- nil }, })) logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address) #阻塞在這裏,除非gRPC服務中止 return <-serve }
到這裏Peer
節點已經啓動完成了,過程仍是很複雜的,這裏總結一下總體的過程:
Peer
節點的信息。
CacheConfiguration()
,主要保存其餘Peer
節點的相關信息。PeerServer
。
peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
DeliverEventsServer
。
abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
pb.RegisterDeliverServer(peerServer.Server(), abServer)
fabric/core/peer/deliverevents.go
,該服務主要用於區塊的交付與過濾,主要方法:Deliver(),DeliverFiltered()ChaincodeServer
。
chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)
core/chaincode/chaincode_support.go
,返回了ChaincodeSupport:爲Peer提供執行鏈碼的接口,主要功能有Launch():啓動一箇中止運行的鏈碼,Stop():中止鏈碼的運行,HandleChaincodeStream():處理鏈碼流信息,Register():將鏈碼註冊到當前Peer節點 ,createCCMessage():建立一個交易,ExecuteLegacyInit():鏈碼的實例化,Execute():執行鏈碼並返回回原始的響應,processChaincodeExecutionResult():處理鏈碼的執行結果,InvokeInit():調用鏈碼的Init方法,Invoke():調用鏈碼,execute():執行一個交易
AdminServer
。
startAdminServer(listenAddr, peerServer.Server(), metricsProvider)
core/protos/peer/admin.go
文件,具備GetStatus(),StartServer(),GetModuleLogLevel(),SetModuleLogLevel()
等方法EndorserServer
。
pb.RegisterEndorserServer(peerServer.Server(), auth)
core/endorser/endorser.go
文件,註冊背書服務器,提供了一個很重要的方法:ProcessProposal()
,這個方法值得看一下。GossipService
。
err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
gossip/service/gossip_service.go
,具備InitializeChannel(),createSelfSignedData(),updateAnchors(),AddPayload()
等方法流程圖:,因爲Fabric在不斷更新,因此代碼和圖中仍是有一些不一樣的。
參考:這裏