作者:蔡锡生,LStack 平台研发工程师,近期专注于基于 OAM 的应用托管平台落地。
背景介绍
KubeSphere 应用商店简介
作为一个开源的、以应用为中心的容器平台,KubeSphere 在 OpenPitrix 的基础上,为用户提供了一个基于 Helm 的应用商店,用于应用生命周期管理。OpenPitrix 是一个开源的 Web 平台,用于打包、部署和管理不同类型的应用。KubeSphere 应用商店让 ISV、开发者和用户能够在一站式服务中只需点击几下就可以上传、测试、部署和发布应用。
KubeSphere 中的 Helm 仓库功能
默认情况下,应用商店中内置了 16 个应用,但您可以通过应用模板添加更多应用。
KubeSphere Helm 仓库添加
Helm repo list
KubeSphere Helm 仓库中的应用模版查询
Helm 仓库简介
Helm charts 是存放 K8s 应用模版的仓库,该仓库由 index.yaml 文件和 .tgz 模版包组成。
[root@ningbo stable]# ls -al
总用量 400
drwxr-xr-x. 26 root root 4096 6月 22 17:01 .
drwxr-xr-x. 4 root root 86 6月 22 16:37 ..
-rw-r--r--. 1 root root 10114 6月 22 17:12 index.yaml
-rw-r--r--. 1 root root 3803 6月 8 2020 lsh-cluster-csm-om-agent-0.1.0.tgz
-rw-r--r--. 1 root root 4022 6月 8 2020 lsh-mcp-cc-alert-service-0.1.0.tgz
-rw-r--r--. 1 root root 4340 6月 8 2020 lsh-mcp-cc-sms-service-0.1.0.tgz
-rw-r--r--. 1 root root 4103 6月 8 2020 lsh-mcp-cpm-metrics-exchange-0.1.0.tgz
-rw-r--r--. 1 root root 4263 6月 8 2020 lsh-mcp-cpm-om-service-0.1.0.tgz
-rw-r--r--. 1 root root 4155 6月 8 2020 lsh-mcp-csm-om-service-0.1.0.tgz
-rw-r--r--. 1 root root 3541 6月 8 2020 lsh-mcp-deploy-service-0.1.0.tgz
-rw-r--r--. 1 root root 5549 6月 8 2020 lsh-mcp-iam-apigateway-service-0.1.0.tgz
index.yaml 文件
apiVersion: v1
entries:
aliyun-ccm:
- apiVersion: v2
appVersion: addon
created: "2021-06-21T08:59:58Z"
description: A Helm chart for Kubernetes
digest: 6bda563c86333475255e5edfedc200ae282544e2c6e22b519a59b3c7bdef9a32
name: aliyun-ccm
type: application
urls:
- charts/aliyun-ccm-0.1.0.tgz
version: 0.1.0
aliyun-csi-driver:
- apiVersion: v2
appVersion: addon
created: "2021-06-21T08:59:58Z"
description: A Helm chart for Kubernetes
digest: b49f128d7a49401d52173e6f58caedd3fabbe8e2827dc00e6a824ee38860fa51
name: aliyun-csi-driver
type: application
urls:
- charts/aliyun-csi-driver-0.1.0.tgz
version: 0.1.0
application-controller:
- apiVersion: v1
appVersion: addon
created: "2021-06-21T08:59:58Z"
description: A Helm chart for application Controller
digest: 546e72ce77f865683ce0ea75f6e0203537a40744f2eb34e36a5bd378f9452bc5
name: application-controller
urls:
- charts/application-controller-0.1.0.tgz
version: 0.1.0
.tgz 解压缩后的文件目录
[root@ningbo stable]# cd mysql/
[root@ningbo mysql]# ls -al
总用量 20
drwxr-xr-x. 3 root root 97 5月 25 2020 .
drwxr-xr-x. 26 root root 4096 6月 22 17:01 ..
-rwxr-xr-x. 1 root root 106 5月 25 2020 Chart.yaml
-rwxr-xr-x. 1 root root 364 5月 25 2020 .Helmignore
-rwxr-xr-x. 1 root root 76 5月 25 2020 index.yaml
drwxr-xr-x. 3 root root 146 5月 25 2020 templates
-rwxr-xr-x. 1 root root 1735 5月 25 2020 values.yaml
Chart.yaml
[root@ningbo mysql]# cat Chart.yaml
apiVersion: v1
appVersion: "1.0"
description: A Helm chart for Kubernetes
name: mysql
version: 0.1.0
添加 Helm 仓库代码介绍
接口实现分析
路由注册
handler,参数解析,调用 models 方面
models ,调用 models 方法
crd client,调用 K8s api 存储
路由注册
webservice.Route(webservice.POST("/repos").
To(handler.CreateRepo). // 跟进
Doc("Create a global repository, which is used to store package of app").
Metadata(restfulspec.KeyOpenAPITags, []string{constants.OpenpitrixTag}).
Param(webservice.QueryParameter("validate", "Validate repository")).
Returns(http.StatusOK, api.StatusOK, openpitrix.CreateRepoResponse{}).
Reads(openpitrix.CreateRepoRequest{}))
校验参数, 构建 models
func (h *openpitrixHandler) CreateRepo(req *restful.Request, resp *restful.Response) {
createRepoRequest := &openpitrix.CreateRepoRequest{}
err := req.ReadEntity(createRepoRequest)
if err != nil {
klog.V(4).Infoln(err)
api.HandleBadRequest(resp, nil, err)
return
}
createRepoRequest.Workspace = new(string)
*createRepoRequest.Workspace = req.PathParameter("workspace")
user, _ := request.UserFrom(req.Request.Context())
creator := ""
if user != nil {
creator = user.GetName()
}
parsedUrl, err := url.Parse(createRepoRequest.URL)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
userInfo := parsedUrl.User
// trim credential from url
parsedUrl.User = nil
repo := v1alpha1.HelmRepo{
ObjectMeta: metav1.ObjectMeta{
Name: idutils.GetUuid36(v1alpha1.HelmRepoIdPrefix),
Annotations: map[string]string{
constants.CreatorAnnotationKey: creator,
},
Labels: map[string]string{
constants.WorkspaceLabelKey: *createRepoRequest.Workspace,
},
},
Spec: v1alpha1.HelmRepoSpec{
Name: createRepoRequest.Name,
Url: parsedUrl.String(),
SyncPeriod: 0,
Description: stringutils.ShortenString(createRepoRequest.Description, 512),
},
}
if strings.HasPrefix(createRepoRequest.URL, "https://") || strings.HasPrefix(createRepoRequest.URL, "http://") {
if userInfo != nil {
repo.Spec.Credential.Username = userInfo.Username()
repo.Spec.Credential.Password, _ = userInfo.Password()
}
} else if strings.HasPrefix(createRepoRequest.URL, "s3://") {
cfg := v1alpha1.S3Config{}
err := json.Unmarshal([]byte(createRepoRequest.Credential), &cfg)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
repo.Spec.Credential.S3Config = cfg
}
var result interface{}
// 1. validate repo
result, err = h.openpitrix.ValidateRepo(createRepoRequest.URL, &repo.Spec.Credential)
if err != nil {
klog.Errorf("validate repo failed, err: %s", err)
api.HandleBadRequest(resp, nil, err)
return
}
// 2. create repo
validate, _ := strconv.ParseBool(req.QueryParameter("validate"))
if !validate {
if repo.GetTrueName() == "" {
api.HandleBadRequest(resp, nil, fmt.Errorf("repo name is empty"))
return
}
result, err = h.openpitrix.CreateRepo(&repo) //跟进
}
if err != nil {
klog.Errorln(err)
handleOpenpitrixError(resp, err)
return
}
resp.WriteEntity(result)
}
调用 createRep 方法
func (c *repoOperator) CreateRepo(repo *v1alpha1.HelmRepo) (*CreateRepoResponse, error) {
name := repo.GetTrueName()
items, err := c.repoLister.List(labels.SelectorFromSet(map[string]string{constants.WorkspaceLabelKey: repo.GetWorkspace()}))
if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("list Helm repo failed: %s", err)
return nil, err
}
for _, exists := range items {
if exists.GetTrueName() == name {
klog.Error(repoItemExists, "name: ", name)
return nil, repoItemExists
}
}
repo.Spec.Description = stringutils.ShortenString(repo.Spec.Description, DescriptionLen)
_, err = c.repoClient.HelmRepos().Create(context.TODO(), repo, metav1.CreateOptions{}) // 跟进
if err != nil {
klog.Errorf("create Helm repo failed, repo_id: %s, error: %s", repo.GetHelmRepoId(), err)
return nil, err
} else {
klog.V(4).Infof("create Helm repo success, repo_id: %s", repo.GetHelmRepoId())
}
return &CreateRepoResponse{repo.GetHelmRepoId()}, nil
}
调用 K8s api, 创建 crd HelmRepo
// Create takes the representation of a HelmRepo and creates it. Returns the server's representation of the HelmRepo, and an error, if there is any.
func (c *HelmRepos) Create(ctx context.Context, HelmRepo *v1alpha1.HelmRepo, opts v1.CreateOptions) (result *v1alpha1.HelmRepo, err error) {
result = &v1alpha1.HelmRepo{}
err = c.client.Post().
Resource("Helmrepos").
VersionedParams(&opts, scheme.ParameterCodec).
Body(HelmRepo).
Do(ctx).
Into(result)
return
}
查询Helm 仓库应用模版代码介绍
接口实现
路由注册
handler,参数解析,调用 models 方面
models ,调用 models 方法
crd client,调用 K8s api 存储
路由注册
webservice.Route(webservice.GET("/apps").LiHui, 6 months ago: • openpitrix crd
Deprecate().
To(handler.ListApps). // 跟进
Doc("List app templates").
Param(webservice.QueryParameter(params.ConditionsParam, "query conditions,connect multiple conditions with commas, equal symbol for exact query, wave symbol for fuzzy query e.g. name~a").
Required(false).
DataFormat("key=%s,key~%s")).
Param(webservice.QueryParameter(params.PagingParam, "paging query, e.g. limit=100,page=1").
Required(false).
DataFormat("limit=%d,page=%d").
DefaultValue("limit=10,page=1")).
Param(webservice.QueryParameter(params.ReverseParam, "sort parameters, e.g. reverse=true")).
Param(webservice.QueryParameter(params.OrderByParam, "sort parameters, e.g. orderBy=createTime")).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.OpenpitrixTag}).
Returns(http.StatusOK, api.StatusOK, models.PageableResponse{}))
参数解析,调用 models 方面
func (h *openpitrixHandler) ListApps(req *restful.Request, resp *restful.Response)
limit, offset := params.ParsePaging(req)
orderBy := params.GetStringValueWithDefault(req, params.OrderByParam, openpitrix.CreateTime)
reverse := params.GetBoolValueWithDefault(req, params.ReverseParam, false)
conditions, err := params.ParseConditions(req)
if err != nil {
klog.V(4).Infoln(err)
api.HandleBadRequest(resp, nil, err)
return
}
if req.PathParameter("workspace") != "" {
conditions.Match[openpitrix.WorkspaceLabel] = req.PathParameter("workspace")
}
result, err := h.openpitrix.ListApps(conditions, orderBy, reverse, limit, offset) // 跟进
if err != nil {
klog.Errorln(err)
handleOpenpitrixError(resp, err)
return
}
resp.WriteEntity(result)
}
从缓存中获取 applist
func (c *applicationOperator) ListApps(conditions *params.Conditions, orderBy string, reverse bool, limit, offset int) (*models.PageableResponse, error) {
apps, err := c.listApps(conditions) // 重点跟进
if err != nil {
klog.Error(err)
return nil, err
}
apps = filterApps(apps, conditions)
if reverse {
sort.Sort(sort.Reverse(HelmApplicationList(apps)))
} else {
sort.Sort(HelmApplicationList(apps))
}
totalCount := len(apps)
start, end := (&query.Pagination{Limit: limit, Offset: offset}).GetValidPagination(totalCount)
apps = apps[start:end]
items := make([]interface{}, 0, len(apps))
for i := range apps {
versions, err := c.getAppVersionsByAppId(apps[i].GetHelmApplicationId())
if err != nil && !apierrors.IsNotFound(err) {
return nil, err
}
ctg, _ := c.ctgLister.Get(apps[i].GetHelmCategoryId())
items = append(items, convertApp(apps[i], versions, ctg, 0))
}
return &models.PageableResponse{Items: items, TotalCount: totalCount}, nil
}
// line 601
func (c *applicationOperator) listApps(conditions *params.Conditions) (ret []*v1alpha1.HelmApplication, err error) {
repoId := conditions.Match[RepoId]
if repoId != "" && repoId != v1alpha1.AppStoreRepoId {
// get Helm application from Helm repo
if ret, exists := c.cachedRepos.ListApplicationsByRepoId(repoId); !exists {
klog.Warningf("load repo failed, repo id: %s", repoId)
return nil, loadRepoInfoFailed
} else {
return ret, nil
}
} else {
if c.backingStoreClient == nil {
return []*v1alpha1.HelmApplication{}, nil
}
ret, err = c.appLister.List(labels.SelectorFromSet(buildLabelSelector(conditions)))
}
return
}
缓存具体获取应用逻辑
func (c *cachedRepos) ListApplicationsByRepoId(repoId string) (ret []*v1alpha1.HelmApplication, exists bool) {
c.RLock()
defer c.RUnlock()
if repo, exists := c.repos[repoId]; !exists {
return nil, false
} else {
ret = make([]*v1alpha1.HelmApplication, 0, 10)
for _, app := range c.apps {
if app.GetHelmRepoId() == repo.Name { // 应用的仓库ID相同则追加
ret = append(ret, app)
}
}
}
return ret, true
}
既然 app template 是从缓存中获取的,那么缓存中的数据又是什么时候录入的呢?
创建全局缓存变量
添加新 Helm 仓库,K8s 中已安装 crd 控制器 HelmRepoController 发现有新的 HelmRepo 创建,更新 .Status.Data 内容
informer 发现有更新,同时更新缓存
缓存更新的实现
创建全局变量,通过 init 函数初始化
通过 HelmRepo 的 informer 实现缓存同步更新
在每次调用接口的时候,hanlder 类中包换了缓存变量
创建接口类 openpitrix.Interface
type openpitrixHandler struct {
openpitrix openpitrix.Interface
}
func newOpenpitrixHandler(ksInformers informers.InformerFactory, ksClient versioned.Interface, option *openpitrixoptions.Options) *openpitrixHandler {
var s3Client s3.Interface
if option != nil && option.S3Options != nil && len(option.S3Options.Endpoint) != 0 {
var err error
s3Client, err = s3.NewS3Client(option.S3Options)
if err != nil {
klog.Errorf("failed to connect to storage, please check storage service status, error: %v", err)
}
}
return &openpitrixHandler{
openpitrix.NewOpenpitrixOperator(ksInformers, ksClient, s3Client),
}
}
NewOpenpitrixOperator
通过在 informer 中添加通知函数,执行缓存更新
once.Do 只执行一次
var cachedReposData reposcache.ReposCache
var HelmReposInformer cache.SharedIndexInformer
var once sync.Once
func init() {
cachedReposData = reposcache.NewReposCache() // 全局缓存
}
func NewOpenpitrixOperator(ksInformers ks_informers.InformerFactory, ksClient versioned.Interface, s3Client s3.Interface) Interface {
once.Do(func() {
klog.Infof("start Helm repo informer")
HelmReposInformer = ksInformers.KubeSphereSharedInformerFactory().Application().V1alpha1().HelmRepos().Informer()
HelmReposInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
r := obj.(*v1alpha1.HelmRepo)
cachedReposData.AddRepo(r) // 缓存更新, 点击跟进
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldR := oldObj.(*v1alpha1.HelmRepo)
cachedReposData.DeleteRepo(oldR)
r := newObj.(*v1alpha1.HelmRepo)
cachedReposData.AddRepo(r)
},
DeleteFunc: func(obj interface{}) {
r := obj.(*v1alpha1.HelmRepo)
cachedReposData.DeleteRepo(r)
},
})
go HelmReposInformer.Run(wait.NeverStop)
})
return &openpitrixOperator{
AttachmentInterface: newAttachmentOperator(s3Client),
// cachedReposData used
ApplicationInterface: newApplicationOperator(cachedReposData, ksInformers.KubeSphereSharedInformerFactory(), ksClient, s3Client),
// cachedReposData used
RepoInterface: newRepoOperator(cachedReposData, ksInformers.KubeSphereSharedInformerFactory(), ksClient),
// cachedReposData used
ReleaseInterface: newReleaseOperator(cachedReposData, ksInformers.KubernetesSharedInformerFactory(), ksInformers.KubeSphereSharedInformerFactory(), ksClient),
CategoryInterface: newCategoryOperator(ksInformers.KubeSphereSharedInformerFactory(), ksClient),
}
}
缓存更新逻辑
// 缓存结构体
type cachedRepos struct {
sync.RWMutex
chartsInRepo map[workspace]map[string]int
repoCtgCounts map[string]map[string]int
repos map[string]*v1alpha1.HelmRepo
apps map[string]*v1alpha1.HelmApplication
versions map[string]*v1alpha1.HelmApplicationVersion
}
ByteArrayToSavedIndex:将 repo.Status.Data 转换为 SavedIndex 数组对象
遍历 SavedIndex.Applications
保存(app.ApplicationId:HelmApplication)到 cachedRepos.apps
func (c *cachedRepos) AddRepo(repo *v1alpha1.HelmRepo) error {
return c.addRepo(repo, false)
}
//Add new Repo to cachedRepos
func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error {
if len(repo.Status.Data) == 0 {
return nil
}
index, err := Helmrepoindex.ByteArrayToSavedIndex([]byte(repo.Status.Data))
if err != nil {
klog.Errorf("json unmarshal repo %s failed, error: %s", repo.Name, err)
return err
}
...
chartsCount := 0
for key, app := range index.Applications {
if builtin {
appName = v1alpha1.HelmApplicationIdPrefix + app.Name
} else {
appName = app.ApplicationId
}
HelmApp := v1alpha1.HelmApplication{
....
}
c.apps[app.ApplicationId] = &HelmApp
var ctg, appVerName string
var chartData []byte
for _, ver := range app.Charts {
chartsCount += 1
if ver.Annotations != nil && ver.Annotations["category"] != "" {
ctg = ver.Annotations["category"]
}
if builtin {
appVerName = base64.StdEncoding.EncodeToString([]byte(ver.Name + ver.Version))
chartData, err = loadBuiltinChartData(ver.Name, ver.Version)
if err != nil {
return err
}
} else {
appVerName = ver.ApplicationVersionId
}
version := &v1alpha1.HelmApplicationVersion{
....
}
c.versions[ver.ApplicationVersionId] = version
}
....
}
return nil
}
HelmRepo 协调器
HelmRepo.Status.Data 加载流程
LoadRepoIndex: convert index.yaml to IndexFile
MergeRepoIndex: merge new and old IndexFile
savedIndex.Bytes(): compress data with zlib.NewWriter
将 savedIndex 数据存入 CRD(HelmRepo.Status.Data)
关键结构体
// HelmRepo.Status.Data == SavedIndex 压缩后的数据
type SavedIndex struct {
APIVersion string `json:"apiVersion"`
Generated time.Time `json:"generated"`
Applications map[string]*Application `json:"apps"`
PublicKeys []string `json:"publicKeys,omitempty"`
// Annotations are additional mappings uninterpreted by Helm. They are made available for
// other applications to add information to the index file.
Annotations map[string]string `json:"annotations,omitempty"`
}
// IndexFile represents the index file in a chart repository
type IndexFile struct {
APIVersion string `json:"apiVersion"`
Generated time.Time `json:"generated"`
Entries map[string]ChartVersions `json:"entries"`
PublicKeys []string `json:"publicKeys,omitempty"`
}
代码位置
func (r *ReconcileHelmRepo) syncRepo(instance *v1alpha1.HelmRepo) error {
// 1. load index from Helm repo
index, err := Helmrepoindex.LoadRepoIndex(context.TODO(), instance.Spec.Url, &instance.Spec.Credential)
if err != nil {
klog.Errorf("load index failed, repo: %s, url: %s, err: %s", instance.GetTrueName(), instance.Spec.Url, err)
return err
}
existsSavedIndex := &Helmrepoindex.SavedIndex{}
if len(instance.Status.Data) != 0 {
existsSavedIndex, err = Helmrepoindex.ByteArrayToSavedIndex([]byte(instance.Status.Data))
if err != nil {
klog.Errorf("json unmarshal failed, repo: %s, error: %s", instance.GetTrueName(), err)
return err
}
}
// 2. merge new index with old index which is stored in crd
savedIndex := Helmrepoindex.MergeRepoIndex(index, existsSavedIndex)
// 3. save index in crd
data, err := savedIndex.Bytes()
if err != nil {
klog.Errorf("json marshal failed, error: %s", err)
return err
}
instance.Status.Data = string(data)
return nil
}
Question:
Q1:Helm 仓库发包时如何进行 Helm release 版本控制
A:修改 Charts.yaml 中的字段 version,然后 Helm package, 等于新增一个 .tgz 包,老版本的不要删除,这时候执行 index 的时候会吧所有的 .tgz 包包含在内。
$ Helm repo index stable --url=xxx.xx.xx.xxx:8081/
$ cat index.yaml
....
redis:
- apiVersion: v1
appVersion: "1.0"
created: "2021-06-22T16:34:58.286583012+08:00"
description: A Helm chart for Kubernetes
digest: fd7c0d962155330527c0a512a74bea33302fca940b810c43ee5f461b1013dbf5
name: redis
urls:
- xxx.xx.xx.xxx:8081/redis-0.1.1.tgz
version: 0.1.1
- apiVersion: v1
appVersion: "1.0"
created: "2021-06-22T16:34:58.286109049+08:00"
description: A Helm chart for Kubernetes
digest: 1a23bd6d5e45f9d323500bbe170011fb23bfccf2c1bd25814827eb8dc643d7f0
name: redis
urls:
- xxx.xx.xx.xxx:8081/redis-0.1.0.tgz
version: 0.1.0
Q2:KuberSphere 版本同步功能有缺失?用户添加完 Helm 仓库后,如果有新的应用发布,查询不到
A:解决方案:使用 3 种同步策略
定时同步 Helm 仓库(HelmRepo 设置一个定时协调的事件)
企业仓库,用户可以设置 hook,发布新版本的时候主动触发更新
用户主动更新 charts
Q3:index.yaml 缓存位置
A:某些仓库的index.yaml 比较大,如果1000个用户,1000个charts 会太吃内存。建议常用index.yaml的放在内存中,不常用的index.yaml存储在本地磁盘。
本文由博客一文多发平台 OpenWrite 发布!
好文链接
发表评论