docker push 實現過程

這一篇文章分析一下docker push的過程;docker push是將本地的鏡像上傳到registry service的過程;git

根據前幾篇文章,能夠知道客戶端的命令是在api/client/push.go中,CmdPush()函數:github

基本思路就是將經過解析cmd.Arg(0)參數,提取去要push的鏡像的repository 和 tag,經過registry 和 repository得到repostoryInfo;若是須要安全驗證,那麼還要設置一下authConfig;接着經過POST:/images/xxxx/push? 請求調用server端的postImagesPush()函數;(在api/server/image.go)中,主要來分析一下這個函數:docker

func (s *Server) postImagesPush(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string)   error {json

    if vars == nil {api

        return fmt.Errorf("Missing parameter")安全

    } 數據結構

    metaHeaders := map[string][]string{}app

    for k, v := range r.Header {函數

        if strings.HasPrefix(k, "  ") {post

            metaHeaders[k] = v

        }

    }

    if err := parseForm(r); err != nil {

        return err

    }

    authConfig := &cliconfig.AuthConfig{}

    authEncoded := r.Header.Get("X-Registry-Auth")

    if authEncoded != "" {

        // the new format is to handle the authConfig as a header

        authJSON := base64.NewDecoder(base64.URLEncoding, strings.NewReader(authEncoded))

        if err := json.NewDecoder(authJSON).Decode(authConfig); err != nil {

            // to increase compatibility to existing api it is defaulting to be empty

            authConfig = &cliconfig.AuthConfig{}

        }

    } else {

        // the old format is supported for compatibility if there was no authConfig header 

        if err := json.NewDecoder(r.Body).Decode(authConfig); err != nil {

            return fmt.Errorf("Bad parameters and missing X-Registry-Auth: %v", err)

        }

    }

    name := vars["name"]

    output := ioutils.NewWriteFlusher(w)

    imagePushConfig := &graph.ImagePushConfig{

        MetaHeaders: metaHeaders,

        AuthConfig:  authConfig,

        Tag:         r.Form.Get("tag"),

        OutStream:   output,

    }

    w.Header().Set("Content-Type", "application/json")

    if err := s.daemon.Repositories().Push(name, imagePushConfig); err != nil {

        if !output.Flushed() {

            return err

        }

        sf := streamformatter.NewJSONStreamFormatter()

        output.Write(sf.FormatError(err))

    }

    return nil

}

最主要的流程是前面是封裝好 imagePushConfig 這個數據結構,imagePushConfig主要包括http request中的header信息、鏡像tag、認證信息以及一個回寫客戶端的output Writer(),而後調用s.daemon.Repositories().Push(name, imagePushConfig) 來進行鏡像文件的上傳;以前的文章中介紹過,deamon的Repositories()函數返回的實際上是TagStore結構(/graph/tags.go 文件中);直接去看一下TagStore()的Push函數:

// Push initiates a push operation on the repository named localName.

func (s *TagStore) Push(localName string, imagePushConfig *ImagePushConfig) error {

    // FIXME: Allow to interrupt current push when new push of same image is done.

    var sf = streamformatter.NewJSONStreamFormatter()

    // Resolve the Repository name from fqn to RepositoryInfo

    repoInfo, err := s.registryService.ResolveRepository(localName)

    if err != nil {

        return err

    }

    endpoints, err := s.registryService.LookupPushEndpoints(repoInfo.CanonicalName)

    if err != nil {

        return err

    }

    reposLen := 1

    if imagePushConfig.Tag == "" {

        reposLen = len(s.Repositories[repoInfo.LocalName])

    }

    imagePushConfig.OutStream.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", repoInfo.        CanonicalName, reposLen))

    // If it fails, try to get the repository

    localRepo, exists := s.Repositories[repoInfo.LocalName]

    if !exists {

        return fmt.Errorf("Repository does not exist: %s", repoInfo.LocalName)

    } 

    var lastErr error

    for _, endpoint := range endpoints {

        logrus.Debugf("Trying to push %s to %s %s", repoInfo.CanonicalName, endpoint.URL, endpoint.Version)

        pusher, err := s.NewPusher(endpoint, localRepo, repoInfo, imagePushConfig, sf)

        if err != nil {

            lastErr = err

            continue

        }

        if fallback, err := pusher.Push(); err != nil {

            if fallback {

                lastErr = err

                continue

            }

            logrus.Debugf("Not continuing with error: %v", err)

            return err

        }

        s.eventsService.Log("push", repoInfo.LocalName, "")

        return nil

    }

    if lastErr == nil {

        lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.CanonicalName)

    }

    return lastErr

}

首先經過localName獲取repositoryInfo,而後經過repositoryInfo獲取endpoints,而後針對每個endpoint,實例化一個pusher,經過pusher.Push()來實現鏡像的上傳,程序的流程和結構很好理解;看一下NewPusher()的代碼:

func (s *TagStore) NewPusher(endpoint registry.APIEndpoint, localRepo Repository, repoInfo *registry.RepositoryInfo,       imagePushConfig *ImagePushConfig, sf *streamformatter.StreamFormatter) (Pusher, error) {

    switch endpoint.Version {

    case registry.APIVersion2:

        return &v2Pusher{

            TagStore:  s,

            endpoint:  endpoint,

            localRepo: localRepo,

            repoInfo:  repoInfo,

            config:    imagePushConfig,

            sf:        sf,

        }, nil

    case registry.APIVersion1:

        return &v1Pusher{

            TagStore:  s,

            endpoint:  endpoint,

            localRepo: localRepo,

            repoInfo:  repoInfo,

            config:    imagePushConfig,

            sf:        sf,

        }, nil

    }

    return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)

}

主要是根據endpoint中version的不一樣,來決定實例化哪一個pushser;暫且以v2 pusher()爲例來進行分析;

type v2Pusher struct {

    *TagStore

    endpoint  registry.APIEndpoint

    localRepo Repository

    repoInfo  *registry.RepositoryInfo

    config    *ImagePushConfig

    sf        *streamformatter.StreamFormatter

    repo      distribution.Repository

}

這個是v2Pusher的定義;接下來是v2Pusher的Push(/graph/push_v2.go文件)函數:

func (p *v2Pusher) Push() (fallback bool, err error) {

    p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig)

    if err != nil {

        logrus.Debugf("Error getting v2 registry: %v", err)

        return true, err

    }

    return false, p.pushV2Repository(p.config.Tag)

}

首先實例化出來NewV2Repository實例(/graph/registry.go文件中)複製給pusher的repo實例,而後調用pusher的pushV2Repository函數進行鏡像的上傳;新的pusher.repo實例,也就是NewV2Repository返回的實際上是一個新的repository(vendor/src/github.com/docker/distribution/registry/client/repository.go文件中),它主要新建了一個提供了超時設定、權限驗證和對遠程endpoint的api version進行驗證的http transport,負責鏡像的上傳;接着看一下最重要的pusher的pushV2Repository函數;

func (p *v2Pusher) pushV2Repository(tag string) error {

    localName := p.repoInfo.LocalName

    if _, err := p.poolAdd("push", localName); err != nil {

        return err

    }

    defer p.poolRemove("push", localName)

    tags, err := p.getImageTags(tag)

    if err != nil {

        return fmt.Errorf("error getting tags for %s: %s", localName, err)

    }

    if len(tags) == 0 {

        return fmt.Errorf("no tags to push for %s", localName)

    }

    for _, tag := range tags {

        if err := p.pushV2Tag(tag); err != nil {

            return err

        }

    } 

    return nil

}

調用TagStore的poolAdd函數,將要下載的的名字加入的TagStore的pushingPool裏面,保證同一時刻只有一個同名鏡像在下載;經過getImageTags(tag)獲取對應tag的要下載的鏡像,實際上使用pusher的localRepo中查找對應tag的鏡像,localRepo 是Repository類型,更確切說是map[string]string類型,key爲tag,value爲layerID;若是沒有傳過來的tag爲空的話,則返回全部的鏡像;接着分析p.pushV2Tag()函數

func (p *v2Pusher) pushV2Tag(tag string) error {

    logrus.Debugf("Pushing repository: %s:%s", p.repo.Name(), tag)

    layerID, exists := p.localRepo[tag]

    if !exists {

        return fmt.Errorf("tag does not exist: %s", tag)

    }

 

    layersSeen := make(map[string]bool)

 

    layer, err := p.graph.Get(layerID)

    if err != nil {

        return err

    }

 

    m := &manifest.Manifest{

        Versioned: manifest.Versioned{

            SchemaVersion: 1,

        },

        Name:         p.repo.Name(),

        Tag:          tag,

        Architecture: layer.Architecture,

        FSLayers:     []manifest.FSLayer{},

        History:      []manifest.History{},

    }

    var metadata runconfig.Config

    if layer != nil && layer.Config != nil {

        metadata = *layer.Config

    }

 

    out := p.config.OutStream

 

    for ; layer != nil; layer, err = p.graph.GetParent(layer) {

        if err != nil {

            return err

        }

        if layersSeen[layer.ID] {

            break

        }

        logrus.Debugf("Pushing layer: %s", layer.ID)

        if layer.Config != nil && metadata.Image != layer.ID {

            if err := runconfig.Merge(&metadata, layer.Config); err != nil {

                return err

            }

        }

        jsonData, err := p.graph.RawJSON(layer.ID)

        if err != nil {

            return fmt.Errorf("cannot retrieve the path for %s: %s", layer.ID, err)

        }

        var exists bool

        dgst, err := p.graph.GetDigest(layer.ID)

        switch err {

        case nil:

            _, err := p.repo.Blobs(context.Background()).Stat(context.Background(), dgst)

            switch err {

            case nil:

                exists = true

                out.Write(p.sf.FormatProgress(stringid.TruncateID(layer.ID), "Image already exists", nil))

            case distribution.ErrBlobUnknown:

                // nop

            default:

                out.Write(p.sf.FormatProgress(stringid.TruncateID(layer.ID), "Image push failed", nil))

                return err

            }

        case ErrDigestNotSet:

            // nop

        case digest.ErrDigestInvalidFormat, digest.ErrDigestUnsupported:

            return fmt.Errorf("error getting image checksum: %v", err)

        }

        // if digest was empty or not saved, or if blob does not exist on the remote repository,

        // then fetch it.

        if !exists {

            if pushDigest, err := p.pushV2Image(p.repo.Blobs(context.Background()), layer); err != nil {

                return err

            } else if pushDigest != dgst {

                // Cache new checksum

                if err := p.graph.SetDigest(layer.ID, pushDigest); err != nil {

                    return err

                }

                dgst = pushDigest

            }

        }

        m.FSLayers = append(m.FSLayers, manifest.FSLayer{BlobSum: dgst})

        m.History = append(m.History, manifest.History{V1Compatibility: string(jsonData)})

        layersSeen[layer.ID] = true

    }

    logrus.Infof("Signed manifest for %s:%s using daemon's key: %s", p.repo.Name(), tag, p.trustKey.KeyID())

    signed, err := manifest.Sign(m, p.trustKey)

    if err != nil {

        return err

    }

    manifestDigest, manifestSize, err := digestFromManifest(signed, p.repo.Name())

    if err != nil {

        return err

    }

    if manifestDigest != "" {

        out.Write(p.sf.FormatStatus("", "%s: digest: %s size: %d", tag, manifestDigest, manifestSize))

    }

    manSvc, err := p.repo.Manifests(context.Background())

    if err != nil {

        return err

    }

    return manSvc.Put(signed)

}

pushV2Tag()函數比較長,從上到下一點點分析下:首先根據tag從localRepo中找到LayerID,而後經過graph.Get()經過layerID返回layer,實際是一個image結構;layerSeen是一個map,表示的哪些已經被push過了,以避免重複;接下來實例化一個manifest結構,manifest主要記錄鏡像的全部layer信息(包括鏡像的校驗和以及鏡像jsondata中的信息);

接着就是一個for 循環,從一個layer中不斷的取得這個layer的父鏡像;而後上傳;上傳鏡像以前,先經過graph獲取這個鏡像的jsondata和digest校驗和,經過 p.repo.Blobs(context.Background()).Stat(context.Background(), dgst) 判斷校驗和在將要上傳的地點是否已經存在,存在的話則打印出相關提示信息,不存在的狀況則會調用  p.pushV2Image() 去上傳鏡像;上傳完鏡像後會生成新的digest,接着調用

 m.FSLayers = append(m.FSLayers, manifest.FSLayer{BlobSum: dgst})

 m.History = append(m.History, manifest.History{V1Compatibility: string(jsonData)})

 layersSeen[layer.ID] = true

這三行代碼,將layer信息放到manifest文件中,而且設置layerSeen標明這個layer已經上傳過了;

當全部的鏡像上傳完畢後,接着對manifest文件進行簽名、生成manifest文件的digest信息、而後經過repo將manifest信息上傳;

 

以上就是docker push的大體步驟

相關文章
相關標籤/搜索