做者:蔡錫生,LStack 平臺研發工程師,近期專一於基於 OAM 的應用託管平臺落地。mysql
背景介紹
KubeSphere 應用商店簡介
做爲一個開源的、以應用爲中心的容器平臺,KubeSphere 在 OpenPitrix 的基礎上,爲用戶提供了一個基於 Helm 的應用商店,用於應用生命週期管理。OpenPitrix 是一個開源的 Web 平臺,用於打包、部署和管理不一樣類型的應用。KubeSphere 應用商店讓 ISV、開發者和用戶可以在一站式服務中只需點擊幾下就能夠上傳、測試、部署和發佈應用。git
KubeSphere 中的 Helm 倉庫功能
- KubeSphere Helm 倉庫添加
- Helm repo list
- KubeSphere Helm 倉庫中的應用模版查詢
Helm 倉庫簡介
Helm charts 是存放 K8s 應用模版的倉庫,該倉庫由 index.yaml
文件和 .tgz
模版包組成。github
[root@ningbo stable]# ls -al 總用量 400 drwxr-xr-x. 26 root root 4096 6月 22 17:01 . drwxr-xr-x. 4 root root 86 6月 22 16:37 .. -rw-r--r--. 1 root root 10114 6月 22 17:12 index.yaml -rw-r--r--. 1 root root 3803 6月 8 2020 lsh-cluster-csm-om-agent-0.1.0.tgz -rw-r--r--. 1 root root 4022 6月 8 2020 lsh-mcp-cc-alert-service-0.1.0.tgz -rw-r--r--. 1 root root 4340 6月 8 2020 lsh-mcp-cc-sms-service-0.1.0.tgz -rw-r--r--. 1 root root 4103 6月 8 2020 lsh-mcp-cpm-metrics-exchange-0.1.0.tgz -rw-r--r--. 1 root root 4263 6月 8 2020 lsh-mcp-cpm-om-service-0.1.0.tgz -rw-r--r--. 1 root root 4155 6月 8 2020 lsh-mcp-csm-om-service-0.1.0.tgz -rw-r--r--. 1 root root 3541 6月 8 2020 lsh-mcp-deploy-service-0.1.0.tgz -rw-r--r--. 1 root root 5549 6月 8 2020 lsh-mcp-iam-apigateway-service-0.1.0.tgz
index.yaml
文件
apiVersion: v1 entries: aliyun-ccm: - apiVersion: v2 appVersion: addon created: "2021-06-21T08:59:58Z" description: A Helm chart for Kubernetes digest: 6bda563c86333475255e5edfedc200ae282544e2c6e22b519a59b3c7bdef9a32 name: aliyun-ccm type: application urls: - charts/aliyun-ccm-0.1.0.tgz version: 0.1.0 aliyun-csi-driver: - apiVersion: v2 appVersion: addon created: "2021-06-21T08:59:58Z" description: A Helm chart for Kubernetes digest: b49f128d7a49401d52173e6f58caedd3fabbe8e2827dc00e6a824ee38860fa51 name: aliyun-csi-driver type: application urls: - charts/aliyun-csi-driver-0.1.0.tgz version: 0.1.0 application-controller: - apiVersion: v1 appVersion: addon created: "2021-06-21T08:59:58Z" description: A Helm chart for application Controller digest: 546e72ce77f865683ce0ea75f6e0203537a40744f2eb34e36a5bd378f9452bc5 name: application-controller urls: - charts/application-controller-0.1.0.tgz version: 0.1.0
.tgz
解壓縮後的文件目錄
[root@ningbo stable]# cd mysql/ [root@ningbo mysql]# ls -al 總用量 20 drwxr-xr-x. 3 root root 97 5月 25 2020 . drwxr-xr-x. 26 root root 4096 6月 22 17:01 .. -rwxr-xr-x. 1 root root 106 5月 25 2020 Chart.yaml -rwxr-xr-x. 1 root root 364 5月 25 2020 .Helmignore -rwxr-xr-x. 1 root root 76 5月 25 2020 index.yaml drwxr-xr-x. 3 root root 146 5月 25 2020 templates -rwxr-xr-x. 1 root root 1735 5月 25 2020 values.yaml
Chart.yaml
[root@ningbo mysql]# cat Chart.yaml apiVersion: v1 appVersion: "1.0" description: A Helm chart for Kubernetes name: mysql version: 0.1.0
添加 Helm 倉庫代碼介紹
接口實現分析
- 路由註冊
- handler,參數解析,調用 models 方面
- models ,調用 models 方法
- crd client,調用 K8s api 存儲
webservice.Route(webservice.POST("/repos"). To(handler.CreateRepo). // 跟進 Doc("Create a global repository, which is used to store package of app"). Metadata(restfulspec.KeyOpenAPITags, []string{constants.OpenpitrixTag}). Param(webservice.QueryParameter("validate", "Validate repository")). Returns(http.StatusOK, api.StatusOK, openpitrix.CreateRepoResponse{}). Reads(openpitrix.CreateRepoRequest{}))
func (h *openpitrixHandler) CreateRepo(req *restful.Request, resp *restful.Response) { createRepoRequest := &openpitrix.CreateRepoRequest{} err := req.ReadEntity(createRepoRequest) if err != nil { klog.V(4).Infoln(err) api.HandleBadRequest(resp, nil, err) return } createRepoRequest.Workspace = new(string) *createRepoRequest.Workspace = req.PathParameter("workspace") user, _ := request.UserFrom(req.Request.Context()) creator := "" if user != nil { creator = user.GetName() } parsedUrl, err := url.Parse(createRepoRequest.URL) if err != nil { api.HandleBadRequest(resp, nil, err) return } userInfo := parsedUrl.User // trim credential from url parsedUrl.User = nil repo := v1alpha1.HelmRepo{ ObjectMeta: metav1.ObjectMeta{ Name: idutils.GetUuid36(v1alpha1.HelmRepoIdPrefix), Annotations: map[string]string{ constants.CreatorAnnotationKey: creator, }, Labels: map[string]string{ constants.WorkspaceLabelKey: *createRepoRequest.Workspace, }, }, Spec: v1alpha1.HelmRepoSpec{ Name: createRepoRequest.Name, Url: parsedUrl.String(), SyncPeriod: 0, Description: stringutils.ShortenString(createRepoRequest.Description, 512), }, } if strings.HasPrefix(createRepoRequest.URL, "https://") || strings.HasPrefix(createRepoRequest.URL, "http://") { if userInfo != nil { repo.Spec.Credential.Username = userInfo.Username() repo.Spec.Credential.Password, _ = userInfo.Password() } } else if strings.HasPrefix(createRepoRequest.URL, "s3://") { cfg := v1alpha1.S3Config{} err := json.Unmarshal([]byte(createRepoRequest.Credential), &cfg) if err != nil { api.HandleBadRequest(resp, nil, err) return } repo.Spec.Credential.S3Config = cfg } var result interface{} // 1. validate repo result, err = h.openpitrix.ValidateRepo(createRepoRequest.URL, &repo.Spec.Credential) if err != nil { klog.Errorf("validate repo failed, err: %s", err) api.HandleBadRequest(resp, nil, err) return } // 2. create repo validate, _ := strconv.ParseBool(req.QueryParameter("validate")) if !validate { if repo.GetTrueName() == "" { api.HandleBadRequest(resp, nil, fmt.Errorf("repo name is empty")) return } result, err = h.openpitrix.CreateRepo(&repo) //跟進 } if err != nil { klog.Errorln(err) handleOpenpitrixError(resp, err) return } resp.WriteEntity(result) }
func (c *repoOperator) CreateRepo(repo *v1alpha1.HelmRepo) (*CreateRepoResponse, error) { name := repo.GetTrueName() items, err := c.repoLister.List(labels.SelectorFromSet(map[string]string{constants.WorkspaceLabelKey: repo.GetWorkspace()})) if err != nil && !apierrors.IsNotFound(err) { klog.Errorf("list Helm repo failed: %s", err) return nil, err } for _, exists := range items { if exists.GetTrueName() == name { klog.Error(repoItemExists, "name: ", name) return nil, repoItemExists } } repo.Spec.Description = stringutils.ShortenString(repo.Spec.Description, DescriptionLen) _, err = c.repoClient.HelmRepos().Create(context.TODO(), repo, metav1.CreateOptions{}) // 跟進 if err != nil { klog.Errorf("create Helm repo failed, repo_id: %s, error: %s", repo.GetHelmRepoId(), err) return nil, err } else { klog.V(4).Infof("create Helm repo success, repo_id: %s", repo.GetHelmRepoId()) } return &CreateRepoResponse{repo.GetHelmRepoId()}, nil }
// Create takes the representation of a HelmRepo and creates it. Returns the server's representation of the HelmRepo, and an error, if there is any. func (c *HelmRepos) Create(ctx context.Context, HelmRepo *v1alpha1.HelmRepo, opts v1.CreateOptions) (result *v1alpha1.HelmRepo, err error) { result = &v1alpha1.HelmRepo{} err = c.client.Post(). Resource("Helmrepos"). VersionedParams(&opts, scheme.ParameterCodec). Body(HelmRepo). Do(ctx). Into(result) return }
查詢Helm 倉庫應用模版代碼介紹
接口實現
- 路由註冊
- handler,參數解析,調用 models 方面
- models ,調用 models 方法
- crd client,調用 K8s api 存儲
webservice.Route(webservice.GET("/apps").LiHui, 6 months ago: • openpitrix crd Deprecate(). To(handler.ListApps). // 跟進 Doc("List app templates"). Param(webservice.QueryParameter(params.ConditionsParam, "query conditions,connect multiple conditions with commas, equal symbol for exact query, wave symbol for fuzzy query e.g. name~a"). Required(false). DataFormat("key=%s,key~%s")). Param(webservice.QueryParameter(params.PagingParam, "paging query, e.g. limit=100,page=1"). Required(false). DataFormat("limit=%d,page=%d"). DefaultValue("limit=10,page=1")). Param(webservice.QueryParameter(params.ReverseParam, "sort parameters, e.g. reverse=true")). Param(webservice.QueryParameter(params.OrderByParam, "sort parameters, e.g. orderBy=createTime")). Metadata(restfulspec.KeyOpenAPITags, []string{constants.OpenpitrixTag}). Returns(http.StatusOK, api.StatusOK, models.PageableResponse{}))
func (h *openpitrixHandler) ListApps(req *restful.Request, resp *restful.Response) limit, offset := params.ParsePaging(req) orderBy := params.GetStringValueWithDefault(req, params.OrderByParam, openpitrix.CreateTime) reverse := params.GetBoolValueWithDefault(req, params.ReverseParam, false) conditions, err := params.ParseConditions(req) if err != nil { klog.V(4).Infoln(err) api.HandleBadRequest(resp, nil, err) return } if req.PathParameter("workspace") != "" { conditions.Match[openpitrix.WorkspaceLabel] = req.PathParameter("workspace") } result, err := h.openpitrix.ListApps(conditions, orderBy, reverse, limit, offset) // 跟進 if err != nil { klog.Errorln(err) handleOpenpitrixError(resp, err) return } resp.WriteEntity(result) }
func (c *applicationOperator) ListApps(conditions *params.Conditions, orderBy string, reverse bool, limit, offset int) (*models.PageableResponse, error) { apps, err := c.listApps(conditions) // 重點跟進 if err != nil { klog.Error(err) return nil, err } apps = filterApps(apps, conditions) if reverse { sort.Sort(sort.Reverse(HelmApplicationList(apps))) } else { sort.Sort(HelmApplicationList(apps)) } totalCount := len(apps) start, end := (&query.Pagination{Limit: limit, Offset: offset}).GetValidPagination(totalCount) apps = apps[start:end] items := make([]interface{}, 0, len(apps)) for i := range apps { versions, err := c.getAppVersionsByAppId(apps[i].GetHelmApplicationId()) if err != nil && !apierrors.IsNotFound(err) { return nil, err } ctg, _ := c.ctgLister.Get(apps[i].GetHelmCategoryId()) items = append(items, convertApp(apps[i], versions, ctg, 0)) } return &models.PageableResponse{Items: items, TotalCount: totalCount}, nil } // line 601 func (c *applicationOperator) listApps(conditions *params.Conditions) (ret []*v1alpha1.HelmApplication, err error) { repoId := conditions.Match[RepoId] if repoId != "" && repoId != v1alpha1.AppStoreRepoId { // get Helm application from Helm repo if ret, exists := c.cachedRepos.ListApplicationsByRepoId(repoId); !exists { klog.Warningf("load repo failed, repo id: %s", repoId) return nil, loadRepoInfoFailed } else { return ret, nil } } else { if c.backingStoreClient == nil { return []*v1alpha1.HelmApplication{}, nil } ret, err = c.appLister.List(labels.SelectorFromSet(buildLabelSelector(conditions))) } return }
func (c *cachedRepos) ListApplicationsByRepoId(repoId string) (ret []*v1alpha1.HelmApplication, exists bool) { c.RLock() defer c.RUnlock() if repo, exists := c.repos[repoId]; !exists { return nil, false } else { ret = make([]*v1alpha1.HelmApplication, 0, 10) for _, app := range c.apps { if app.GetHelmRepoId() == repo.Name { // 應用的倉庫ID相同則追加 ret = append(ret, app) } } } return ret, true }
既然 app template 是從緩存中獲取的,那麼緩存中的數據又是何時錄入的呢?web
- 建立全局緩存變量
- 添加新 Helm 倉庫,K8s 中已安裝 crd 控制器
HelmRepoController
發現有新的 HelmRepo 建立,更新.Status.Data
內容- informer 發現有更新,同時更新緩存
緩存更新的實現
- 建立全局變量,經過 init 函數初始化
- 經過 HelmRepo 的 informer 實現緩存同步更新
- 在每次調用接口的時候,hanlder 類中包換了緩存變量
建立接口類 openpitrix.Interface
redis
type openpitrixHandler struct { openpitrix openpitrix.Interface } func newOpenpitrixHandler(ksInformers informers.InformerFactory, ksClient versioned.Interface, option *openpitrixoptions.Options) *openpitrixHandler { var s3Client s3.Interface if option != nil && option.S3Options != nil && len(option.S3Options.Endpoint) != 0 { var err error s3Client, err = s3.NewS3Client(option.S3Options) if err != nil { klog.Errorf("failed to connect to storage, please check storage service status, error: %v", err) } } return &openpitrixHandler{ openpitrix.NewOpenpitrixOperator(ksInformers, ksClient, s3Client), } }
- 經過在 informer 中添加通知函數,執行緩存更新
- once.Do 只執行一次
var cachedReposData reposcache.ReposCache var HelmReposInformer cache.SharedIndexInformer var once sync.Once func init() { cachedReposData = reposcache.NewReposCache() // 全局緩存 } func NewOpenpitrixOperator(ksInformers ks_informers.InformerFactory, ksClient versioned.Interface, s3Client s3.Interface) Interface { once.Do(func() { klog.Infof("start Helm repo informer") HelmReposInformer = ksInformers.KubeSphereSharedInformerFactory().Application().V1alpha1().HelmRepos().Informer() HelmReposInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { r := obj.(*v1alpha1.HelmRepo) cachedReposData.AddRepo(r) // 緩存更新, 點擊跟進 }, UpdateFunc: func(oldObj, newObj interface{}) { oldR := oldObj.(*v1alpha1.HelmRepo) cachedReposData.DeleteRepo(oldR) r := newObj.(*v1alpha1.HelmRepo) cachedReposData.AddRepo(r) }, DeleteFunc: func(obj interface{}) { r := obj.(*v1alpha1.HelmRepo) cachedReposData.DeleteRepo(r) }, }) go HelmReposInformer.Run(wait.NeverStop) }) return &openpitrixOperator{ AttachmentInterface: newAttachmentOperator(s3Client), // cachedReposData used ApplicationInterface: newApplicationOperator(cachedReposData, ksInformers.KubeSphereSharedInformerFactory(), ksClient, s3Client), // cachedReposData used RepoInterface: newRepoOperator(cachedReposData, ksInformers.KubeSphereSharedInformerFactory(), ksClient), // cachedReposData used ReleaseInterface: newReleaseOperator(cachedReposData, ksInformers.KubernetesSharedInformerFactory(), ksInformers.KubeSphereSharedInformerFactory(), ksClient), CategoryInterface: newCategoryOperator(ksInformers.KubeSphereSharedInformerFactory(), ksClient), } }
緩存更新邏輯json
// 緩存結構體 type cachedRepos struct { sync.RWMutex chartsInRepo map[workspace]map[string]int repoCtgCounts map[string]map[string]int repos map[string]*v1alpha1.HelmRepo apps map[string]*v1alpha1.HelmApplication versions map[string]*v1alpha1.HelmApplicationVersion }
- ByteArrayToSavedIndex:將
repo.Status.Data
轉換爲 SavedIndex 數組對象 - 遍歷 SavedIndex.Applications
- 保存(app.ApplicationId:HelmApplication)到 cachedRepos.apps
func (c *cachedRepos) AddRepo(repo *v1alpha1.HelmRepo) error { return c.addRepo(repo, false) } //Add new Repo to cachedRepos func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error { if len(repo.Status.Data) == 0 { return nil } index, err := Helmrepoindex.ByteArrayToSavedIndex([]byte(repo.Status.Data)) if err != nil { klog.Errorf("json unmarshal repo %s failed, error: %s", repo.Name, err) return err } ... chartsCount := 0 for key, app := range index.Applications { if builtin { appName = v1alpha1.HelmApplicationIdPrefix + app.Name } else { appName = app.ApplicationId } HelmApp := v1alpha1.HelmApplication{ .... } c.apps[app.ApplicationId] = &HelmApp var ctg, appVerName string var chartData []byte for _, ver := range app.Charts { chartsCount += 1 if ver.Annotations != nil && ver.Annotations["category"] != "" { ctg = ver.Annotations["category"] } if builtin { appVerName = base64.StdEncoding.EncodeToString([]byte(ver.Name + ver.Version)) chartData, err = loadBuiltinChartData(ver.Name, ver.Version) if err != nil { return err } } else { appVerName = ver.ApplicationVersionId } version := &v1alpha1.HelmApplicationVersion{ .... } c.versions[ver.ApplicationVersionId] = version } .... } return nil }
HelmRepo 協調器
HelmRepo.Status.Data 加載流程
- LoadRepoIndex: convert index.yaml to IndexFile
- MergeRepoIndex: merge new and old IndexFile
- savedIndex.Bytes(): compress data with zlib.NewWriter
- 將 savedIndex 數據存入 CRD(HelmRepo.Status.Data)
關鍵結構體api
// HelmRepo.Status.Data == SavedIndex 壓縮後的數據 type SavedIndex struct { APIVersion string `json:"apiVersion"` Generated time.Time `json:"generated"` Applications map[string]*Application `json:"apps"` PublicKeys []string `json:"publicKeys,omitempty"` // Annotations are additional mappings uninterpreted by Helm. They are made available for // other applications to add information to the index file. Annotations map[string]string `json:"annotations,omitempty"` } // IndexFile represents the index file in a chart repository type IndexFile struct { APIVersion string `json:"apiVersion"` Generated time.Time `json:"generated"` Entries map[string]ChartVersions `json:"entries"` PublicKeys []string `json:"publicKeys,omitempty"` }
代碼位置數組
func (r *ReconcileHelmRepo) syncRepo(instance *v1alpha1.HelmRepo) error { // 1. load index from Helm repo index, err := Helmrepoindex.LoadRepoIndex(context.TODO(), instance.Spec.Url, &instance.Spec.Credential) if err != nil { klog.Errorf("load index failed, repo: %s, url: %s, err: %s", instance.GetTrueName(), instance.Spec.Url, err) return err } existsSavedIndex := &Helmrepoindex.SavedIndex{} if len(instance.Status.Data) != 0 { existsSavedIndex, err = Helmrepoindex.ByteArrayToSavedIndex([]byte(instance.Status.Data)) if err != nil { klog.Errorf("json unmarshal failed, repo: %s, error: %s", instance.GetTrueName(), err) return err } } // 2. merge new index with old index which is stored in crd savedIndex := Helmrepoindex.MergeRepoIndex(index, existsSavedIndex) // 3. save index in crd data, err := savedIndex.Bytes() if err != nil { klog.Errorf("json marshal failed, error: %s", err) return err } instance.Status.Data = string(data) return nil }
Question:
Q1:Helm 倉庫發包時如何進行 Helm release 版本控制緩存
A:修改 Charts.yaml
中的字段 version,而後 Helm package, 等於新增一個 .tgz
包,老版本的不要刪除,這時候執行 index 的時候會吧全部的 .tgz
包包含在內。
$ Helm repo index stable --url=xxx.xx.xx.xxx:8081/ $ cat index.yaml .... redis: - apiVersion: v1 appVersion: "1.0" created: "2021-06-22T16:34:58.286583012+08:00" description: A Helm chart for Kubernetes digest: fd7c0d962155330527c0a512a74bea33302fca940b810c43ee5f461b1013dbf5 name: redis urls: - xxx.xx.xx.xxx:8081/redis-0.1.1.tgz version: 0.1.1 - apiVersion: v1 appVersion: "1.0" created: "2021-06-22T16:34:58.286109049+08:00" description: A Helm chart for Kubernetes digest: 1a23bd6d5e45f9d323500bbe170011fb23bfccf2c1bd25814827eb8dc643d7f0 name: redis urls: - xxx.xx.xx.xxx:8081/redis-0.1.0.tgz version: 0.1.0
Q2:KuberSphere 版本同步功能有缺失?用戶添加完 Helm 倉庫後,若是有新的應用發佈,查詢不到
A:解決方案:使用 3 種同步策略
- 定時同步 Helm 倉庫(HelmRepo 設置一個定時協調的事件)
- 企業倉庫,用戶能夠設置 hook,發佈新版本的時候主動觸發更新
- 用戶主動更新 charts
Q3:index.yaml 緩存位置
A:某些倉庫的index.yaml 比較大,若是1000個用戶,1000個charts 會太吃內存。建議經常使用index.yaml的放在內存中,不經常使用的index.yaml存儲在本地磁盤。
本文由博客一文多發平臺 OpenWrite 發佈!