菜鳥系列Fabric源碼學習 — peer節點啓動

Fabric 1.4 源碼分析peer節點啓動

peer模塊採用cobra庫來實現cli命令。html

Cobra提供簡單的接口來建立強大的現代化CLI接口,好比git與go工具。Cobra同時也是一個程序, 用於建立CLI程序java

peer支持的命令以下所示:node

Usage:
  peer [command]

Available Commands:
  chaincode   Operate a chaincode: install|instantiate|invoke|package|query|signpackage|upgrade|list.
  channel     Operate a channel: create|fetch|join|list|update|signconfigtx|getinfo.
  help        Help about any command
  logging     Log levels: getlevel|setlevel|revertlevels.
  node        Operate a peer node: start|status.
  version     Print fabric peer version.

Flags:
  -h, --help                   help for peer
      --logging-level string   Default logging level and overrides, see core.yaml for full syntax

經過peer 的docker-compose文件可知,peer啓動命令爲peer node start。從下列代碼可知,peer啓動時調用serve()接口。ios

var nodeStartCmd = &cobra.Command{
    Use:   "start",
    Short: "Starts the node.",
    Long:  `Starts a node that interacts with the network.`,
    RunE: func(cmd *cobra.Command, args []string) error {
        if len(args) != 0 {
            return fmt.Errorf("trailing args detected")
        }
        // Parsing of the command line is done so silence cmd usage
        cmd.SilenceUsage = true
        return serve(args)
    },
}

接下來深刻分析serve()接口。git

func serve(args []string) error {
    // currently the peer only works with the standard MSP
    // because in certain scenarios the MSP has to make sure
    // that from a single credential you only have a single 'identity'.
    // Idemix does not support this *YET* but it can be easily
    // fixed to support it. For now, we just make sure that
    // the peer only comes up with the standard MSP
  // 當前peer啓動時只支持標準MSP即Fabric。
    mspType := mgmt.GetLocalMSP().GetType()
    if mspType != msp.FABRIC {
        panic("Unsupported msp type " + msp.ProviderTypeToString(mspType))
    }

    // Trace RPCs with the golang.org/x/net/trace package. This was moved out of
    // the deliver service connection factory as it has process wide implications
    // and was racy with respect to initialization of gRPC clients and servers.
    grpc.EnableTracing = true

    logger.Infof("Starting %s", version.GetInfo())

    //startup aclmgmt with default ACL providers (resource based and default 1.0 policies based).
    //Users can pass in their own ACLProvider to RegisterACLProvider (currently unit tests do this)
  // 建立ACL提供者 ACL訪問控制列表
    aclProvider := aclmgmt.NewACLProvider(
        aclmgmt.ResourceGetter(peer.GetStableChannelConfig),
    )
    // 平臺註冊 
    pr := platforms.NewRegistry(
        &golang.Platform{},
        &node.Platform{},
        &java.Platform{},
        &car.Platform{},
    )
    // 定義部署鏈碼提供者
    deployedCCInfoProvider := &lscc.DeployedCCInfoProvider{}

DeployedCCInfoProvider實現了DeployedChaincodeInfoProvider。golang

DeployedChaincodeInfoProvider是ledger用於構建集合配置歷史記錄的依賴項
LSCC模塊應該爲這個依賴項提供實現docker

type DeployedChaincodeInfoProvider interface {
    Namespaces() []string //命名空間
    UpdatedChaincodes(stateUpdates map[string][]*kvrwset.KVWrite) ([]*ChaincodeLifecycleInfo, error) // 保存更新的鏈碼
    ChaincodeInfo(chaincodeName string, qe SimpleQueryExecutor) (*DeployedChaincodeInfo, error) // 保存鏈碼信息
    CollectionInfo(chaincodeName, collectionName string, qe SimpleQueryExecutor) (*common.StaticCollectionConfig, error) // 鏈碼集合信息
}

初始化帳本資源ledgermgmt.Initializeshell

// 獲取通道MSP管理員。若是不存在則建立  
    identityDeserializerFactory := func(chainID string) msp.IdentityDeserializer {
        return mgmt.GetManagerForChain(chainID)
    }
    // peer 初始化
    // 保存 peer 一些基本信息 ListenAddress TLS
    opsSystem := newOperationsSystem()
    // 監聽 ListenAddress
    err := opsSystem.Start()
    if err != nil {
        return errors.WithMessage(err, "failed to initialize operations subystems")
    }
    defer opsSystem.Stop()
    metricsProvider := opsSystem.Provider
    logObserver := floggingmetrics.NewObserver(metricsProvider)
    flogging.Global.SetObserver(logObserver)
    // 實例化私密數據成員membershipInfoProvider 用來判斷peer是否在某個私密數據的集合中
    membershipInfoProvider := privdata.NewMembershipInfoProvider(createSelfSignedData(), identityDeserializerFactory)
    //initialize resource management exit

    // 初始化帳本資源 將前面實例化的對象都進行賦值
    ledgermgmt.Initialize(
        &ledgermgmt.Initializer{
            CustomTxProcessors:            peer.ConfigTxProcessors,
            PlatformRegistry:              pr,
            DeployedChaincodeInfoProvider: deployedCCInfoProvider,
            MembershipInfoProvider:        membershipInfoProvider,
            MetricsProvider:               metricsProvider,
            HealthCheckRegistry:           opsSystem,
        },
    )

初始化peer GRPC服務配置緩存

// Parameter overrides must be processed before any parameters are
    // cached. Failures to cache cause the server to terminate immediately.
    // 判斷鏈碼是否時開發者模式
    if chaincodeDevMode {
        logger.Info("Running in chaincode development mode")
        logger.Info("Disable loading validity system chaincode")

        viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)
    }
    // 緩存peer地址getLocalAddress address:port
    if err := peer.CacheConfiguration(); err != nil {
        return err
    }
    // 獲取peer endpoint,沒有則調用CacheConfiguration接口
    peerEndpoint, err := peer.GetPeerEndpoint()
    if err != nil {
        err = fmt.Errorf("Failed to get Peer Endpoint: %s", err)
        return err
    }
    // 獲取peer Host
    peerHost, _, err := net.SplitHostPort(peerEndpoint.Address)
    if err != nil {
        return fmt.Errorf("peer address is not in the format of host:port: %v", err)
    }

    listenAddr := viper.GetString("peer.listenAddress")
    // 獲取peer grpc相關配置 主要是TLS設置和心跳設置
    serverConfig, err := peer.GetServerConfig()
    if err != nil {
        logger.Fatalf("Error loading secure config for peer (%s)", err)
    }
    // 設置GRPC最大併發2500
    throttle := comm.NewThrottle(grpcMaxConcurrency)
    // GRPC server的一些配置
    serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer")
    serverConfig.MetricsProvider = metricsProvider
    serverConfig.UnaryInterceptors = append(
        serverConfig.UnaryInterceptors,
        grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
        grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
        throttle.UnaryServerIntercptor,
    )
    serverConfig.StreamInterceptors = append(
        serverConfig.StreamInterceptors,
        grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
        grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
        throttle.StreamServerInterceptor,
    )

將GRPC相關配置及Address傳入建立GRPC服務器服務器

peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
    if err != nil {
        logger.Fatalf("Failed to create peer server (%s)", err)
    }

TLS及策略相關

// TLS相關配置  
  if serverConfig.SecOpts.UseTLS {
        logger.Info("Starting peer with TLS enabled")
        // set up credential support
        cs := comm.GetCredentialSupport()
        roots, err := peer.GetServerRootCAs()
        if err != nil {
            logger.Fatalf("Failed to set TLS server root CAs: %s", err)
        }
        cs.ServerRootCAs = roots

        // set the cert to use if client auth is requested by remote endpoints
        clientCert, err := peer.GetClientCertificate()
        if err != nil {
            logger.Fatalf("Failed to set TLS client certificate: %s", err)
        }
        comm.GetCredentialSupport().SetClientCertificate(clientCert)
    }

    mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
    // 策略校驗
    policyCheckerProvider := func(resourceName string) deliver.PolicyCheckerFunc {
        return func(env *cb.Envelope, channelID string) error {
            return aclProvider.CheckACL(resourceName, channelID, env)
        }
    }

建立deliver server 傳輸區塊及過濾區塊

abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
    pb.RegisterDeliverServer(peerServer.Server(), abServer)

初始化鏈碼服務

startChaincodeServer將完成與鏈代碼相關的初始化,包括:
1)設置本地鏈代碼安裝路徑
2)建立特定鏈代碼的CA
3)啓動特定鏈代碼的gRPC監聽服務

// Initialize chaincode service
    chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)

    logger.Debugf("Running peer")

註冊背書服務,gossip組件初始化等操做

// Start the Admin server
    startAdminServer(listenAddr, peerServer.Server(), metricsProvider)
    
    privDataDist := func(channel string, txID string, privateData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
    // 分發私有數據到其餘節點
        return service.GetGossipService().DistributePrivateData(channel, txID, privateData, blkHt)
    }
    // 獲取本地簽名
    signingIdentity := mgmt.GetLocalSigningIdentityOrPanic()
    serializedIdentity, err := signingIdentity.Serialize()
    if err != nil {
        logger.Panicf("Failed serializing self identity: %v", err)
    }

    libConf := library.Config{}
    if err = viperutil.EnhancedExactUnmarshalKey("peer.handlers", &libConf); err != nil {
        return errors.WithMessage(err, "could not load YAML config")
    }
    reg := library.InitRegistry(libConf)
    // 背書 驗證相關配置
    authFilters := reg.Lookup(library.Auth).([]authHandler.Filter)
    endorserSupport := &endorser.SupportImpl{
        SignerSupport:    signingIdentity,
        Peer:             peer.Default,
        PeerSupport:      peer.DefaultSupport,
        ChaincodeSupport: chaincodeSupport,
        SysCCProvider:    sccp,
        ACLProvider:      aclProvider,
    }
    endorsementPluginsByName := reg.Lookup(library.Endorsement).(map[string]endorsement2.PluginFactory)
    validationPluginsByName := reg.Lookup(library.Validation).(map[string]validation.PluginFactory)
    signingIdentityFetcher := (endorsement3.SigningIdentityFetcher)(endorserSupport)
    channelStateRetriever := endorser.ChannelStateRetriever(endorserSupport)
    pluginMapper := endorser.MapBasedPluginMapper(endorsementPluginsByName)
    pluginEndorser := endorser.NewPluginEndorser(&endorser.PluginSupport{
        ChannelStateRetriever:   channelStateRetriever,
        TransientStoreRetriever: peer.TransientStoreFactory,
        PluginMapper:            pluginMapper,
        SigningIdentityFetcher:  signingIdentityFetcher,
    })
    endorserSupport.PluginEndorser = pluginEndorser
    serverEndorser := endorser.NewEndorserServer(privDataDist, endorserSupport, pr, metricsProvider)
    auth := authHandler.ChainFilters(serverEndorser, authFilters...)
    // Register the Endorser server
    pb.RegisterEndorserServer(peerServer.Server(), auth)

    policyMgr := peer.NewChannelPolicyManagerGetter()

    // Initialize gossip component
    err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
    if err != nil {
        return err
    }
    defer service.GetGossipService().Stop()

    // register prover grpc service
    // FAB-12971 disable prover service before v1.4 cut. Will uncomment after v1.4 cut
    // err = registerProverService(peerServer, aclProvider, signingIdentity)
    // if err != nil {
    //  return err
    // }

初始化系統鏈碼。

// initialize system chaincodes
    // deploy system chaincodes
    // 部署系統鏈碼
    sccp.DeploySysCCs("", ccp)
    logger.Infof("Deployed system chaincodes")
    // 查看已經安裝等鏈碼
    installedCCs := func() ([]ccdef.InstalledChaincode, error) {
        return packageProvider.ListInstalledChaincodes()
    }
    // 建立鏈碼的生命週期
    lifecycle, err := cc.NewLifeCycle(cc.Enumerate(installedCCs))
    if err != nil {
        logger.Panicf("Failed creating lifecycle: +%v", err)
    }
    // HandleMetadataUpdate在鏈代碼生命週期更改發生變化時觸發
    onUpdate := cc.HandleMetadataUpdate(func(channel string, chaincodes ccdef.MetadataSet) {
        // 更新鏈碼
    service.GetGossipService().UpdateChaincodes(chaincodes.AsChaincodes(), gossipcommon.ChainID(channel))
    })
    // 監聽器 監聽鏈碼更新
    lifecycle.AddListener(onUpdate)

通道相關配置

// this brings up all the channels
    peer.Initialize(func(cid string) {
        logger.Debugf("Deploying system CC, for channel <%s>", cid)
        sccp.DeploySysCCs(cid, ccp)
        sub, err := lifecycle.NewChannelSubscription(cid, cc.QueryCreatorFunc(func() (cc.Query, error) {
      // 返回通道的查詢器
            return peer.GetLedger(cid).NewQueryExecutor()
        }))
        if err != nil {
            logger.Panicf("Failed subscribing to chaincode lifecycle updates")
        }
    // 註冊該通道ChaincodeLifecycleEventListener
        cceventmgmt.GetMgr().Register(cid, sub)
    }, ccp, sccp, txvalidator.MapBasedPluginMapper(validationPluginsByName),
        pr, deployedCCInfoProvider, membershipInfoProvider, metricsProvider)
    // 獲取peer一些配置
    if viper.GetBool("peer.discovery.enabled") {
        registerDiscoveryService(peerServer, policyMgr, lifecycle)
    }

    networkID := viper.GetString("peer.networkId")

    logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)

    // Get configuration before starting go routines to avoid
    // racing in tests
    profileEnabled := viper.GetBool("peer.profile.enabled")
    profileListenAddress := viper.GetString("peer.profile.listenAddress")

    // Start the grpc server. Done in a goroutine so we can deploy the
    // genesis block if needed.
    serve := make(chan error)
    // 開啓peer grpc服務
    go func() {
        var grpcErr error
        if grpcErr = peerServer.Start(); grpcErr != nil {
            grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)
        } else {
            logger.Info("peer server exited")
        }
        serve <- grpcErr
    }()

    // Start profiling http endpoint if enabled
    if profileEnabled {
        go func() {
            logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)
            if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil {
                logger.Errorf("Error starting profiler: %s", profileErr)
            }
        }()
    }

    go handleSignals(addPlatformSignals(map[os.Signal]func(){
        syscall.SIGINT:  func() { serve <- nil },
        syscall.SIGTERM: func() { serve <- nil },
    }))

    // peer啓動區塊歸檔任務
    if ledgerconfig.IsDataDumpEnabled() {
        logger.Debugf("DataDump:{DumpDir:%s,LoadDir:%s,MaxFileLimit:%d,DumpCron:%v,DumpInterval:%d,LoadRetryTimes:%d}",
            ledgerconfig.GetDataDumpPath(), ledgerconfig.GetDataLoadPath(), ledgerconfig.GetDataDumpFileLimit(),
            ledgerconfig.GetDataDumpCron(), ledgerconfig.GetDataDumpInterval(), ledgerconfig.GetDataLoadRetryTimes())
        go func() {
            cronList := ledgerconfig.GetDataDumpCron()

            if cronList != nil && len(cronList) > 0 {
                cronTask := cron.New()
                cronTask.Start()
                for _, crontab := range cronList {
                    logger.Debugf("Crontab addFunc for %s", crontab)
                    err := cronTask.AddFunc(crontab, func() {
                        chainInfoArray := peer.GetChannelsInfo()
                        for _, chainInfo := range chainInfoArray {
                            chainId := chainInfo.ChannelId
                            l := peer.GetLedger(chainId)
                            if err := l.DataDump(datadump.DumpForCronTab); err != nil {
                                logger.Errorf("Failed to datadump for [%s]", err)
                            }
                        }
                    })
                    if err != nil {
                        logger.Errorf("Failed to add crontab task for %s", err)
                    }
                }
            }
        }()
    }

    logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)

    if viper.GetBool("peer.enBlkrouter") {
        go func() {
            startBlockServer()
        }()
    }
    // Block until grpc server exits
    // 阻塞 直到grpc服務退出
    return <-serve
}

到這裏Peer節點已經啓動完成了,過程仍是很複雜的,這裏總結一下總體的過程:

  1. 首先就是讀取配置信息,建立Cache結構,以及檢測其餘Peer節點的信息。

    CacheConfiguration(),主要保存其餘Peer節點的相關信息。

  2. 建立PeerServer

    peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)

  3. 建立DeliverEventsServer

    1. abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
    2. pb.RegisterDeliverServer(peerServer.Server(), abServer)
    3. fabric/core/peer/deliverevents.go,該服務主要用於區塊的交付與過濾,主要方法:Deliver(),DeliverFiltered()
  4. 啓動ChaincodeServer

    1. chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)
    2. core/chaincode/chaincode_support.go,返回了ChaincodeSupport:爲Peer提供執行鏈碼的接口,主要功能有Launch():啓動一箇中止運行的鏈碼,Stop():中止鏈碼的運行,HandleChaincodeStream():處理鏈碼流信息,Register():將鏈碼註冊到當前Peer節點 ,createCCMessage():建立一個交易,ExecuteLegacyInit():鏈碼的實例化,Execute():執行鏈碼並返回回原始的響應,processChaincodeExecutionResult():處理鏈碼的執行結果,InvokeInit():調用鏈碼的Init方法,Invoke():調用鏈碼,execute():執行一個交易
  5. 啓動AdminServer

    1. startAdminServer(listenAddr, peerServer.Server(), metricsProvider)
    2. core/protos/peer/admin.go文件,具備GetStatus(),StartServer(),GetModuleLogLevel(),SetModuleLogLevel()等方法
  6. 建立EndorserServer

    1. pb.RegisterEndorserServer(peerServer.Server(), auth)
    2. core/endorser/endorser.go文件,註冊背書服務器,提供了一個很重要的方法:ProcessProposal(),這個方法值得看一下。
  7. 建立GossipService

    1. err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
    2. gossip/service/gossip_service.go,具備InitializeChannel(),createSelfSignedData(),updateAnchors(),AddPayload()等方法
  8. 部署系統鏈碼。

  9. 初始化通道。

  10. 啓動gRPC服務。

  11. 若是啓用了profile,還會啓動監聽服務。

參考:http://www.javashuo.com/article/p-ocqylybk-kp.html

相關文章
相關標籤/搜索