這一篇文章分析一下docker push的過程;docker push是將本地的鏡像上傳到registry service的過程;git
根據前幾篇文章,能夠知道客戶端的命令是在api/client/push.go中,CmdPush()函數:github
基本思路就是將經過解析cmd.Arg(0)參數,提取去要push的鏡像的repository 和 tag,經過registry 和 repository得到repostoryInfo;若是須要安全驗證,那麼還要設置一下authConfig;接着經過POST:/images/xxxx/push? 請求調用server端的postImagesPush()函數;(在api/server/image.go)中,主要來分析一下這個函數:docker
func (s *Server) postImagesPush(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {json
if vars == nil {api
return fmt.Errorf("Missing parameter")安全
} 數據結構
metaHeaders := map[string][]string{}app
for k, v := range r.Header {函數
if strings.HasPrefix(k, " ") {post
metaHeaders[k] = v
}
}
if err := parseForm(r); err != nil {
return err
}
authConfig := &cliconfig.AuthConfig{}
authEncoded := r.Header.Get("X-Registry-Auth")
if authEncoded != "" {
// the new format is to handle the authConfig as a header
authJSON := base64.NewDecoder(base64.URLEncoding, strings.NewReader(authEncoded))
if err := json.NewDecoder(authJSON).Decode(authConfig); err != nil {
// to increase compatibility to existing api it is defaulting to be empty
authConfig = &cliconfig.AuthConfig{}
}
} else {
// the old format is supported for compatibility if there was no authConfig header
if err := json.NewDecoder(r.Body).Decode(authConfig); err != nil {
return fmt.Errorf("Bad parameters and missing X-Registry-Auth: %v", err)
}
}
name := vars["name"]
output := ioutils.NewWriteFlusher(w)
imagePushConfig := &graph.ImagePushConfig{
MetaHeaders: metaHeaders,
AuthConfig: authConfig,
Tag: r.Form.Get("tag"),
OutStream: output,
}
w.Header().Set("Content-Type", "application/json")
if err := s.daemon.Repositories().Push(name, imagePushConfig); err != nil {
if !output.Flushed() {
return err
}
sf := streamformatter.NewJSONStreamFormatter()
output.Write(sf.FormatError(err))
}
return nil
}
最主要的流程是前面是封裝好 imagePushConfig 這個數據結構,imagePushConfig主要包括http request中的header信息、鏡像tag、認證信息以及一個回寫客戶端的output Writer(),而後調用s.daemon.Repositories().Push(name, imagePushConfig) 來進行鏡像文件的上傳;以前的文章中介紹過,deamon的Repositories()函數返回的實際上是TagStore結構(/graph/tags.go 文件中);直接去看一下TagStore()的Push函數:
// Push initiates a push operation on the repository named localName.
func (s *TagStore) Push(localName string, imagePushConfig *ImagePushConfig) error {
// FIXME: Allow to interrupt current push when new push of same image is done.
var sf = streamformatter.NewJSONStreamFormatter()
// Resolve the Repository name from fqn to RepositoryInfo
repoInfo, err := s.registryService.ResolveRepository(localName)
if err != nil {
return err
}
endpoints, err := s.registryService.LookupPushEndpoints(repoInfo.CanonicalName)
if err != nil {
return err
}
reposLen := 1
if imagePushConfig.Tag == "" {
reposLen = len(s.Repositories[repoInfo.LocalName])
}
imagePushConfig.OutStream.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", repoInfo. CanonicalName, reposLen))
// If it fails, try to get the repository
localRepo, exists := s.Repositories[repoInfo.LocalName]
if !exists {
return fmt.Errorf("Repository does not exist: %s", repoInfo.LocalName)
}
var lastErr error
for _, endpoint := range endpoints {
logrus.Debugf("Trying to push %s to %s %s", repoInfo.CanonicalName, endpoint.URL, endpoint.Version)
pusher, err := s.NewPusher(endpoint, localRepo, repoInfo, imagePushConfig, sf)
if err != nil {
lastErr = err
continue
}
if fallback, err := pusher.Push(); err != nil {
if fallback {
lastErr = err
continue
}
logrus.Debugf("Not continuing with error: %v", err)
return err
}
s.eventsService.Log("push", repoInfo.LocalName, "")
return nil
}
if lastErr == nil {
lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.CanonicalName)
}
return lastErr
}
首先經過localName獲取repositoryInfo,而後經過repositoryInfo獲取endpoints,而後針對每個endpoint,實例化一個pusher,經過pusher.Push()來實現鏡像的上傳,程序的流程和結構很好理解;看一下NewPusher()的代碼:
func (s *TagStore) NewPusher(endpoint registry.APIEndpoint, localRepo Repository, repoInfo *registry.RepositoryInfo, imagePushConfig *ImagePushConfig, sf *streamformatter.StreamFormatter) (Pusher, error) {
switch endpoint.Version {
case registry.APIVersion2:
return &v2Pusher{
TagStore: s,
endpoint: endpoint,
localRepo: localRepo,
repoInfo: repoInfo,
config: imagePushConfig,
sf: sf,
}, nil
case registry.APIVersion1:
return &v1Pusher{
TagStore: s,
endpoint: endpoint,
localRepo: localRepo,
repoInfo: repoInfo,
config: imagePushConfig,
sf: sf,
}, nil
}
return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
}
主要是根據endpoint中version的不一樣,來決定實例化哪一個pushser;暫且以v2 pusher()爲例來進行分析;
type v2Pusher struct {
*TagStore
endpoint registry.APIEndpoint
localRepo Repository
repoInfo *registry.RepositoryInfo
config *ImagePushConfig
sf *streamformatter.StreamFormatter
repo distribution.Repository
}
這個是v2Pusher的定義;接下來是v2Pusher的Push(/graph/push_v2.go文件)函數:
func (p *v2Pusher) Push() (fallback bool, err error) {
p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig)
if err != nil {
logrus.Debugf("Error getting v2 registry: %v", err)
return true, err
}
return false, p.pushV2Repository(p.config.Tag)
}
首先實例化出來NewV2Repository實例(/graph/registry.go文件中)複製給pusher的repo實例,而後調用pusher的pushV2Repository函數進行鏡像的上傳;新的pusher.repo實例,也就是NewV2Repository返回的實際上是一個新的repository(vendor/src/github.com/docker/distribution/registry/client/repository.go文件中),它主要新建了一個提供了超時設定、權限驗證和對遠程endpoint的api version進行驗證的http transport,負責鏡像的上傳;接着看一下最重要的pusher的pushV2Repository函數;
func (p *v2Pusher) pushV2Repository(tag string) error {
localName := p.repoInfo.LocalName
if _, err := p.poolAdd("push", localName); err != nil {
return err
}
defer p.poolRemove("push", localName)
tags, err := p.getImageTags(tag)
if err != nil {
return fmt.Errorf("error getting tags for %s: %s", localName, err)
}
if len(tags) == 0 {
return fmt.Errorf("no tags to push for %s", localName)
}
for _, tag := range tags {
if err := p.pushV2Tag(tag); err != nil {
return err
}
}
return nil
}
調用TagStore的poolAdd函數,將要下載的的名字加入的TagStore的pushingPool裏面,保證同一時刻只有一個同名鏡像在下載;經過getImageTags(tag)獲取對應tag的要下載的鏡像,實際上使用pusher的localRepo中查找對應tag的鏡像,localRepo 是Repository類型,更確切說是map[string]string類型,key爲tag,value爲layerID;若是沒有傳過來的tag爲空的話,則返回全部的鏡像;接着分析p.pushV2Tag()函數
func (p *v2Pusher) pushV2Tag(tag string) error {
logrus.Debugf("Pushing repository: %s:%s", p.repo.Name(), tag)
layerID, exists := p.localRepo[tag]
if !exists {
return fmt.Errorf("tag does not exist: %s", tag)
}
layersSeen := make(map[string]bool)
layer, err := p.graph.Get(layerID)
if err != nil {
return err
}
m := &manifest.Manifest{
Versioned: manifest.Versioned{
SchemaVersion: 1,
},
Name: p.repo.Name(),
Tag: tag,
Architecture: layer.Architecture,
FSLayers: []manifest.FSLayer{},
History: []manifest.History{},
}
var metadata runconfig.Config
if layer != nil && layer.Config != nil {
metadata = *layer.Config
}
out := p.config.OutStream
for ; layer != nil; layer, err = p.graph.GetParent(layer) {
if err != nil {
return err
}
if layersSeen[layer.ID] {
break
}
logrus.Debugf("Pushing layer: %s", layer.ID)
if layer.Config != nil && metadata.Image != layer.ID {
if err := runconfig.Merge(&metadata, layer.Config); err != nil {
return err
}
}
jsonData, err := p.graph.RawJSON(layer.ID)
if err != nil {
return fmt.Errorf("cannot retrieve the path for %s: %s", layer.ID, err)
}
var exists bool
dgst, err := p.graph.GetDigest(layer.ID)
switch err {
case nil:
_, err := p.repo.Blobs(context.Background()).Stat(context.Background(), dgst)
switch err {
case nil:
exists = true
out.Write(p.sf.FormatProgress(stringid.TruncateID(layer.ID), "Image already exists", nil))
case distribution.ErrBlobUnknown:
// nop
default:
out.Write(p.sf.FormatProgress(stringid.TruncateID(layer.ID), "Image push failed", nil))
return err
}
case ErrDigestNotSet:
// nop
case digest.ErrDigestInvalidFormat, digest.ErrDigestUnsupported:
return fmt.Errorf("error getting image checksum: %v", err)
}
// if digest was empty or not saved, or if blob does not exist on the remote repository,
// then fetch it.
if !exists {
if pushDigest, err := p.pushV2Image(p.repo.Blobs(context.Background()), layer); err != nil {
return err
} else if pushDigest != dgst {
// Cache new checksum
if err := p.graph.SetDigest(layer.ID, pushDigest); err != nil {
return err
}
dgst = pushDigest
}
}
m.FSLayers = append(m.FSLayers, manifest.FSLayer{BlobSum: dgst})
m.History = append(m.History, manifest.History{V1Compatibility: string(jsonData)})
layersSeen[layer.ID] = true
}
logrus.Infof("Signed manifest for %s:%s using daemon's key: %s", p.repo.Name(), tag, p.trustKey.KeyID())
signed, err := manifest.Sign(m, p.trustKey)
if err != nil {
return err
}
manifestDigest, manifestSize, err := digestFromManifest(signed, p.repo.Name())
if err != nil {
return err
}
if manifestDigest != "" {
out.Write(p.sf.FormatStatus("", "%s: digest: %s size: %d", tag, manifestDigest, manifestSize))
}
manSvc, err := p.repo.Manifests(context.Background())
if err != nil {
return err
}
return manSvc.Put(signed)
}
pushV2Tag()函數比較長,從上到下一點點分析下:首先根據tag從localRepo中找到LayerID,而後經過graph.Get()經過layerID返回layer,實際是一個image結構;layerSeen是一個map,表示的哪些已經被push過了,以避免重複;接下來實例化一個manifest結構,manifest主要記錄鏡像的全部layer信息(包括鏡像的校驗和以及鏡像jsondata中的信息);
接着就是一個for 循環,從一個layer中不斷的取得這個layer的父鏡像;而後上傳;上傳鏡像以前,先經過graph獲取這個鏡像的jsondata和digest校驗和,經過 p.repo.Blobs(context.Background()).Stat(context.Background(), dgst) 判斷校驗和在將要上傳的地點是否已經存在,存在的話則打印出相關提示信息,不存在的狀況則會調用 p.pushV2Image() 去上傳鏡像;上傳完鏡像後會生成新的digest,接着調用
m.FSLayers = append(m.FSLayers, manifest.FSLayer{BlobSum: dgst})
m.History = append(m.History, manifest.History{V1Compatibility: string(jsonData)})
layersSeen[layer.ID] = true
這三行代碼,將layer信息放到manifest文件中,而且設置layerSeen標明這個layer已經上傳過了;
當全部的鏡像上傳完畢後,接着對manifest文件進行簽名、生成manifest文件的digest信息、而後經過repo將manifest信息上傳;
以上就是docker push的大體步驟