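The peer module uses the cobra library to implement its CLI commands.
Cobra provides a simple interface for building powerful, modern CLIs such as the git and go tools. Cobra is also a program that generates CLI application scaffolding.
The commands supported by peer are shown below: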
Usage:
  peer [command]

Available Commands:
  chaincode   Operate a chaincode: install|instantiate|invoke|package|query|signpackage|upgrade|list.
  channel     Operate a channel: create|fetch|join|list|update|signconfigtx|getinfo.
  help        Help about any command
  logging     Log levels: getlevel|setlevel|revertlevels.
  node        Operate a peer node: start|status.
  version     Print fabric peer version.

Flags:
  -h, --help                   help for peer
      --logging-level string   Default logging level and overrides, see core.yaml for full syntax
The peer's docker-compose file shows that the peer is started with the command peer node start. As the code below shows, startup calls the serve() function.
var nodeStartCmd = &cobra.Command{
    Use:   "start",
    Short: "Starts the node.",
    Long:  `Starts a node that interacts with the network.`,
    RunE: func(cmd *cobra.Command, args []string) error {
        if len(args) != 0 {
            return fmt.Errorf("trailing args detected")
        }
        // Parsing of the command line is done so silence cmd usage
        cmd.SilenceUsage = true
        return serve(args)
    },
}
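The RunE handler above does two things: it rejects trailing arguments, then hands control to serve(). The following is a minimal sketch of that dispatch-and-validate pattern using only the standard library. It is not cobra's API: the `command` type, `dispatch`, and `newPeerTree` are hypothetical names introduced for illustration, and the injected `serve` callback stands in for the real serve().

```go
package main

import (
	"errors"
	"fmt"
)

// command is a hypothetical, stripped-down stand-in for a CLI command node.
// It is NOT cobra's type; it only mirrors the Use/RunE idea.
type command struct {
	use  string
	runE func(args []string) error
	subs map[string]*command
}

// dispatch walks the command tree, consuming one argument per level,
// then calls the matched command's runE with whatever arguments remain.
func dispatch(root *command, args []string) error {
	cur := root
	for len(args) > 0 {
		next, ok := cur.subs[args[0]]
		if !ok {
			break
		}
		cur, args = next, args[1:]
	}
	if cur.runE == nil {
		return fmt.Errorf("%q requires a subcommand", cur.use)
	}
	return cur.runE(args)
}

// newPeerTree builds the "peer node start" path. The serve callback is
// injected so the sketch stays independent of the real peer code.
func newPeerTree(serve func() error) *command {
	start := &command{
		use: "start",
		runE: func(args []string) error {
			// Same guard as nodeStartCmd: no trailing arguments allowed.
			if len(args) != 0 {
				return errors.New("trailing args detected")
			}
			return serve()
		},
	}
	node := &command{use: "node", subs: map[string]*command{"start": start}}
	return &command{use: "peer", subs: map[string]*command{"node": node}}
}

func main() {
	root := newPeerTree(func() error {
		fmt.Println("serve() called")
		return nil
	})
	fmt.Println(dispatch(root, []string{"node", "start"}))
	fmt.Println(dispatch(root, []string{"node", "start", "extra"}))
}
```

Injecting the serve callback keeps the argument check testable without starting a real node.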
Next, let's look at serve() in detail.
func serve(args []string) error {
    // currently the peer only works with the standard MSP
    // because in certain scenarios the MSP has to make sure
    // that from a single credential you only have a single 'identity'.
    // Idemix does not support this *YET* but it can be easily
    // fixed to support it. For now, we just make sure that
    // the peer only comes up with the standard MSP
    mspType := mgmt.GetLocalMSP().GetType()
    if mspType != msp.FABRIC {
        panic("Unsupported msp type " + msp.ProviderTypeToString(mspType))
    }

    // Trace RPCs with the golang.org/x/net/trace package. This was moved out of
    // the deliver service connection factory as it has process wide implications
    // and was racy with respect to initialization of gRPC clients and servers.
    grpc.EnableTracing = true

    logger.Infof("Starting %s", version.GetInfo())

    //startup aclmgmt with default ACL providers (resource based and default 1.0 policies based).
    //Users can pass in their own ACLProvider to RegisterACLProvider (currently unit tests do this)
    // Create the ACL (access control list) provider
    aclProvider := aclmgmt.NewACLProvider(
        aclmgmt.ResourceGetter(peer.GetStableChannelConfig),
    )

    // Register the chaincode platforms
    pr := platforms.NewRegistry(
        &golang.Platform{},
        &node.Platform{},
        &java.Platform{},
        &car.Platform{},
    )

    // Define the deployed chaincode info provider
    deployedCCInfoProvider := &lscc.DeployedCCInfoProvider{}
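DeployedCCInfoProvider implements the DeployedChaincodeInfoProvider interface.
DeployedChaincodeInfoProvider is a dependency that the ledger uses to build the collection config history.
The LSCC module is expected to provide the implementation of this dependency.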
type DeployedChaincodeInfoProvider interface {
    // Namespaces the provider is interested in
    Namespaces() []string
    // Chaincodes updated by the given state updates
    UpdatedChaincodes(stateUpdates map[string][]*kvrwset.KVWrite) ([]*ChaincodeLifecycleInfo, error)
    // Information about a deployed chaincode
    ChaincodeInfo(chaincodeName string, qe SimpleQueryExecutor) (*DeployedChaincodeInfo, error)
    // Collection configuration of a chaincode
    CollectionInfo(chaincodeName, collectionName string, qe SimpleQueryExecutor) (*common.StaticCollectionConfig, error)
}
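To make the dependency relationship concrete, here is a small sketch of how a module can satisfy an interface that another component consumes. The `infoProvider` interface, `chaincodeInfo`, and `inMemoryProvider` are hypothetical, simplified stand-ins and do not match Fabric's real signatures (the query executor and kvrwset types are left out on purpose).

```go
package main

import "fmt"

// chaincodeInfo is a simplified, hypothetical stand-in for DeployedChaincodeInfo.
type chaincodeInfo struct {
	Name    string
	Version string
}

// infoProvider is a reduced, hypothetical version of the interface the
// ledger depends on; only two of the four methods are modeled here.
type infoProvider interface {
	Namespaces() []string
	ChaincodeInfo(name string) (*chaincodeInfo, error)
}

// inMemoryProvider plays the role LSCC plays in the peer: it supplies
// the implementation that the ledger side only knows as an interface.
type inMemoryProvider struct {
	deployed map[string]chaincodeInfo
}

// Compile-time check that inMemoryProvider satisfies infoProvider.
var _ infoProvider = (*inMemoryProvider)(nil)

// Namespaces reports which namespaces this provider watches.
func (p *inMemoryProvider) Namespaces() []string { return []string{"lscc"} }

// ChaincodeInfo looks up a deployed chaincode by name.
func (p *inMemoryProvider) ChaincodeInfo(name string) (*chaincodeInfo, error) {
	info, ok := p.deployed[name]
	if !ok {
		return nil, fmt.Errorf("chaincode %q is not deployed", name)
	}
	return &info, nil
}

func main() {
	var provider infoProvider = &inMemoryProvider{
		deployed: map[string]chaincodeInfo{"mycc": {Name: "mycc", Version: "1.0"}},
	}
	info, err := provider.ChaincodeInfo("mycc")
	fmt.Println(info, err)
	_, err = provider.ChaincodeInfo("missing")
	fmt.Println(err)
}
```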
Initialize ledger resources with ledgermgmt.Initialize
    // Get the MSP manager for a channel, creating it if it does not exist
    identityDeserializerFactory := func(chainID string) msp.IdentityDeserializer {
        return mgmt.GetManagerForChain(chainID)
    }

    // Set up the operations system, which holds basic peer settings such as ListenAddress and TLS
    opsSystem := newOperationsSystem()
    // Start listening on ListenAddress
    err := opsSystem.Start()
    if err != nil {
        return errors.WithMessage(err, "failed to initialize operations subystems")
    }
    defer opsSystem.Stop()

    metricsProvider := opsSystem.Provider
    logObserver := floggingmetrics.NewObserver(metricsProvider)
    flogging.Global.SetObserver(logObserver)

    // Instantiate membershipInfoProvider, which decides whether this peer
    // is a member of a given private data collection
    membershipInfoProvider := privdata.NewMembershipInfoProvider(createSelfSignedData(), identityDeserializerFactory)

    //initialize resource management exit
    // Initialize ledger resources, passing in the objects created above
    ledgermgmt.Initialize(
        &ledgermgmt.Initializer{
            CustomTxProcessors:            peer.ConfigTxProcessors,
            PlatformRegistry:              pr,
            DeployedChaincodeInfoProvider: deployedCCInfoProvider,
            MembershipInfoProvider:        membershipInfoProvider,
            MetricsProvider:               metricsProvider,
            HealthCheckRegistry:           opsSystem,
        },
    )
Initialize the peer gRPC service configuration
    // Parameter overrides must be processed before any parameters are
    // cached. Failures to cache cause the server to terminate immediately.
    // Check whether chaincode development mode is enabled
    if chaincodeDevMode {
        logger.Info("Running in chaincode development mode")
        logger.Info("Disable loading validity system chaincode")

        viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)
    }

    // Cache the peer's local address (address:port, from getLocalAddress)
    if err := peer.CacheConfiguration(); err != nil {
        return err
    }

    // Get the peer endpoint; if it is not cached yet, CacheConfiguration is called
    peerEndpoint, err := peer.GetPeerEndpoint()
    if err != nil {
        err = fmt.Errorf("Failed to get Peer Endpoint: %s", err)
        return err
    }

    // Extract the peer host
    peerHost, _, err := net.SplitHostPort(peerEndpoint.Address)
    if err != nil {
        return fmt.Errorf("peer address is not in the format of host:port: %v", err)
    }

    listenAddr := viper.GetString("peer.listenAddress")
    // Load the peer's gRPC settings, mainly the TLS and keepalive options
    serverConfig, err := peer.GetServerConfig()
    if err != nil {
        logger.Fatalf("Error loading secure config for peer (%s)", err)
    }

    // Cap concurrent gRPC requests at grpcMaxConcurrency (2500)
    throttle := comm.NewThrottle(grpcMaxConcurrency)
    // Additional gRPC server settings: logger, metrics, and interceptors
    serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer")
    serverConfig.MetricsProvider = metricsProvider
    serverConfig.UnaryInterceptors = append(
        serverConfig.UnaryInterceptors,
        grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
        grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
        throttle.UnaryServerIntercptor,
    )
    serverConfig.StreamInterceptors = append(
        serverConfig.StreamInterceptors,
        grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
        grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
        throttle.StreamServerInterceptor,
    )
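The throttle created above caps how many gRPC requests are handled at the same time. One common way to build such a limiter is a counting semaphore on a buffered channel. The sketch below shows that technique with the standard library only. It is an assumption about the general approach, not a copy of comm.NewThrottle, and `throttle`, `newThrottle`, `do`, and `peakConcurrency` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// throttle is a hypothetical counting semaphore backed by a buffered channel.
// The channel capacity is the maximum number of handlers allowed in flight.
type throttle struct{ slots chan struct{} }

func newThrottle(max int) *throttle {
	return &throttle{slots: make(chan struct{}, max)}
}

// do blocks until a slot is free, runs the handler, then releases the slot.
// An interceptor would wrap each incoming request in a call like this.
func (t *throttle) do(handler func()) {
	t.slots <- struct{}{}
	defer func() { <-t.slots }()
	handler()
}

// peakConcurrency pushes n handlers through the throttle concurrently and
// reports the highest number that were observed in flight at once.
func peakConcurrency(t *throttle, n int) int64 {
	var inFlight, peak int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t.do(func() {
				cur := atomic.AddInt64(&inFlight, 1)
				// Raise peak to cur if cur is a new maximum.
				for {
					old := atomic.LoadInt64(&peak)
					if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
						break
					}
				}
				atomic.AddInt64(&inFlight, -1)
			})
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	// With a limit of 3, the observed peak can never exceed 3.
	fmt.Println("peak in flight:", peakConcurrency(newThrottle(3), 50))
}
```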
Create the gRPC server from the gRPC configuration and the listen address
    peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
    if err != nil {
        logger.Fatalf("Failed to create peer server (%s)", err)
    }
TLS and policy setup
    // TLS configuration
    if serverConfig.SecOpts.UseTLS {
        logger.Info("Starting peer with TLS enabled")
        // set up credential support
        cs := comm.GetCredentialSupport()
        roots, err := peer.GetServerRootCAs()
        if err != nil {
            logger.Fatalf("Failed to set TLS server root CAs: %s", err)
        }
        cs.ServerRootCAs = roots

        // set the cert to use if client auth is requested by remote endpoints
        clientCert, err := peer.GetClientCertificate()
        if err != nil {
            logger.Fatalf("Failed to set TLS client certificate: %s", err)
        }
        comm.GetCredentialSupport().SetClientCertificate(clientCert)
    }

    mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
    // Policy check: bind a resource name to an ACL check on the envelope
    policyCheckerProvider := func(resourceName string) deliver.PolicyCheckerFunc {
        return func(env *cb.Envelope, channelID string) error {
            return aclProvider.CheckACL(resourceName, channelID, env)
        }
    }
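policyCheckerProvider above is a function that returns a function: it fixes the resource name once and yields a checker that only needs the channel and the request. A minimal sketch of that closure pattern follows. `aclTable`, `checkerFunc`, and `checkerProvider` are hypothetical, the requester is reduced to a plain string instead of a signed envelope, and the resource names are used only as illustrative keys.

```go
package main

import "fmt"

// checkerFunc is a hypothetical, simplified counterpart of a policy checker:
// it validates a requester against one channel for a pre-bound resource.
type checkerFunc func(channelID, requester string) error

// aclTable maps resource -> channel -> the single requester allowed.
// Real ACL evaluation is policy based; this table is illustration only.
type aclTable map[string]map[string]string

// check returns nil only when the requester matches the table entry.
func (a aclTable) check(resource, channelID, requester string) error {
	allowed, ok := a[resource][channelID]
	if !ok || allowed != requester {
		return fmt.Errorf("access denied: %s on %s for %s", resource, channelID, requester)
	}
	return nil
}

// checkerProvider binds the ACL table first, then the resource name,
// and finally returns a checker that takes only per-request values.
func checkerProvider(acl aclTable) func(resource string) checkerFunc {
	return func(resource string) checkerFunc {
		return func(channelID, requester string) error {
			return acl.check(resource, channelID, requester)
		}
	}
}

func main() {
	acl := aclTable{"event/Block": {"mychannel": "Org1MSP"}}
	blockChecker := checkerProvider(acl)("event/Block")

	fmt.Println(blockChecker("mychannel", "Org1MSP"))
	fmt.Println(blockChecker("mychannel", "Org2MSP"))
}
```

Binding the resource name up front lets the deliver server hold one checker per service without knowing how ACLs are evaluated.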
Create the deliver server, which delivers blocks and filtered blocks
    abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
    pb.RegisterDeliverServer(peerServer.Server(), abServer)
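Initialize the chaincode service
startChaincodeServer completes the chaincode-related initialization, including:
1) Setting the local chaincode install path
2) Creating the chaincode-specific CA
3) Starting the chaincode-specific gRPC listener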
    // Initialize chaincode service
    chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)

    logger.Debugf("Running peer")
Register the endorser service, initialize the gossip component, and so on
    // Start the Admin server
    startAdminServer(listenAddr, peerServer.Server(), metricsProvider)

    privDataDist := func(channel string, txID string, privateData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
        // Distribute the private data to other nodes
        return service.GetGossipService().DistributePrivateData(channel, txID, privateData, blkHt)
    }

    // Get the local signing identity
    signingIdentity := mgmt.GetLocalSigningIdentityOrPanic()
    serializedIdentity, err := signingIdentity.Serialize()
    if err != nil {
        logger.Panicf("Failed serializing self identity: %v", err)
    }

    libConf := library.Config{}
    if err = viperutil.EnhancedExactUnmarshalKey("peer.handlers", &libConf); err != nil {
        return errors.WithMessage(err, "could not load YAML config")
    }
    reg := library.InitRegistry(libConf)

    // Endorsement and validation setup
    authFilters := reg.Lookup(library.Auth).([]authHandler.Filter)
    endorserSupport := &endorser.SupportImpl{
        SignerSupport:    signingIdentity,
        Peer:             peer.Default,
        PeerSupport:      peer.DefaultSupport,
        ChaincodeSupport: chaincodeSupport,
        SysCCProvider:    sccp,
        ACLProvider:      aclProvider,
    }
    endorsementPluginsByName := reg.Lookup(library.Endorsement).(map[string]endorsement2.PluginFactory)
    validationPluginsByName := reg.Lookup(library.Validation).(map[string]validation.PluginFactory)
    signingIdentityFetcher := (endorsement3.SigningIdentityFetcher)(endorserSupport)
    channelStateRetriever := endorser.ChannelStateRetriever(endorserSupport)
    pluginMapper := endorser.MapBasedPluginMapper(endorsementPluginsByName)
    pluginEndorser := endorser.NewPluginEndorser(&endorser.PluginSupport{
        ChannelStateRetriever:   channelStateRetriever,
        TransientStoreRetriever: peer.TransientStoreFactory,
        PluginMapper:            pluginMapper,
        SigningIdentityFetcher:  signingIdentityFetcher,
    })
    endorserSupport.PluginEndorser = pluginEndorser
    serverEndorser := endorser.NewEndorserServer(privDataDist, endorserSupport, pr, metricsProvider)
    auth := authHandler.ChainFilters(serverEndorser, authFilters...)
    // Register the Endorser server
    pb.RegisterEndorserServer(peerServer.Server(), auth)

    policyMgr := peer.NewChannelPolicyManagerGetter()

    // Initialize gossip component
    err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
    if err != nil {
        return err
    }
    defer service.GetGossipService().Stop()

    // register prover grpc service
    // FAB-12971 disable prover service before v1.4 cut. Will uncomment after v1.4 cut
    // err = registerProverService(peerServer, aclProvider, signingIdentity)
    // if err != nil {
    //     return err
    // }
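The call to authHandler.ChainFilters above places the configured auth filters in front of the endorser, so every proposal passes through them before it is endorsed. The sketch below shows one plausible way such a chain can be wired. The `handler` and `filter` interfaces, the two example filters, and `chainFilters` are hypothetical and work on plain strings instead of signed proposals.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a hypothetical, string-based stand-in for an endorser server.
type handler interface {
	Process(proposal string) (string, error)
}

// filter is a handler that can be pointed at the next handler in the chain.
type filter interface {
	handler
	Init(next handler)
}

// endorserStub is the final handler at the end of the chain.
type endorserStub struct{}

func (endorserStub) Process(p string) (string, error) { return "endorsed:" + p, nil }

// rejectEmpty stops empty proposals and forwards everything else.
type rejectEmpty struct{ next handler }

func (f *rejectEmpty) Init(next handler) { f.next = next }
func (f *rejectEmpty) Process(p string) (string, error) {
	if p == "" {
		return "", errors.New("empty proposal rejected")
	}
	return f.next.Process(p)
}

// tagger marks the proposal as checked before forwarding it.
type tagger struct {
	tag  string
	next handler
}

func (f *tagger) Init(next handler) { f.next = next }
func (f *tagger) Process(p string) (string, error) { return f.next.Process(f.tag + p) }

// chainFilters links each filter to its successor, links the last filter
// to the final handler, and returns the head of the chain.
func chainFilters(final handler, filters ...filter) handler {
	if len(filters) == 0 {
		return final
	}
	for i := 0; i < len(filters)-1; i++ {
		filters[i].Init(filters[i+1])
	}
	filters[len(filters)-1].Init(final)
	return filters[0]
}

func main() {
	auth := chainFilters(endorserStub{}, &rejectEmpty{}, &tagger{tag: "checked:"})
	fmt.Println(auth.Process("tx1"))
	fmt.Println(auth.Process(""))
}
```

Because the head of the chain satisfies the same interface as the endorser, the gRPC registration does not need to know whether any filters are configured.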
Initialize the system chaincodes.
    // initialize system chaincodes

    // deploy system chaincodes
    sccp.DeploySysCCs("", ccp)
    logger.Infof("Deployed system chaincodes")

    // List the installed chaincodes
    installedCCs := func() ([]ccdef.InstalledChaincode, error) {
        return packageProvider.ListInstalledChaincodes()
    }
    // Create the chaincode lifecycle
    lifecycle, err := cc.NewLifeCycle(cc.Enumerate(installedCCs))
    if err != nil {
        logger.Panicf("Failed creating lifecycle: +%v", err)
    }
    // HandleMetadataUpdate fires when the chaincode lifecycle metadata changes
    onUpdate := cc.HandleMetadataUpdate(func(channel string, chaincodes ccdef.MetadataSet) {
        // Push the updated chaincodes to gossip
        service.GetGossipService().UpdateChaincodes(chaincodes.AsChaincodes(), gossipcommon.ChainID(channel))
    })
    // Register the listener for chaincode updates
    lifecycle.AddListener(onUpdate)
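lifecycle.AddListener(onUpdate) above follows the observer pattern: the lifecycle keeps a list of callbacks and invokes them whenever chaincode metadata changes. A minimal sketch of that pattern follows. The `lifecycle`, `listener`, `metadataSet`, `addListener`, and `fireChange` names are hypothetical and far simpler than Fabric's cc package.

```go
package main

import (
	"fmt"
	"sync"
)

// metadataSet is a hypothetical stand-in: just a list of chaincode names.
type metadataSet []string

// listener is called with the channel and its current chaincode set.
type listener func(channel string, chaincodes metadataSet)

// lifecycle stores listeners and notifies them on every change.
type lifecycle struct {
	mu        sync.Mutex
	listeners []listener
}

// addListener registers a callback for future metadata changes.
func (l *lifecycle) addListener(fn listener) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.listeners = append(l.listeners, fn)
}

// fireChange notifies all listeners. The slice is copied under the lock
// so callbacks run without holding it and may register more listeners.
func (l *lifecycle) fireChange(channel string, chaincodes metadataSet) {
	l.mu.Lock()
	snapshot := append([]listener(nil), l.listeners...)
	l.mu.Unlock()
	for _, fn := range snapshot {
		fn(channel, chaincodes)
	}
}

func main() {
	lc := &lifecycle{}
	// In the peer, the callback forwards the update to the gossip service.
	lc.addListener(func(channel string, chaincodes metadataSet) {
		fmt.Printf("gossip update for %s: %v\n", channel, chaincodes)
	})
	lc.fireChange("mychannel", metadataSet{"mycc"})
}
```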
Channel setup
    // this brings up all the channels
    peer.Initialize(func(cid string) {
        logger.Debugf("Deploying system CC, for channel <%s>", cid)
        sccp.DeploySysCCs(cid, ccp)
        sub, err := lifecycle.NewChannelSubscription(cid, cc.QueryCreatorFunc(func() (cc.Query, error) {
            // Return a query executor for the channel's ledger
            return peer.GetLedger(cid).NewQueryExecutor()
        }))
        if err != nil {
            logger.Panicf("Failed subscribing to chaincode lifecycle updates")
        }
        // Register the ChaincodeLifecycleEventListener for this channel
        cceventmgmt.GetMgr().Register(cid, sub)
    }, ccp, sccp, txvalidator.MapBasedPluginMapper(validationPluginsByName),
        pr, deployedCCInfoProvider, membershipInfoProvider, metricsProvider)

    // Read the remaining peer settings
    if viper.GetBool("peer.discovery.enabled") {
        registerDiscoveryService(peerServer, policyMgr, lifecycle)
    }

    networkID := viper.GetString("peer.networkId")

    logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)

    // Get configuration before starting go routines to avoid
    // racing in tests
    profileEnabled := viper.GetBool("peer.profile.enabled")
    profileListenAddress := viper.GetString("peer.profile.listenAddress")

    // Start the grpc server. Done in a goroutine so we can deploy the
    // genesis block if needed.
    serve := make(chan error)

    // Start the peer gRPC server
    go func() {
        var grpcErr error
        if grpcErr = peerServer.Start(); grpcErr != nil {
            grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)
        } else {
            logger.Info("peer server exited")
        }
        serve <- grpcErr
    }()

    // Start profiling http endpoint if enabled
    if profileEnabled {
        go func() {
            logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)
            if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil {
                logger.Errorf("Error starting profiler: %s", profileErr)
            }
        }()
    }

    go handleSignals(addPlatformSignals(map[os.Signal]func(){
        syscall.SIGINT:  func() { serve <- nil },
        syscall.SIGTERM: func() { serve <- nil },
    }))

    // Start the peer's block archiving (data dump) task
    if ledgerconfig.IsDataDumpEnabled() {
        logger.Debugf("DataDump:{DumpDir:%s,LoadDir:%s,MaxFileLimit:%d,DumpCron:%v,DumpInterval:%d,LoadRetryTimes:%d}",
            ledgerconfig.GetDataDumpPath(),
            ledgerconfig.GetDataLoadPath(),
            ledgerconfig.GetDataDumpFileLimit(),
            ledgerconfig.GetDataDumpCron(),
            ledgerconfig.GetDataDumpInterval(),
            ledgerconfig.GetDataLoadRetryTimes())
        go func() {
            cronList := ledgerconfig.GetDataDumpCron()
            if cronList != nil && len(cronList) > 0 {
                cronTask := cron.New()
                cronTask.Start()
                for _, crontab := range cronList {
                    logger.Debugf("Crontab addFunc for %s", crontab)
                    err := cronTask.AddFunc(crontab, func() {
                        chainInfoArray := peer.GetChannelsInfo()
                        for _, chainInfo := range chainInfoArray {
                            chainId := chainInfo.ChannelId
                            l := peer.GetLedger(chainId)
                            if err := l.DataDump(datadump.DumpForCronTab); err != nil {
                                logger.Errorf("Failed to datadump for [%s]", err)
                            }
                        }
                    })
                    if err != nil {
                        logger.Errorf("Failed to add crontab task for %s", err)
                    }
                }
            }
        }()
    }

    logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)

    if viper.GetBool("peer.enBlkrouter") {
        go func() {
            startBlockServer()
        }()
    }

    // Block until grpc server exits
    return <-serve
}
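The end of serve() relies on a single channel: the gRPC goroutine sends its exit error on it, the signal handlers send nil on it, and the main goroutine blocks until the first value arrives. The sketch below isolates that pattern. `waitForExit` and `errListenerClosed` are hypothetical, a plain `shutdown` channel stands in for SIGINT/SIGTERM handling, and the channel is buffered here (unlike the unbuffered one in the original) so that a late sender cannot block forever.

```go
package main

import (
	"errors"
	"fmt"
)

// errListenerClosed is a hypothetical server failure used for illustration.
var errListenerClosed = errors.New("listener closed")

// waitForExit runs the server in a goroutine and blocks until either the
// server returns or a shutdown is requested, whichever happens first.
func waitForExit(run func() error, shutdown <-chan struct{}) error {
	// Capacity 2: one slot per sender, so neither goroutine can get stuck
	// on the send after the caller has already returned.
	serve := make(chan error, 2)

	// Server goroutine: report how the server exited.
	go func() {
		if err := run(); err != nil {
			serve <- fmt.Errorf("server exited with error: %w", err)
			return
		}
		serve <- nil
	}()

	// Shutdown goroutine: plays the role of the SIGINT/SIGTERM handlers.
	// It stays parked if shutdown is never closed.
	go func() {
		<-shutdown
		serve <- nil
	}()

	// Block until the first of the two events.
	return <-serve
}

func main() {
	// Case 1: shutdown is requested while the server is still running.
	shutdown := make(chan struct{})
	close(shutdown)
	block := make(chan struct{})
	fmt.Println("exit:", waitForExit(func() error { <-block; return nil }, shutdown))

	// Case 2: the server fails on its own.
	fmt.Println("exit:", waitForExit(func() error { return errListenerClosed }, make(chan struct{})))
}
```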
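At this point the Peer node has finished starting. The process is fairly involved, so here is a summary of the overall flow:
1) Read the configuration and cache it with CacheConfiguration(), which mainly caches the peer's own address and endpoint information.
2) Create the PeerServer.
    peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
3) Create the DeliverEventsServer.
    abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
    pb.RegisterDeliverServer(peerServer.Server(), abServer)
   See fabric/core/peer/deliverevents.go. This service delivers blocks and filtered blocks. Its main methods are Deliver() and DeliverFiltered().
4) Start the ChaincodeServer.
    chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)
   See core/chaincode/chaincode_support.go. The call returns a ChaincodeSupport, which gives the Peer an interface for executing chaincode. Its main functions are:
   Launch(): starts a chaincode that is not running
   Stop(): stops a running chaincode
   HandleChaincodeStream(): handles the chaincode stream
   Register(): registers a chaincode with the current Peer node
   createCCMessage(): creates a transaction message
   ExecuteLegacyInit(): instantiates a chaincode
   Execute(): executes a chaincode and returns the raw response
   processChaincodeExecutionResult(): processes the result of a chaincode execution
   InvokeInit(): calls the chaincode's Init method
   Invoke(): invokes a chaincode
   execute(): executes a transaction
5) Start the AdminServer.
    startAdminServer(listenAddr, peerServer.Server(), metricsProvider)
   See core/protos/peer/admin.go. It provides methods such as GetStatus(), StartServer(), GetModuleLogLevel(), and SetModuleLogLevel().
6) Create the EndorserServer.
    pb.RegisterEndorserServer(peerServer.Server(), auth)
   See core/endorser/endorser.go. This registers the endorser server, which provides one very important method, ProcessProposal(). That method is worth reading.
7) Create the GossipService.
    err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
   See gossip/service/gossip_service.go. It provides methods such as InitializeChannel(), createSelfSignedData(), updateAnchors(), and AddPayload().
8) Deploy the system chaincodes.
9) Initialize the channels.
10) Start the gRPC service.
11) If profiling is enabled, also start the profiling HTTP listener.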