接上回,若是是V2版本的Puller,位於pull_v2.go文件函數
func (p *v2Puller) Pull(ctx context.Context, ref reference.Named) (err error) { p.repo, p.confirmedV2, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull") ... if err = p.pullV2Repository(ctx, ref); err != nil { ... }
新建一個V2版本的倉庫賦值給p.repo,調用pullV2Respository函數ui
func (p *v2Puller) pullV2Repository(ctx context.Context, ref reference.Named) (err error) { var layersDownloaded bool if !reference.IsNameOnly(ref) { layersDownloaded, err = p.pullV2Tag(ctx, ref) ... } else { ... for _, tag := range tags { tagRef, err := reference.WithTag(ref, tag) .... pulledNew, err := p.pullV2Tag(ctx, tagRef) ... } } ... }
有tag的話,遍歷tag,每一個tag調用pullV2Tag,沒有的話直接調用pullV2Tagspa
func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named) (tagUpdated bool, err error) { .... switch v := manifest.(type) { case *schema1.SignedManifest: if p.config.RequireSchema2 { return false, fmt.Errorf("invalid manifest: not schema2") } id, manifestDigest, err = p.pullSchema1(ctx, ref, v) if err != nil { return false, err } case *schema2.DeserializedManifest: id, manifestDigest, err = p.pullSchema2(ctx, ref, v) if err != nil { return false, err } case *manifestlist.DeserializedManifestList: id, manifestDigest, err = p.pullManifestList(ctx, ref, v) if err != nil { return false, err } default: return false, errors.New("unsupported manifest format") } .... }
先是獲取 manifest 屬性,根據屬性的類型作不一樣的處理。pullManifestList 最終調用 pullSchema1或者pullSchema2 ,這2個函數只是對於參數的處理方式不一樣,最終下載鏡像都是調用一樣的處理函數 code
rootFS, release :=p.config.DownloadManager.Download(ctx, *rootFS, descriptors, p.config.ProgressOutput)orm
imageID := p.config.ImageStore.Put(configJSON) // ImageStore: distribution.NewImageConfigStoreFromStore(daemon.imageStore),協程
DownloadManager 根據config初始化函數就應該是daemon的DownloadManager,而daemon的DownloadManager是經過下面函數構建的d.downloadManager = xfer.NewLayerDownloadManager(d.layerStore, *config.MaxConcurrentDownloads),位於distribution/xfer/download.go 是一個分層下載的阻塞函數。blog
if topDownload != nil { xferFunc = ldm.makeDownloadFunc(descriptor, "", topDownload) defer topDownload.Transfer.Release(watcher) } else { xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil) } topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput)
download內調用makeDownloadFunc建立下載函數xferFunc,xferFunc函數內部初始化一個downloadTransfer 並返回。ip
xferFunc內部建立一個協程處理progressChan通道,調用descriptor.Download(..) , 這個是pull_v2.go內部初始化的v2LayerDescriptor,返回 io.ReadCloser,it
再經過io.ReadCloser 建立 reader解壓數據,利用downloadTransfer處理數據。io
Transfer函數位於/distribution/xfer/transfer.go,調用xferFunc建立 downloadTransfer並把通道信息傳給xferFunc建立的協程,返回Transfer, Watcher
Put是下面函數
func (s *imageConfigStore) Put(c []byte) (digest.Digest, error) { id, err := s.Store.Create(c) return digest.Digest(id), err }
調用 image/store.go 的Create函數把byte數據在本地建立image