1、容器和虛擬機
在fabric中,有兩類鏈碼,一類是系統鏈碼,一類是用戶鏈碼。鏈碼都需要安裝和實例化才能使用,兩者雖然原理類似,可是實現的方式還是有所不同。系統鏈碼首先要Register,然後再Deploy才能使用;而用戶鏈碼則首先要Install,然後再instantiate,就可以被外部接口使用了。
所以,對容器的啓動也可分成這兩部分來進行解析,從宏觀上把握入口,然後分類進行源碼的解析。
2、總體的入口
在前面的分析中可以知道,Launch函數是啓動容器的入口。那麼就從Launch這個函數開始看(core/chaincode/chaincode_support.go):
func (cs *ChaincodeSupport) Launch(chainID, chaincodeName, chaincodeVersion string, qe ledger.QueryExecutor) (*Handler, error) { cname := chaincodeName + ":" + chaincodeVersion if h := cs.HandlerRegistry.Handler(cname); h != nil { return h, nil } //此處到得容器相關的信息,包括生產容器的具體類型是系統鏈碼容器仍是用戶鏈碼容器 //在後面會說明,系統鏈碼啓動的容器是:inprocVM---inproContainer,用戶鏈碼啓動的容器是DockerVM---DockerContainer ccci, err := cs.Lifecycle.ChaincodeContainerInfo(chaincodeName, qe) if err != nil { // TODO: There has to be a better way to do this... if cs.UserRunsCC { chaincodeLogger.Error( "You are attempting to perform an action other than Deploy on Chaincode that is not ready and you are in developer mode. Did you forget to Deploy your chaincode?", ) } return nil, errors.Wrapf(err, "[channel %s] failed to get chaincode container info for %s", chainID, cname) } //啓動Runtime中的Launch if err := cs.Launcher.Launch(ccci); err != nil { return nil, errors.Wrapf(err, "[channel %s] could not launch chaincode %s", chainID, cname) } h := cs.HandlerRegistry.Handler(cname) if h == nil { return nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", chainID, cname) } return h, nil } //runtime_launcher.go func (r *RuntimeLauncher) Launch(ccci *ccprovider.ChaincodeContainerInfo) error { var startFailCh chan error var timeoutCh <-chan time.Time startTime := time.Now() cname := ccci.Name + ":" + ccci.Version launchState, alreadyStarted := r.Registry.Launching(cname) if !alreadyStarted { startFailCh = make(chan error, 1) timeoutCh = time.NewTimer(r.StartupTimeout).C codePackage, err := r.getCodePackage(ccci) if err != nil { return err } go func() { //啓動Process if err := r.Runtime.Start(ccci, codePackage); err != nil { startFailCh <- errors.WithMessage(err, "error starting container") return } exitCode, err := r.Runtime.Wait(ccci) if err != nil { launchState.Notify(errors.Wrap(err, "failed to wait on container exit")) } launchState.Notify(errors.Errorf("container exited with 
%d", exitCode)) }() } ...... return err } // Start launches chaincode in a runtime environment. func (c *ContainerRuntime) Start(ccci *ccprovider.ChaincodeContainerInfo, codePackage []byte) error { cname := ccci.Name + ":" + ccci.Version lc, err := c.LaunchConfig(cname, ccci.Type) if err != nil { return err } chaincodeLogger.Debugf("start container: %s", cname) chaincodeLogger.Debugf("start container with args: %s", strings.Join(lc.Args, " ")) chaincodeLogger.Debugf("start container with env:\n\t%s", strings.Join(lc.Envs, "\n\t")) scr := container.StartContainerReq{ Builder: &container.PlatformBuilder{ Type: ccci.Type, Name: ccci.Name, Version: ccci.Version, Path: ccci.Path, CodePackage: codePackage, PlatformRegistry: c.PlatformRegistry, }, Args: lc.Args, Env: lc.Envs, FilesToUpload: lc.Files, CCID: ccintf.CCID{ Name: ccci.Name, Version: ccci.Version, }, } //啓動Process--注意傳入的容器類型 if err := c.Processor.Process(ccci.ContainerType, scr); err != nil { return errors.WithMessage(err, "error starting container") } return nil } func (vmc *VMController) Process(vmtype string, req VMCReq) error { v := vmc.newVM(vmtype) ccid := req.GetCCID() id := ccid.GetName() vmc.lockContainer(id) defer vmc.unlockContainer(id) //啓動虛擬機 return req.Do(v) } //到這裏容器的實例化就進入到了接口的具體肯定階段,根據不一樣的類型來肯定是SCC或ACC func (vmc *VMController) newVM(typ string) VM { v, ok := vmc.vmProviders[typ] if !ok { vmLogger.Panicf("Programming error: unsupported VM type: %s", typ) } return v.NewVM() }
3、系統容器虛擬機的啓動
一、容器的啓動
上面Process的最後一行代碼req.Do(v)啓動了相關的虛擬機容器。看一下這個接口的定義:
type VMCReq interface { Do(v VM) error GetCCID() ccintf.CCID } //StartContainerReq - properties for starting a container. type StartContainerReq struct { ccintf.CCID Builder Builder Args []string Env []string FilesToUpload map[string][]byte } //StopContainerReq - properties for stopping a container. type StopContainerReq struct { ccintf.CCID Timeout uint //by default we will kill the container after stopping Dontkill bool //by default we will remove the container after killing Dontremove bool } func (w WaitContainerReq) Do(v VM) error { exited := w.Exited go func() { exitCode, err := v.Wait(w.CCID) exited(exitCode, err) }() return nil }
其它mock部分就不列出來了,是供測試使用的,有興趣可以看看源碼。再看一下實例的具體生成,沿着上面的NewVM來看:
// NewVM creates an inproc VM instance func (r *Registry) NewVM() container.VM { return NewInprocVM(r) } // NewInprocVM creates a new InprocVM func NewInprocVM(r *Registry) *InprocVM { return &InprocVM{ registry: r, } }
這裏只分析啓動部分,其它和這個基本差不多:
// Do starts the container described by si on the given VM by forwarding the
// request's fields to v.Start.
func (si StartContainerReq) Do(v VM) error {
	return v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder)
}

// Start starts a previously registered system chaincode.
//
// It looks up the registered template by the CCID name, obtains an instance
// for the VM name and, unless that instance is already marked running, marks
// it running and launches it in a new goroutine. filesToUpload and builder
// are not used by this implementation.
//
// It returns an error when the chaincode is not registered, when no instance
// can be created, or when the instance is already running.
func (vm *InprocVM) Start(ccid ccintf.CCID, args []string, env []string, filesToUpload map[string][]byte, builder container.Builder) error {
	path := ccid.GetName()

	ipctemplate := vm.registry.getType(path)
	if ipctemplate == nil {
		// Pass the name as an argument: using a pre-formatted string as the
		// format would misrender any '%' contained in the name.
		return fmt.Errorf("%s not registered", path)
	}

	instName := vm.GetVMName(ccid)

	ipc, err := vm.getInstance(ipctemplate, instName, args, env)
	if err != nil {
		// NOTE(review): the underlying err is not part of the returned
		// message; kept that way so the message text stays unchanged.
		return fmt.Errorf("could not create instance for %s", instName)
	}

	if ipc.running {
		return fmt.Errorf("chaincode running %s", path)
	}

	ipc.running = true

	go func() {
		defer func() {
			if r := recover(); r != nil {
				// Include the recovered value so the cause of the panic is not lost.
				inprocLogger.Criticalf("caught panic from chaincode %s: %v", instName, r)
			}
		}()
		// The return value is intentionally not inspected; launchInProc logs
		// the errors of both sides itself.
		ipc.launchInProc(instName, args, env)
	}()

	return nil
}
這裏會先判斷鏈碼實例是否已經在運行(ipc.running),若已在運行則返回錯誤;否則在新的goroutine中調用launchInProc:
func (ipc *inprocContainer) launchInProc(id string, args []string, env []string) error { if ipc.ChaincodeSupport == nil { inprocLogger.Panicf("Chaincode support is nil, most likely you forgot to set it immediately after calling inproccontroller.NewRegsitry()") } //創建一個Send和一個Recv通道 peerRcvCCSend := make(chan *pb.ChaincodeMessage) ccRcvPeerSend := make(chan *pb.ChaincodeMessage) var err error //鏈碼側通道Handler ccchan := make(chan struct{}, 1) //Peer側通道Handler ccsupportchan := make(chan struct{}, 1) shimStartInProc := _shimStartInProc // shadow to avoid race in test //鏈碼側相關 go func() { defer close(ccchan) inprocLogger.Debugf("chaincode started for %s", id) if args == nil { args = ipc.args } if env == nil { env = ipc.env } err := shimStartInProc(env, args, ipc.chaincode, ccRcvPeerSend, peerRcvCCSend) if err != nil { err = fmt.Errorf("chaincode-support ended with err: %s", err) _inprocLoggerErrorf("%s", err) } inprocLogger.Debugf("chaincode ended for %s with err: %s", id, err) }() // shadow function to avoid data race inprocLoggerErrorf := _inprocLoggerErrorf //Peer側相關 go func() { defer close(ccsupportchan) inprocStream := newInProcStream(peerRcvCCSend, ccRcvPeerSend) inprocLogger.Debugf("chaincode-support started for %s", id) //消息處理 err := ipc.ChaincodeSupport.HandleChaincodeStream(inprocStream) if err != nil { err = fmt.Errorf("chaincode ended with err: %s", err) inprocLoggerErrorf("%s", err) } inprocLogger.Debugf("chaincode-support ended for %s with err: %s", id, err) }() select { case <-ccchan: close(peerRcvCCSend) inprocLogger.Debugf("chaincode %s quit", id) case <-ccsupportchan: close(ccRcvPeerSend) inprocLogger.Debugf("chaincode support %s quit", id) case <-ipc.stopChan: close(ccRcvPeerSend) close(peerRcvCCSend) inprocLogger.Debugf("chaincode %s stopped", id) } return err } // StartInProc is an entry point for system chaincodes bootstrap. It is not an // API for chaincodes. 
func StartInProc(env []string, args []string, cc Chaincode, recv <-chan *pb.ChaincodeMessage, send chan<- *pb.ChaincodeMessage) error { chaincodeLogger.Debugf("in proc %v", args) var chaincodename string for _, v := range env { if strings.Index(v, "CORE_CHAINCODE_ID_NAME=") == 0 { p := strings.SplitAfter(v, "CORE_CHAINCODE_ID_NAME=") chaincodename = p[1] break } } if chaincodename == "" { return errors.New("error chaincode id not provided") } stream := newInProcStream(recv, send) chaincodeLogger.Debugf("starting chat with peer using name=%s", chaincodename) //看看這是誰,Handler消息處理就在這個函數裏,前面分析過,這裏就再也不贅述 err := chatWithPeer(chaincodename, stream, cc) return err }
系統鏈碼直接在Peer進程內(內存中)啓動虛擬機,只有用戶鏈碼纔會啓動Docker,在容器內部運行。所以這兩者雖然實現同一個接口,卻分成了兩個類。特別需要注意的是上面Peer側和鏈碼側消息處理的生成過程,這個非常重要。代碼裏有直接註釋,感興趣可以把代碼跟到底,看看究竟是如何生成的。
二、消息的傳遞
看到了chatWithPeer,就想到了handleMessage,這個在前面有詳細分析,如果有什麼不明白可以看看「鏈碼源碼分析」。下面只列出代碼:
// handleMessage message handles loop for shim side of chaincode/peer stream. func (handler *Handler) handleMessage(msg *pb.ChaincodeMessage, errc chan error) error { if msg.Type == pb.ChaincodeMessage_KEEPALIVE { chaincodeLogger.Debug("Sending KEEPALIVE response") handler.serialSendAsync(msg, nil) // ignore errors, maybe next KEEPALIVE will work return nil } chaincodeLogger.Debugf("[%s] Handling ChaincodeMessage of type: %s(state:%s)", shorttxid(msg.Txid), msg.Type, handler.state) var err error switch handler.state { case ready: err = handler.handleReady(msg, errc) case established: err = handler.handleEstablished(msg, errc) case created: err = handler.handleCreated(msg, errc) default: err = errors.Errorf("[%s] Chaincode handler cannot handle message (%s) with payload size (%d) while in state: %s", msg.Txid, msg.Type, len(msg.Payload), handler.state) } if err != nil { payload := []byte(err.Error()) errorMsg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid} handler.serialSend(errorMsg) return err } return nil }
4、用戶容器虛擬機的啓動
一、虛擬機的啓動
// NewVM creates a new DockerVM instance func (p *Provider) NewVM() container.VM { return NewDockerVM(p.PeerID, p.NetworkID, p.BuildMetrics) } // NewDockerVM returns a new DockerVM instance func NewDockerVM(peerID, networkID string, buildMetrics *BuildMetrics) *DockerVM { return &DockerVM{ PeerID: peerID, NetworkID: networkID, getClientFnc: getDockerClient, BuildMetrics: buildMetrics, } } // Start starts a container using a previously created docker image func (vm *DockerVM) Start(ccid ccintf.CCID, args, env []string, filesToUpload map[string][]byte, builder container.Builder) error { imageName, err := vm.GetVMNameForDocker(ccid) if err != nil { return err } attachStdout := viper.GetBool("vm.docker.attachStdout") containerName := vm.GetVMName(ccid) logger := dockerLogger.With("imageName", imageName, "containerName", containerName) //經過VM得到客戶端 client, err := vm.getClientFnc() if err != nil { logger.Debugf("failed to get docker client", "error", err) return err } //中止容器和虛擬機 vm.stopInternal(client, containerName, 0, false, false) // 此處建立容器 err = vm.createContainer(client, imageName, containerName, args, env, attachStdout) if err == docker.ErrNoSuchImage { //若是沒有鏡像,則使用builder來建立相關容器 reader, err := builder.Build() if err != nil { return errors.Wrapf(err, "failed to generate Dockerfile to build %s", containerName) } //部署鏡像 err = vm.deployImage(client, ccid, reader) if err != nil { return err } //建立鏡像後,再建立容器 err = vm.createContainer(client, imageName, containerName, args, env, attachStdout) if err != nil { logger.Errorf("failed to create container: %s", err) return err } } else if err != nil { logger.Errorf("create container failed: %s", err) return err } // stream stdout and stderr to chaincode logger if attachStdout { containerLogger := flogging.MustGetLogger("peer.chaincode." 
+ containerName) streamOutput(dockerLogger, client, containerName, containerLogger) } // upload specified files to the container before starting it // this can be used for configurations such as TLS key and certs //處理容器須要的證書相關的文件 if len(filesToUpload) != 0 { // the docker upload API takes a tar file, so we need to first // consolidate the file entries to a tar payload := bytes.NewBuffer(nil) gw := gzip.NewWriter(payload) tw := tar.NewWriter(gw) for path, fileToUpload := range filesToUpload { cutil.WriteBytesToPackage(path, fileToUpload, tw) } // Write the tar file out if err := tw.Close(); err != nil { return fmt.Errorf("Error writing files to upload to Docker instance into a temporary tar blob: %s", err) } gw.Close() //上傳數據 err := client.UploadToContainer(containerName, docker.UploadToContainerOptions{ InputStream: bytes.NewReader(payload.Bytes()), Path: "/", NoOverwriteDirNonDir: false, }) if err != nil { return fmt.Errorf("Error uploading files to the container instance %s: %s", containerName, err) } } // start container with HostConfig was deprecated since v1.10 and removed in v1.2 err = client.StartContainer(containerName, nil) if err != nil { dockerLogger.Errorf("start-could not start container: %s", err) return err } dockerLogger.Debugf("Started container %s", containerName) return nil }
看一下創建容器的代碼:
func (vm *DockerVM) createContainer(client dockerClient, imageID, containerID string, args, env []string, attachStdout bool) error { logger := dockerLogger.With("imageID", imageID, "containerID", containerID) logger.Debugw("create container") _, err := client.CreateContainer(docker.CreateContainerOptions{ Name: containerID, Config: &docker.Config{ Cmd: args, Image: imageID, Env: env, AttachStdout: attachStdout, AttachStderr: attachStdout, }, HostConfig: getDockerHostConfig(), }) if err != nil { return err } logger.Debugw("created container") return nil } // See https://goo.gl/tyzwVM for more details. func (c *Client) CreateContainer(opts CreateContainerOptions) (*Container, error) { path := "/containers/create?" + queryString(opts) resp, err := c.do( "POST", path, doOptions{ data: struct { *Config HostConfig *HostConfig `json:"HostConfig,omitempty" yaml:"HostConfig,omitempty" toml:"HostConfig,omitempty"` NetworkingConfig *NetworkingConfig `json:"NetworkingConfig,omitempty" yaml:"NetworkingConfig,omitempty" toml:"NetworkingConfig,omitempty"` }{ opts.Config, opts.HostConfig, opts.NetworkingConfig, }, context: opts.Context, }, ) if e, ok := err.(*Error); ok { if e.Status == http.StatusNotFound { return nil, ErrNoSuchImage } if e.Status == http.StatusConflict { return nil, ErrContainerAlreadyExists } // Workaround for 17.09 bug returning 400 instead of 409. 
// See https://github.com/moby/moby/issues/35021 if e.Status == http.StatusBadRequest && strings.Contains(e.Message, "Conflict.") { return nil, ErrContainerAlreadyExists } } if err != nil { return nil, err } defer resp.Body.Close() var container Container if err := json.NewDecoder(resp.Body).Decode(&container); err != nil { return nil, err } container.Name = opts.Name return &container, nil } func (c *Client) startContainer(id string, hostConfig *HostConfig, opts doOptions) error { path := "/containers/" + id + "/start" if c.serverAPIVersion == nil { c.checkAPIVersion() } if c.serverAPIVersion != nil && c.serverAPIVersion.LessThan(apiVersion124) { opts.data = hostConfig opts.forceJSON = true } resp, err := c.do("POST", path, opts) if err != nil { if e, ok := err.(*Error); ok && e.Status == http.StatusNotFound { return &NoSuchContainer{ID: id, Err: err} } return err } defer resp.Body.Close() if resp.StatusCode == http.StatusNotModified { return &ContainerAlreadyRunning{ID: id} } return nil }
二、消息的傳遞
在shim包的chaincode.go中,用戶鏈碼啓動時調用的Start函數會調用userChaincodeStreamGetter–>chaincodeSupportClient.Register,而chaincode_support.go中的Register實現了下面的接口(protos/peer/chaincode_shim.pb.go):
// ChaincodeSupportServer is the server API for ChaincodeSupport service. type ChaincodeSupportServer interface { Register(ChaincodeSupport_RegisterServer) error }
即Register調用HandleChaincodeStream,再調用ProcessStream,在其中的默認選項中調用handleMessage,又回到了Peer側。
而在前面的鏈碼源碼分析中已經分析過,一個用戶鏈碼的啓動是以在鏈碼的main函數中調用shim.Start()爲開始的,此函數最終會調用chatWithPeer函數,其中的默認項爲調用handleMessage,開始鏈碼側的消息循環。這樣兩者再按照前面提到的方式通過GRPC互相發送消息,就可以展開用戶鏈碼和Peer側的通信了。
5、總結
通過分析兩類鏈碼容器和執行情況,基本上就明白了鏈碼源碼執行的環境,這正是對以前的「鏈碼源碼分析」的進一步完善。結合這兩篇文章,基本上就明白了鏈碼在Fabric上執行的是哪一個流程和方式。掌握了鏈碼執行的原理和運行的過程,就可以針對實際情況對其做相應的優化和修改,從爲我所用到我想我用。
推薦一下阿里朋友的PerfMa社區的課程,都是乾貨: