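KubeSphere Helm Application Repository: Source Code Analysis

Author: 蔡錫生, platform R&D engineer at LStack, currently focused on bringing an OAM-based application hosting platform into production.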

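Background

Introduction to the KubeSphere App Store

As an open-source, application-centric container platform, KubeSphere builds on OpenPitrix to provide a Helm-based App Store for application lifecycle management. OpenPitrix is an open-source web platform for packaging, deploying, and managing different types of applications. The KubeSphere App Store lets ISVs, developers, and users upload, test, deploy, and publish applications in a one-stop service with just a few clicks.

Helm repository features in KubeSphere

  • Adding a Helm repository in KubeSphere

  • Helm repo list

  • Querying application templates in a KubeSphere Helm repository

Introduction to Helm repositories

A Helm chart repository stores Kubernetes application templates (charts). It consists of an index.yaml file and packaged .tgz charts.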

[root@ningbo stable]# ls -al 
總用量 400
drwxr-xr-x. 26 root root   4096 6月  22 17:01 .
drwxr-xr-x.  4 root root     86 6月  22 16:37 ..
-rw-r--r--.  1 root root  10114 6月  22 17:12 index.yaml
-rw-r--r--.  1 root root   3803 6月   8 2020 lsh-cluster-csm-om-agent-0.1.0.tgz
-rw-r--r--.  1 root root   4022 6月   8 2020 lsh-mcp-cc-alert-service-0.1.0.tgz
-rw-r--r--.  1 root root   4340 6月   8 2020 lsh-mcp-cc-sms-service-0.1.0.tgz
-rw-r--r--.  1 root root   4103 6月   8 2020 lsh-mcp-cpm-metrics-exchange-0.1.0.tgz
-rw-r--r--.  1 root root   4263 6月   8 2020 lsh-mcp-cpm-om-service-0.1.0.tgz
-rw-r--r--.  1 root root   4155 6月   8 2020 lsh-mcp-csm-om-service-0.1.0.tgz
-rw-r--r--.  1 root root   3541 6月   8 2020 lsh-mcp-deploy-service-0.1.0.tgz
-rw-r--r--.  1 root root   5549 6月   8 2020 lsh-mcp-iam-apigateway-service-0.1.0.tgz
  • The index.yaml file
apiVersion: v1
entries:
  aliyun-ccm:
  - apiVersion: v2
    appVersion: addon
    created: "2021-06-21T08:59:58Z"
    description: A Helm chart for Kubernetes
    digest: 6bda563c86333475255e5edfedc200ae282544e2c6e22b519a59b3c7bdef9a32
    name: aliyun-ccm
    type: application
    urls:
    - charts/aliyun-ccm-0.1.0.tgz
    version: 0.1.0
  aliyun-csi-driver:
  - apiVersion: v2
    appVersion: addon
    created: "2021-06-21T08:59:58Z"
    description: A Helm chart for Kubernetes
    digest: b49f128d7a49401d52173e6f58caedd3fabbe8e2827dc00e6a824ee38860fa51
    name: aliyun-csi-driver
    type: application
    urls:
    - charts/aliyun-csi-driver-0.1.0.tgz
    version: 0.1.0
  application-controller:
  - apiVersion: v1
    appVersion: addon
    created: "2021-06-21T08:59:58Z"
    description: A Helm chart for application Controller
    digest: 546e72ce77f865683ce0ea75f6e0203537a40744f2eb34e36a5bd378f9452bc5
    name: application-controller
    urls:
    - charts/application-controller-0.1.0.tgz
    version: 0.1.0
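
The urls entries in the index.yaml above are relative paths (for example charts/aliyun-ccm-0.1.0.tgz), so a client has to combine them with the repository address before it can download a package. The following is a minimal sketch of that step using only the Go standard library. The function name resolveChartURL and the base URL https://charts.example.com/stable are hypothetical and chosen for illustration; this is not KubeSphere's or Helm's actual code.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// resolveChartURL joins a repository base URL with one "urls" entry from index.yaml.
// Absolute entries are returned unchanged; relative ones are resolved against the base.
// (Hypothetical helper, for illustration only.)
func resolveChartURL(repoURL, entry string) (string, error) {
	ref, err := url.Parse(entry)
	if err != nil {
		return "", err
	}
	if ref.IsAbs() {
		return ref.String(), nil
	}
	// Ensure the base ends with "/" so the last path segment is kept when resolving.
	base, err := url.Parse(strings.TrimSuffix(repoURL, "/") + "/")
	if err != nil {
		return "", err
	}
	return base.ResolveReference(ref).String(), nil
}

func main() {
	u, err := resolveChartURL("https://charts.example.com/stable", "charts/aliyun-ccm-0.1.0.tgz")
	if err != nil {
		panic(err)
	}
	fmt.Println(u)
}
```

The trailing slash matters: without it, URL reference resolution would replace the last path segment of the base (stable) instead of appending to it.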
  • Directory layout after extracting a .tgz package
[root@ningbo stable]# cd mysql/
[root@ningbo mysql]# ls -al
總用量 20
drwxr-xr-x.  3 root root   97 5月  25 2020 .
drwxr-xr-x. 26 root root 4096 6月  22 17:01 ..
-rwxr-xr-x.  1 root root  106 5月  25 2020 Chart.yaml
-rwxr-xr-x.  1 root root  364 5月  25 2020 .helmignore
-rwxr-xr-x.  1 root root   76 5月  25 2020 index.yaml
drwxr-xr-x.  3 root root  146 5月  25 2020 templates
-rwxr-xr-x.  1 root root 1735 5月  25 2020 values.yaml
  • Chart.yaml
[root@ningbo mysql]# cat Chart.yaml 
apiVersion: v1
appVersion: "1.0"
description: A Helm chart for Kubernetes
name: mysql
version: 0.1.0

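Code walkthrough: adding a Helm repository

API implementation analysis

  1. Route registration
  2. handler: parses the request parameters and calls the models layer
  3. models: runs the business logic and calls the CRD client
  4. CRD client: calls the Kubernetes API to persist the object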
webservice.Route(webservice.POST("/repos").
        To(handler.CreateRepo). // follow this call
        Doc("Create a global repository, which is used to store package of app").
        Metadata(restfulspec.KeyOpenAPITags, []string{constants.OpenpitrixTag}).
        Param(webservice.QueryParameter("validate", "Validate repository")).
        Returns(http.StatusOK, api.StatusOK, openpitrix.CreateRepoResponse{}).
        Reads(openpitrix.CreateRepoRequest{}))
func (h *openpitrixHandler) CreateRepo(req *restful.Request, resp *restful.Response) {

    createRepoRequest := &openpitrix.CreateRepoRequest{}
    err := req.ReadEntity(createRepoRequest)
    if err != nil {
        klog.V(4).Infoln(err)
        api.HandleBadRequest(resp, nil, err)
        return
    }

    createRepoRequest.Workspace = new(string)
    *createRepoRequest.Workspace = req.PathParameter("workspace")

    user, _ := request.UserFrom(req.Request.Context())
    creator := ""
    if user != nil {
        creator = user.GetName()
    }
    parsedUrl, err := url.Parse(createRepoRequest.URL)
    if err != nil {
        api.HandleBadRequest(resp, nil, err)
        return
    }
    userInfo := parsedUrl.User
    // trim credential from url
    parsedUrl.User = nil

    repo := v1alpha1.HelmRepo{
        ObjectMeta: metav1.ObjectMeta{
            Name: idutils.GetUuid36(v1alpha1.HelmRepoIdPrefix),
            Annotations: map[string]string{
                constants.CreatorAnnotationKey: creator,
            },
            Labels: map[string]string{
                constants.WorkspaceLabelKey: *createRepoRequest.Workspace,
            },
        },
        Spec: v1alpha1.HelmRepoSpec{
            Name:        createRepoRequest.Name,
            Url:         parsedUrl.String(),
            SyncPeriod:  0,
            Description: stringutils.ShortenString(createRepoRequest.Description, 512),
        },
    }

    if strings.HasPrefix(createRepoRequest.URL, "https://") || strings.HasPrefix(createRepoRequest.URL, "http://") {
        if userInfo != nil {
            repo.Spec.Credential.Username = userInfo.Username()
            repo.Spec.Credential.Password, _ = userInfo.Password()
        }
    } else if strings.HasPrefix(createRepoRequest.URL, "s3://") {
        cfg := v1alpha1.S3Config{}
        err := json.Unmarshal([]byte(createRepoRequest.Credential), &cfg)
        if err != nil {
            api.HandleBadRequest(resp, nil, err)
            return
        }
        repo.Spec.Credential.S3Config = cfg
    }

    var result interface{}
    // 1. validate repo
    result, err = h.openpitrix.ValidateRepo(createRepoRequest.URL, &repo.Spec.Credential)
    if err != nil {
        klog.Errorf("validate repo failed, err: %s", err)
        api.HandleBadRequest(resp, nil, err)
        return
    }

    // 2. create repo
    validate, _ := strconv.ParseBool(req.QueryParameter("validate"))
    if !validate {
        if repo.GetTrueName() == "" {
            api.HandleBadRequest(resp, nil, fmt.Errorf("repo name is empty"))
            return
        }
        result, err = h.openpitrix.CreateRepo(&repo) // follow this call
    }

    if err != nil {
        klog.Errorln(err)
        handleOpenpitrixError(resp, err)
        return
    }

    resp.WriteEntity(result)
}
func (c *repoOperator) CreateRepo(repo *v1alpha1.HelmRepo) (*CreateRepoResponse, error) {
    name := repo.GetTrueName()

    items, err := c.repoLister.List(labels.SelectorFromSet(map[string]string{constants.WorkspaceLabelKey: repo.GetWorkspace()}))
    if err != nil && !apierrors.IsNotFound(err) {
        klog.Errorf("list Helm repo failed: %s", err)
        return nil, err
    }

    for _, exists := range items {
        if exists.GetTrueName() == name {
            klog.Error(repoItemExists, "name: ", name)
            return nil, repoItemExists
        }
    }

    repo.Spec.Description = stringutils.ShortenString(repo.Spec.Description, DescriptionLen)
    _, err = c.repoClient.HelmRepos().Create(context.TODO(), repo, metav1.CreateOptions{}) // follow this call
    if err != nil {
        klog.Errorf("create Helm repo failed, repo_id: %s, error: %s", repo.GetHelmRepoId(), err)
        return nil, err
    } else {
        klog.V(4).Infof("create Helm repo success, repo_id: %s", repo.GetHelmRepoId())
    }

    return &CreateRepoResponse{repo.GetHelmRepoId()}, nil
}
// Create takes the representation of a HelmRepo and creates it.  Returns the server's representation of the HelmRepo, and an error, if there is any.
func (c *helmRepos) Create(ctx context.Context, helmRepo *v1alpha1.HelmRepo, opts v1.CreateOptions) (result *v1alpha1.HelmRepo, err error) {
    result = &v1alpha1.HelmRepo{}
    err = c.client.Post().
        Resource("helmrepos").
        VersionedParams(&opts, scheme.ParameterCodec).
        Body(helmRepo).
        Do(ctx).
        Into(result)
    return
}
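
One detail of the CreateRepo handler above is easy to miss: credentials embedded in the repository URL are split off and stored separately, and the URL is saved without them. The following standalone sketch reproduces just that step with the standard net/url package. The function name splitCredential and the example URL are hypothetical; the handler itself does this inline.

```go
package main

import (
	"fmt"
	"net/url"
)

// splitCredential parses rawURL and returns the URL with its userinfo removed,
// plus the username and password that were embedded in it (empty if none).
// (Hypothetical helper mirroring the inline logic of the handler.)
func splitCredential(rawURL string) (cleanURL, username, password string, err error) {
	parsed, err := url.Parse(rawURL)
	if err != nil {
		return "", "", "", err
	}
	if parsed.User != nil {
		username = parsed.User.Username()
		password, _ = parsed.User.Password()
	}
	parsed.User = nil // trim credential from url
	return parsed.String(), username, password, nil
}

func main() {
	clean, user, pass, err := splitCredential("https://admin:s3cret@charts.example.com/stable")
	if err != nil {
		panic(err)
	}
	fmt.Println(clean, user, pass)
}
```

Keeping the secret out of Spec.Url means the URL can be logged or displayed without leaking the password.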

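Code walkthrough: querying application templates in a Helm repository

API implementation

  1. Route registration
  2. handler: parses the request parameters and calls the models layer
  3. models: runs the business logic and calls the CRD client
  4. CRD client: calls the Kubernetes API to persist the object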
webservice.Route(webservice.GET("/apps").
        Deprecate().
        To(handler.ListApps).  // follow this call
        Doc("List app templates").
        Param(webservice.QueryParameter(params.ConditionsParam, "query conditions,connect multiple conditions with commas, equal symbol for exact query, wave symbol for fuzzy query e.g. name~a").
            Required(false).
            DataFormat("key=%s,key~%s")).
        Param(webservice.QueryParameter(params.PagingParam, "paging query, e.g. limit=100,page=1").
            Required(false).
            DataFormat("limit=%d,page=%d").
            DefaultValue("limit=10,page=1")).
        Param(webservice.QueryParameter(params.ReverseParam, "sort parameters, e.g. reverse=true")).
        Param(webservice.QueryParameter(params.OrderByParam, "sort parameters, e.g. orderBy=createTime")).
        Metadata(restfulspec.KeyOpenAPITags, []string{constants.OpenpitrixTag}).
        Returns(http.StatusOK, api.StatusOK, models.PageableResponse{}))
func (h *openpitrixHandler) ListApps(req *restful.Request, resp *restful.Response) {
    limit, offset := params.ParsePaging(req)
    orderBy := params.GetStringValueWithDefault(req, params.OrderByParam, openpitrix.CreateTime)
    reverse := params.GetBoolValueWithDefault(req, params.ReverseParam, false)
    conditions, err := params.ParseConditions(req)

    if err != nil {
        klog.V(4).Infoln(err)
        api.HandleBadRequest(resp, nil, err)
        return
    }

    if req.PathParameter("workspace") != "" {
        conditions.Match[openpitrix.WorkspaceLabel] = req.PathParameter("workspace")
    }

    result, err := h.openpitrix.ListApps(conditions, orderBy, reverse, limit, offset)  // follow this call

    if err != nil {
        klog.Errorln(err)
        handleOpenpitrixError(resp, err)
        return
    }

    resp.WriteEntity(result)
}
func (c *applicationOperator) ListApps(conditions *params.Conditions, orderBy string, reverse bool, limit, offset int) (*models.PageableResponse, error) {

    apps, err := c.listApps(conditions)  // key call: follow this one
    if err != nil {
        klog.Error(err)
        return nil, err
    }
    apps = filterApps(apps, conditions)

    if reverse {
        sort.Sort(sort.Reverse(HelmApplicationList(apps)))
    } else {
        sort.Sort(HelmApplicationList(apps))
    }

    totalCount := len(apps)
    start, end := (&query.Pagination{Limit: limit, Offset: offset}).GetValidPagination(totalCount)
    apps = apps[start:end]
    items := make([]interface{}, 0, len(apps))

    for i := range apps {
        versions, err := c.getAppVersionsByAppId(apps[i].GetHelmApplicationId())
        if err != nil && !apierrors.IsNotFound(err) {
            return nil, err
        }
        ctg, _ := c.ctgLister.Get(apps[i].GetHelmCategoryId())
        items = append(items, convertApp(apps[i], versions, ctg, 0))
    }
    return &models.PageableResponse{Items: items, TotalCount: totalCount}, nil
}
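
ListApps above sorts the full list first and only then slices it with the start and end bounds returned by GetValidPagination. The sketch below shows one way such bounds could be clamped so the slice expression never goes out of range. It is an assumption-based illustration, not KubeSphere's query.Pagination implementation; the function name validRange and the rule that a non-positive limit means "no limit" are hypothetical.

```go
package main

import "fmt"

// validRange clamps offset and limit to [0, total] and returns slice bounds
// that are always safe to use as items[start:end].
// Assumption for this sketch: limit <= 0 means "no limit".
func validRange(limit, offset, total int) (start, end int) {
	if offset < 0 {
		offset = 0
	}
	if offset > total {
		offset = total
	}
	start = offset
	end = total
	if limit > 0 && start+limit < total {
		end = start + limit
	}
	return start, end
}

func main() {
	apps := []string{"a", "b", "c", "d", "e"}
	start, end := validRange(2, 4, len(apps))
	fmt.Println(apps[start:end]) // only one item is left on the last page
}
```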

// line 601
func (c *applicationOperator) listApps(conditions *params.Conditions) (ret []*v1alpha1.HelmApplication, err error) {
    repoId := conditions.Match[RepoId]
    if repoId != "" && repoId != v1alpha1.AppStoreRepoId {
        // get Helm application from Helm repo
        if ret, exists := c.cachedRepos.ListApplicationsByRepoId(repoId); !exists {
            klog.Warningf("load repo failed, repo id: %s", repoId)
            return nil, loadRepoInfoFailed
        } else {
            return ret, nil
        }
    } else {
        if c.backingStoreClient == nil {
            return []*v1alpha1.HelmApplication{}, nil
        }
        ret, err = c.appLister.List(labels.SelectorFromSet(buildLabelSelector(conditions)))
    }

    return
}
func (c *cachedRepos) ListApplicationsByRepoId(repoId string) (ret []*v1alpha1.HelmApplication, exists bool) {
    c.RLock()
    defer c.RUnlock()

    if repo, exists := c.repos[repoId]; !exists {
        return nil, false
    } else {
        ret = make([]*v1alpha1.HelmApplication, 0, 10)
        for _, app := range c.apps {
            if app.GetHelmRepoId() == repo.Name { // append the app when its repo ID matches
                ret = append(ret, app)
            }
        }
    }
    return ret, true
}
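
The lookup in ListApplicationsByRepoId follows a common pattern: a map-based cache guarded by a sync.RWMutex, where readers take the read lock and scan for entries that belong to one repository. The following is a stripped-down sketch of that pattern. The types helmApp and repoCache and their methods are hypothetical simplifications of the real cachedRepos struct.

```go
package main

import (
	"fmt"
	"sync"
)

// helmApp is a simplified stand-in for v1alpha1.HelmApplication.
type helmApp struct {
	ID     string
	RepoID string
}

// repoCache is a simplified stand-in for cachedRepos.
type repoCache struct {
	sync.RWMutex
	repos map[string]bool
	apps  map[string]*helmApp
}

func newRepoCache() *repoCache {
	return &repoCache{repos: map[string]bool{}, apps: map[string]*helmApp{}}
}

// addRepo registers a repo and its apps under the write lock.
func (c *repoCache) addRepo(repoID string, apps ...*helmApp) {
	c.Lock()
	defer c.Unlock()
	c.repos[repoID] = true
	for _, a := range apps {
		c.apps[a.ID] = a
	}
}

// listByRepo returns all cached apps of one repo; the bool reports
// whether the repo is known to the cache at all.
func (c *repoCache) listByRepo(repoID string) ([]*helmApp, bool) {
	c.RLock()
	defer c.RUnlock()
	if !c.repos[repoID] {
		return nil, false
	}
	ret := make([]*helmApp, 0, 10)
	for _, a := range c.apps {
		if a.RepoID == repoID {
			ret = append(ret, a)
		}
	}
	return ret, true
}

func main() {
	c := newRepoCache()
	c.addRepo("repo-1", &helmApp{ID: "app-1", RepoID: "repo-1"})
	apps, ok := c.listByRepo("repo-1")
	fmt.Println(len(apps), ok)
}
```

Note that the lookup scans every cached app, so its cost grows with the total number of apps across all repositories rather than with the size of the requested repository.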

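Since app templates are read from a cache, when is the data written into that cache?

  1. Create a global cache variable
  2. When a new Helm repository is added, the HelmRepoController (the CRD controller already installed in Kubernetes) detects the new HelmRepo and updates its .Status.Data
  3. The informer observes the update and refreshes the cache accordingly

How the cache update is implemented

  1. Create a global variable and initialize it in an init function
  2. Keep the cache in sync through the HelmRepo informer
  3. On every API call, the handler already holds a reference to the cache variable

Creating the interface type openpitrix.Interface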

type openpitrixHandler struct {
    openpitrix openpitrix.Interface
}

func newOpenpitrixHandler(ksInformers informers.InformerFactory, ksClient versioned.Interface, option *openpitrixoptions.Options) *openpitrixHandler {
    var s3Client s3.Interface
    if option != nil && option.S3Options != nil && len(option.S3Options.Endpoint) != 0 {
        var err error
        s3Client, err = s3.NewS3Client(option.S3Options)
        if err != nil {
            klog.Errorf("failed to connect to storage, please check storage service status, error: %v", err)
        }
    }

    return &openpitrixHandler{
        openpitrix.NewOpenpitrixOperator(ksInformers, ksClient, s3Client),
    }
}

NewOpenpitrixOperator

  • Cache updates are performed by event handler functions registered on the informer
  • once.Do ensures the registration runs only once
var cachedReposData reposcache.ReposCache
var HelmReposInformer cache.SharedIndexInformer
var once sync.Once


func init() {
    cachedReposData = reposcache.NewReposCache() // global cache
}

func NewOpenpitrixOperator(ksInformers ks_informers.InformerFactory, ksClient versioned.Interface, s3Client s3.Interface) Interface {

    once.Do(func() {
        klog.Infof("start Helm repo informer")
        HelmReposInformer = ksInformers.KubeSphereSharedInformerFactory().Application().V1alpha1().HelmRepos().Informer()
        HelmReposInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                r := obj.(*v1alpha1.HelmRepo)
                cachedReposData.AddRepo(r) // cache update; follow this call
            },
            UpdateFunc: func(oldObj, newObj interface{}) {
                oldR := oldObj.(*v1alpha1.HelmRepo)
                cachedReposData.DeleteRepo(oldR)
                r := newObj.(*v1alpha1.HelmRepo)
                cachedReposData.AddRepo(r)
            },
            DeleteFunc: func(obj interface{}) {
                r := obj.(*v1alpha1.HelmRepo)
                cachedReposData.DeleteRepo(r)
            },
        })
        go HelmReposInformer.Run(wait.NeverStop)
    })

    return &openpitrixOperator{
        AttachmentInterface:  newAttachmentOperator(s3Client),
    // cachedReposData used
        ApplicationInterface: newApplicationOperator(cachedReposData, ksInformers.KubeSphereSharedInformerFactory(), ksClient, s3Client),
    // cachedReposData used
        RepoInterface:        newRepoOperator(cachedReposData, ksInformers.KubeSphereSharedInformerFactory(), ksClient),
    // cachedReposData used
        ReleaseInterface:     newReleaseOperator(cachedReposData, ksInformers.KubernetesSharedInformerFactory(), ksInformers.KubeSphereSharedInformerFactory(), ksClient),
        CategoryInterface:    newCategoryOperator(ksInformers.KubeSphereSharedInformerFactory(), ksClient),
    }
}
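
The combination of sync.Once and informer event handlers in NewOpenpitrixOperator can be tried out without a cluster. The sketch below replaces the informer with a plain slice of callbacks, so it only illustrates the control flow: the constructor may be called many times, the handlers are registered once, and an update is applied as delete-then-add. All names here (repoHandlers, registered, repoCache, newOperator) are hypothetical stand-ins, not client-go or KubeSphere APIs.

```go
package main

import (
	"fmt"
	"sync"
)

// repoHandlers mimics the Add/Update/Delete callbacks an informer accepts.
type repoHandlers struct {
	onAdd    func(repo string)
	onUpdate func(oldRepo, newRepo string)
	onDelete func(repo string)
}

var (
	once       sync.Once
	registered []repoHandlers      // stand-in for the informer's handler list
	repoCache  = map[string]bool{} // stand-in for the global cachedReposData
)

// newOperator may be called many times, but the handlers are registered only once.
func newOperator() {
	once.Do(func() {
		registered = append(registered, repoHandlers{
			onAdd: func(repo string) { repoCache[repo] = true },
			onUpdate: func(oldRepo, newRepo string) {
				delete(repoCache, oldRepo) // drop the stale entry first
				repoCache[newRepo] = true  // then add the new state
			},
			onDelete: func(repo string) { delete(repoCache, repo) },
		})
	})
}

func main() {
	for i := 0; i < 3; i++ {
		newOperator()
	}
	for _, h := range registered {
		h.onAdd("repo-a")
		h.onUpdate("repo-a", "repo-b")
	}
	fmt.Println(len(registered), repoCache)
}
```

Without the once guard, every call to the constructor would register another set of handlers, and each event would be applied to the cache several times.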

Cache update logic

// cache struct
type cachedRepos struct {
    sync.RWMutex

    chartsInRepo  map[workspace]map[string]int
    repoCtgCounts map[string]map[string]int

    repos    map[string]*v1alpha1.HelmRepo
    apps     map[string]*v1alpha1.HelmApplication
    versions map[string]*v1alpha1.HelmApplicationVersion
}
  • ByteArrayToSavedIndex: converts repo.Status.Data into a SavedIndex object
  • Iterate over SavedIndex.Applications
  • Store each (app.ApplicationId: HelmApplication) pair in cachedRepos.apps
func (c *cachedRepos) AddRepo(repo *v1alpha1.HelmRepo) error {
    return c.addRepo(repo, false)
}

//Add new Repo to cachedRepos
func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error {
    if len(repo.Status.Data) == 0 {
        return nil
    }
    index, err := helmrepoindex.ByteArrayToSavedIndex([]byte(repo.Status.Data))
    if err != nil {
        klog.Errorf("json unmarshal repo %s failed, error: %s", repo.Name, err)
        return err
    }
    ...

    chartsCount := 0
    for key, app := range index.Applications {
        if builtin {
            appName = v1alpha1.HelmApplicationIdPrefix + app.Name
        } else {
            appName = app.ApplicationId
        }

        HelmApp := v1alpha1.HelmApplication{
        ....
        }
        c.apps[app.ApplicationId] = &HelmApp

        var ctg, appVerName string
        var chartData []byte
        for _, ver := range app.Charts {
            chartsCount += 1
            if ver.Annotations != nil && ver.Annotations["category"] != "" {
                ctg = ver.Annotations["category"]
            }
            if builtin {
                appVerName = base64.StdEncoding.EncodeToString([]byte(ver.Name + ver.Version))
                chartData, err = loadBuiltinChartData(ver.Name, ver.Version)
                if err != nil {
                    return err
                }
            } else {
                appVerName = ver.ApplicationVersionId
            }

            version := &v1alpha1.HelmApplicationVersion{
            ....
            }
            c.versions[ver.ApplicationVersionId] = version
        }
        ....
    }
    return nil
}

HelmRepo reconciler

How HelmRepo.Status.Data is loaded

  1. LoadRepoIndex: convert index.yaml to IndexFile
  2. MergeRepoIndex: merge new and old IndexFile
  3. savedIndex.Bytes(): compress data with zlib.NewWriter
  4. Store the savedIndex data in the CRD (HelmRepo.Status.Data)
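
Step 3 and its inverse (ByteArrayToSavedIndex) can be sketched as a JSON plus zlib round trip. The code below is an assumption-based illustration using only the standard library: the savedIndex type is heavily simplified, the function names are hypothetical, and the real implementation may apply additional encoding before the bytes are stored as a string in the CRD.

```go
package main

import (
	"bytes"
	"compress/zlib"
	"encoding/json"
	"fmt"
	"io"
)

// savedIndex is a simplified stand-in for the real SavedIndex struct.
type savedIndex struct {
	APIVersion string            `json:"apiVersion"`
	Apps       map[string]string `json:"apps"` // simplified: app ID -> app name
}

// toBytes marshals the index to JSON and compresses it with zlib.
func (s *savedIndex) toBytes() ([]byte, error) {
	raw, err := json.Marshal(s)
	if err != nil {
		return nil, err
	}
	var buf bytes.Buffer
	zw := zlib.NewWriter(&buf)
	if _, err := zw.Write(raw); err != nil {
		return nil, err
	}
	// Close flushes the remaining compressed data; without it the output is truncated.
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// bytesToSavedIndex reverses toBytes: decompress, then unmarshal.
func bytesToSavedIndex(data []byte) (*savedIndex, error) {
	zr, err := zlib.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	raw, err := io.ReadAll(zr)
	if err != nil {
		return nil, err
	}
	idx := &savedIndex{}
	if err := json.Unmarshal(raw, idx); err != nil {
		return nil, err
	}
	return idx, nil
}

func main() {
	in := &savedIndex{APIVersion: "v1", Apps: map[string]string{"app-1": "redis"}}
	data, err := in.toBytes()
	if err != nil {
		panic(err)
	}
	out, err := bytesToSavedIndex(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.APIVersion, out.Apps["app-1"])
}
```

Compression matters here because the whole index is stored inside a Kubernetes object, and large index.yaml files would otherwise inflate the size of the HelmRepo resource.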

Key structs

// HelmRepo.Status.Data == the compressed SavedIndex data
   type SavedIndex struct {
       APIVersion   string                  `json:"apiVersion"`
       Generated    time.Time               `json:"generated"`
       Applications map[string]*Application `json:"apps"`
       PublicKeys   []string                `json:"publicKeys,omitempty"`

       // Annotations are additional mappings uninterpreted by Helm. They are made available for
       // other applications to add information to the index file.
       Annotations map[string]string `json:"annotations,omitempty"`
   }

   // IndexFile represents the index file in a chart repository
   type IndexFile struct {
       APIVersion string                   `json:"apiVersion"`
       Generated  time.Time                `json:"generated"`
       Entries    map[string]ChartVersions `json:"entries"`
       PublicKeys []string                 `json:"publicKeys,omitempty"`
   }

Code location

func (r *ReconcileHelmRepo) syncRepo(instance *v1alpha1.HelmRepo) error {
    // 1. load index from Helm repo
    index, err := helmrepoindex.LoadRepoIndex(context.TODO(), instance.Spec.Url, &instance.Spec.Credential)

    if err != nil {
        klog.Errorf("load index failed, repo: %s, url: %s, err: %s", instance.GetTrueName(), instance.Spec.Url, err)
        return err
    }

    existsSavedIndex := &helmrepoindex.SavedIndex{}
    if len(instance.Status.Data) != 0 {
        existsSavedIndex, err = helmrepoindex.ByteArrayToSavedIndex([]byte(instance.Status.Data))
        if err != nil {
            klog.Errorf("json unmarshal failed, repo: %s,  error: %s", instance.GetTrueName(), err)
            return err
        }
    }

    // 2. merge new index with old index which is stored in crd
    savedIndex := helmrepoindex.MergeRepoIndex(index, existsSavedIndex)

    // 3. save index in crd
    data, err := savedIndex.Bytes()
    if err != nil {
        klog.Errorf("json marshal failed, error: %s", err)
        return err
    }

    instance.Status.Data = string(data)
    return nil
}
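
syncRepo merges the freshly loaded index with the one already stored in the CRD instead of overwriting it. One plausible reason, stated here as an assumption rather than taken from the source, is to keep application IDs stable across syncs, since the cache is keyed by ApplicationId. The sketch below illustrates that idea with simplified types; application, mergeIndex, and the newID callback are hypothetical and do not reproduce the real MergeRepoIndex.

```go
package main

import "fmt"

// application is a simplified stand-in for an entry of SavedIndex.Applications.
type application struct {
	ID       string
	Name     string
	Versions []string
}

// mergeIndex builds a new saved index from freshly loaded entries
// (chart name -> versions). An application that already exists in old keeps
// its ID; a new one gets an ID from newID. Charts that disappeared upstream
// are dropped. (Assumed behaviour, for illustration only.)
func mergeIndex(entries map[string][]string, old map[string]*application, newID func(name string) string) map[string]*application {
	byName := make(map[string]*application, len(old))
	for _, a := range old {
		byName[a.Name] = a
	}
	merged := make(map[string]*application, len(entries))
	for name, versions := range entries {
		id := ""
		if prev, ok := byName[name]; ok {
			id = prev.ID // reuse the stable ID
		} else {
			id = newID(name)
		}
		merged[id] = &application{ID: id, Name: name, Versions: versions}
	}
	return merged
}

func main() {
	old := map[string]*application{
		"app-1": {ID: "app-1", Name: "redis", Versions: []string{"0.1.0"}},
	}
	entries := map[string][]string{
		"redis": {"0.1.0", "0.1.1"},
		"mysql": {"0.1.0"},
	}
	merged := mergeIndex(entries, old, func(name string) string { return "app-new-" + name })
	fmt.Println(len(merged), merged["app-1"].Versions)
}
```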

Question:

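Q1: When publishing packages to a Helm repository, how is Helm release versioning handled?

A: Bump the version field in Chart.yaml, then run helm package. This adds a new .tgz package; do not delete the old versions. When you then regenerate the index, it will include all the .tgz packages.

$ helm repo index stable --url=xxx.xx.xx.xxx:8081/
$ cat index.yaml
  ....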
  redis:
  - apiVersion: v1
    appVersion: "1.0"
    created: "2021-06-22T16:34:58.286583012+08:00"
    description: A Helm chart for Kubernetes
    digest: fd7c0d962155330527c0a512a74bea33302fca940b810c43ee5f461b1013dbf5
    name: redis
    urls:
    - xxx.xx.xx.xxx:8081/redis-0.1.1.tgz
    version: 0.1.1
  - apiVersion: v1
    appVersion: "1.0"
    created: "2021-06-22T16:34:58.286109049+08:00"
    description: A Helm chart for Kubernetes
    digest: 1a23bd6d5e45f9d323500bbe170011fb23bfccf2c1bd25814827eb8dc643d7f0
    name: redis
    urls:
    - xxx.xx.xx.xxx:8081/redis-0.1.0.tgz
    version: 0.1.0
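
Because every published version stays in the index, a client that wants the newest chart has to compare version strings numerically rather than lexically. The following is a minimal sketch of that comparison for plain MAJOR.MINOR.PATCH versions such as the 0.1.0 and 0.1.1 entries above. It is deliberately simplified: pre-release and build metadata are not handled, and the function names are hypothetical. A real client would normally rely on a full semantic-versioning library.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseVersion splits "MAJOR.MINOR.PATCH" into three integers.
// Pre-release tags and build metadata are not supported in this sketch.
func parseVersion(v string) ([3]int, error) {
	var out [3]int
	parts := strings.Split(v, ".")
	if len(parts) != 3 {
		return out, fmt.Errorf("unsupported version %q", v)
	}
	for i, p := range parts {
		n, err := strconv.Atoi(p)
		if err != nil {
			return out, fmt.Errorf("unsupported version %q: %w", v, err)
		}
		out[i] = n
	}
	return out, nil
}

// less reports whether version a is older than version b.
func less(a, b [3]int) bool {
	for i := range a {
		if a[i] != b[i] {
			return a[i] < b[i]
		}
	}
	return false
}

// latest returns the newest version string in versions.
func latest(versions []string) (string, error) {
	if len(versions) == 0 {
		return "", fmt.Errorf("no versions given")
	}
	best := versions[0]
	bestV, err := parseVersion(best)
	if err != nil {
		return "", err
	}
	for _, v := range versions[1:] {
		pv, err := parseVersion(v)
		if err != nil {
			return "", err
		}
		if less(bestV, pv) {
			best, bestV = v, pv
		}
	}
	return best, nil
}

func main() {
	v, err := latest([]string{"0.1.0", "0.1.1"})
	if err != nil {
		panic(err)
	}
	fmt.Println(v)
}
```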

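Q2: Is KubeSphere's version synchronization incomplete? After a user adds a Helm repository, newly published applications cannot be found.

A: Proposed solution: use three synchronization strategies

  • Periodic sync of the Helm repository (schedule a periodic reconcile event for the HelmRepo)
  • For enterprise repositories, let users configure a hook that actively triggers an update when a new version is published
  • Let users manually refresh the charts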

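Q3: Where should index.yaml be cached?

A: The index.yaml of some repositories is fairly large; with, say, 1000 users and 1000 charts, keeping everything in memory would consume too much of it. A suggested approach is to keep frequently used index.yaml files in memory and store rarely used ones on local disk.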
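
One way to realize "hot indexes in memory, cold ones on disk" is a least-recently-used (LRU) cache with an eviction callback, where the callback is the place to spill an index to local disk. The sketch below shows only the in-memory part with the standard container/list package. The type lruIndexCache and its methods are hypothetical, the callback is left to the caller, and the cache is not safe for concurrent use without an extra lock.

```go
package main

import (
	"container/list"
	"fmt"
)

// entry is one cached index, keyed by repo ID.
type entry struct {
	repoID string
	index  []byte
}

// lruIndexCache keeps at most capacity indexes in memory.
// Not safe for concurrent use; wrap it with a mutex if needed.
type lruIndexCache struct {
	capacity int
	order    *list.List // front = most recently used
	items    map[string]*list.Element
	onEvict  func(repoID string, index []byte) // e.g. write the index to disk
}

func newLRU(capacity int, onEvict func(string, []byte)) *lruIndexCache {
	return &lruIndexCache{
		capacity: capacity,
		order:    list.New(),
		items:    map[string]*list.Element{},
		onEvict:  onEvict,
	}
}

// Get returns the cached index and marks it as recently used.
func (c *lruIndexCache) Get(repoID string) ([]byte, bool) {
	el, ok := c.items[repoID]
	if !ok {
		return nil, false
	}
	c.order.MoveToFront(el)
	return el.Value.(*entry).index, true
}

// Put stores an index and evicts the least recently used one when full.
func (c *lruIndexCache) Put(repoID string, index []byte) {
	if el, ok := c.items[repoID]; ok {
		el.Value.(*entry).index = index
		c.order.MoveToFront(el)
		return
	}
	c.items[repoID] = c.order.PushFront(&entry{repoID: repoID, index: index})
	if c.order.Len() > c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		e := oldest.Value.(*entry)
		delete(c.items, e.repoID)
		if c.onEvict != nil {
			c.onEvict(e.repoID, e.index)
		}
	}
}

func main() {
	cache := newLRU(2, func(id string, _ []byte) { fmt.Println("spill to disk:", id) })
	cache.Put("repo-a", []byte("index a"))
	cache.Put("repo-b", []byte("index b"))
	cache.Get("repo-a")                    // repo-a becomes the most recently used
	cache.Put("repo-c", []byte("index c")) // repo-b is evicted
}
```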

This article was published via OpenWrite, a multi-channel blog publishing platform.
