Kubernetes Study Notes: CSI External Provisioner Source Code Analysis

Overview

While recently deploying a K8s persistent storage plugin, I needed to deploy a Deployment pod following the CSI official documentation. Since our in-house storage is file storage rather than block storage, the pod does not need the external-attacher container;
it only needs the external-provisioner sidecar container plus our in-house csi-plugin container. The deployment YAML looks roughly like this:

apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "2"
  name: sunnyfs-csi-controller-share
  namespace: sunnyfs
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: sunnyfs-csi-controller-share
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: sunnyfs-csi-controller-share
    spec:
      containers:
        - args:
            - --csi-address=/csi/sunnyfs-provisioner-share.sock
            - --timeout=150s
          image: quay.io/k8scsi/csi-provisioner:v2.0.2
          imagePullPolicy: IfNotPresent
          name: csi-provisioner
          resources:
            limits:
              cpu: "4"
              memory: 8000Mi
            requests:
              cpu: "2"
              memory: 8000Mi
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /csi
              name: socket-dir
        - args:
            - --v=5
            - --endpoint=unix:///csi/sunnyfs-provisioner-share.sock
            - --nodeid=$(NODE_ID)
            - --drivername=csi.sunnyfs.share.com
            - --version=v1.0.0
          env:
            - name: NODE_ID
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: spec.nodeName
          image: sunnyfs-csi-driver:v1.0.3
          imagePullPolicy: IfNotPresent
          lifecycle:
            preStop:
              exec:
                command:
                  - /bin/sh
                  - -c
                  - rm -rf /csi/sunnyfs-provisioner-share.sock
          name: sunnyfs-csi-plugin
          resources:
            limits:
              cpu: "2"
              memory: 4000Mi
            requests:
              cpu: "1"
              memory: 4000Mi
          securityContext:
            capabilities:
              add:
                - SYS_ADMIN
            privileged: true
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /csi
              name: socket-dir
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: sunnyfs-csi-controller-account
      serviceAccountName: sunnyfs-csi-controller-account
      terminationGracePeriodSeconds: 30
      volumes:
        - hostPath:
            path: /var/lib/kubelet/plugins/csi.sunnyfs.share.com
            type: DirectoryOrCreate
          name: socket-dir

When we create a PVC that references a storage class, a PV object is created dynamically, and a corresponding volume is created in our in-house storage engine service. This is exactly what the storage class mechanism provides: dynamically creating both the PV and the backing volume in the storage service.

The key question is: how is this actually done?

The answer is simple: the external-provisioner sidecar container is a controller that watches PVC/PV objects. When a PVC whose PV should be provisioned by a storage class is created (or a PV object is deleted), the sidecar container calls our
in-house csi-plugin's CreateVolume (or DeleteVolume) method over gRPC to actually create (or delete) a volume in the external storage service, and then creates a PV object and writes it to the k8s API server.
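
For reference, a StorageClass/PVC pair that triggers this flow might look like the following minimal sketch (the StorageClass name sunnyfs-sc-share, the PVC name, and the requested size are assumptions for illustration; only the provisioner field must match the --drivername=csi.sunnyfs.share.com flag passed to the csi-plugin above):

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: sunnyfs-sc-share            # assumed name
provisioner: csi.sunnyfs.share.com  # must match the csi-plugin --drivername
reclaimPolicy: Delete
volumeBindingMode: Immediate
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: sunnyfs-pvc-demo            # assumed name
  namespace: sunnyfs
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: sunnyfs-sc-share
  resources:
    requests:
      storage: 10Gi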

external-provisioner Source Code Analysis

The main logic of the external-provisioner sidecar container is simple:
first instantiate a csiProvisioner object, then use that
csiProvisioner to instantiate a provisionController object, and finally start
provisionController.Run to watch PVC/PV objects and carry out the main business logic:
for a newly created PVC, call the csi-plugin's CreateVolume to create the volume, then create a PV object and write it to the k8s API server.
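
The top-level wiring is roughly the following (a condensed sketch, not the literal external-provisioner main(); newCSIProvisioner is a stand-in for the real constructor, which sets up the gRPC connection to the csi-plugin over the unix socket given by --csi-address):

package main

import (
    "context"

    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "sigs.k8s.io/sig-storage-lib-external-provisioner/v6/controller"
)

func main() {
    // in-cluster client for the k8s API server
    config, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    // newCSIProvisioner stands in for the real constructor: it returns a
    // controller.Provisioner whose Provision/Delete forward to the csi-plugin over gRPC
    var csiProvisioner controller.Provisioner = newCSIProvisioner()

    // wrap the provisioner in a provisionController and start the reconcile loop
    provisionController := controller.NewProvisionController(
        clientset,
        "csi.sunnyfs.share.com", // provisioner name, matches the StorageClass and --drivername
        csiProvisioner,
        "v1.19.0", // kube version, normally discovered from the API server
    )
    provisionController.Run(context.Background())
}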

When provisionController is instantiated, it sets up the watches on PVC/PV objects; see the code at L695-L739:

// instantiate the provisionController
func NewProvisionController(
    client kubernetes.Interface,
    provisionerName string,
    provisioner Provisioner,
    kubeVersion string,
    options ...func(*ProvisionController) error,
) *ProvisionController {
    // ...
    controller := &ProvisionController{
        client:          client,
        provisionerName: provisionerName,
        provisioner:     provisioner, // called during PVC sync to create the volume
        // ...
    }

    controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
    controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")
    informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)
    // ----------------------
    // PersistentVolumeClaims
    claimHandler := cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { controller.enqueueClaim(obj) },
        UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueClaim(newObj) },
        DeleteFunc: func(obj interface{}) {
            // NOOP. The claim is either in claimsInProgress and in the queue, so it will be processed as usual
            // or it's not in claimsInProgress and then we don't care
        },
    }
    // ...
    // -----------------
    // PersistentVolumes
    volumeHandler := cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { controller.enqueueVolume(obj) },
        UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueVolume(newObj) },
        DeleteFunc: func(obj interface{}) { controller.forgetVolume(obj) },
    }

    // --------------
    // StorageClasses
    // no resource event handler needed for StorageClasses
    if controller.classInformer == nil {
        if controller.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.6.0")) {
            controller.classInformer = informer.Storage().V1().StorageClasses().Informer()
        } else {
            controller.classInformer = informer.Storage().V1beta1().StorageClasses().Informer()
        }
    }
    controller.classes = controller.classInformer.GetStore()

    if controller.createProvisionerPVLimiter != nil {
        // volumeStore is used later to create PV objects and write them to the API server
        controller.volumeStore = NewVolumeStoreQueue(client, controller.createProvisionerPVLimiter, controller.claimsIndexer, controller.eventRecorder)
    } else {
        // ...
    }

    return controller
}
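
provisionController.Run then starts these informers and a set of workers that drain the two workqueues; condensed, it behaves roughly like the following sketch (leader election and error handling elided; runClaimWorker/runVolumeWorker wrap the processNext*WorkItem functions shown next):

// condensed sketch of Run: start the informers registered above, then run
// workers that drain claimQueue/volumeQueue (leader election elided)
func (ctrl *ProvisionController) runSketch(ctx context.Context) {
    go ctrl.claimInformer.Run(ctx.Done())
    go ctrl.volumeInformer.Run(ctx.Done())
    go ctrl.classInformer.Run(ctx.Done())

    for i := 0; i < ctrl.threadiness; i++ {
        // every PVC key popped from claimQueue ends up in syncClaim (below)
        go wait.Until(func() { ctrl.runClaimWorker(ctx) }, time.Second, ctx.Done())
        // every PV key popped from volumeQueue ends up in syncVolume
        go wait.Until(func() { ctrl.runVolumeWorker(ctx) }, time.Second, ctx.Done())
    }
    <-ctx.Done()
}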

Let's focus on how a newly created PVC is reconciled; see the code at L933-L986:

func (ctrl *ProvisionController) processNextClaimWorkItem(ctx context.Context) bool {
    // ...
    err := func() error {
        // ...
        if err := ctrl.syncClaimHandler(ctx, key); err != nil {
            // ...
        }
        ctrl.claimQueue.Forget(obj)
        return nil
    }()
    // ...
    return true
}

func (ctrl *ProvisionController) syncClaimHandler(ctx context.Context, key string) error {
    // ...
    return ctrl.syncClaim(ctx, claimObj)
}

func (ctrl *ProvisionController) syncClaim(ctx context.Context, obj interface{}) error {
    // ...
    // initially shouldProvision returns false; it only returns true after the pv controller
    // has reconciled the PVC and updated its annotations (explained below)
    should, err := ctrl.shouldProvision(ctx, claim)
    if err != nil {
        // ...
        return err
    } else if should {
        // call the provisioner to create the volume in the backend storage service,
        // then call the volumeStore object to create the PV object and write it to the k8s API server
        status, err := ctrl.provisionClaimOperation(ctx, claim)
        // ...
        return err
    }
    return nil
}

const (
    annStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner"
)
func (ctrl *ProvisionController) shouldProvision(ctx context.Context, claim *v1.PersistentVolumeClaim) (bool, error) {
    // ...
    // This checks whether the PVC carries the "volume.beta.kubernetes.io/storage-provisioner" annotation; a freshly created PVC does not have it yet.
    // The annotation is added by the pv controller in kube-controller-manager, which also watches PVC objects: when it sees that the provisioner
    // of the PVC's storage class is not a k8s in-tree plugin, it adds the "volume.beta.kubernetes.io/storage-provisioner" annotation to the PVC.
    // See https://github.com/kubernetes/kubernetes/blob/release-1.19/pkg/controller/volume/persistentvolume/pv_controller_base.go#L544-L566
    // So shouldProvision only returns true after the pv controller has reconciled the PVC and updated its annotations.
    if provisioner, found := claim.Annotations[annStorageProvisioner]; found {
        if ctrl.knownProvisioner(provisioner) {
            claimClass := GetPersistentVolumeClaimClass(claim)
            class, err := ctrl.getStorageClass(claimClass)
            // ...
            if class.VolumeBindingMode != nil && *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
                if selectedNode, ok := claim.Annotations[annSelectedNode]; ok && selectedNode != "" {
                    return true, nil
                }
                return false, nil
            }
            return true, nil
        }
    }
    
    return false, nil
}
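
Concretely, once the pv controller has reconciled the claim, the still-Pending PVC carries the annotation that shouldProvision checks for. It looks roughly like this (values follow the earlier example and are for illustration only):

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: sunnyfs-pvc-demo   # assumed name from the earlier example
  namespace: sunnyfs
  annotations:
    volume.beta.kubernetes.io/storage-provisioner: csi.sunnyfs.share.com
spec:
  storageClassName: sunnyfs-sc-share
  # ...
status:
  phase: Pending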

So the key logic in the code above is the provisionClaimOperation function, which implements two main pieces of business logic: call the provisioner to create the volume in the backend storage service, and call the volumeStore object to create a PV object and write it to the k8s API server.
Let's look at the provisionClaimOperation code:

func (ctrl *ProvisionController) provisionClaimOperation(ctx context.Context, claim *v1.PersistentVolumeClaim) (ProvisioningState, error) {
    // ...
    // prepare the relevant parameters
    claimClass := util.GetPersistentVolumeClaimClass(claim)
    pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
    claimRef, err := ref.GetReference(scheme.Scheme, claim)
    class, err := ctrl.getStorageClass(claimClass)
    options := ProvisionOptions{
        StorageClass: class,
        PVName:       pvName,
        PVC:          claim,
        SelectedNode: selectedNode,
    }

    // (1) call the provisioner to create the volume in the backend storage service
    volume, result, err := ctrl.provisioner.Provision(ctx, options)

    volume.Spec.ClaimRef = claimRef
    // add the "pv.kubernetes.io/provisioned-by" annotation
    metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annDynamicallyProvisioned, ctrl.provisionerName)
    // (2) call the volumeStore object to create the PV object and write it to the k8s API server
    if err := ctrl.volumeStore.StoreVolume(claim, volume); err != nil {
        return ProvisioningFinished, err
    }
    // update the local cache
    if err = ctrl.volumes.Add(volume); err != nil {
        utilruntime.HandleError(err)
    }
    return ProvisioningFinished, nil
}

The logic above is straightforward; the key step is the call to provisioner.Provision(), which creates the volume in the backend storage service. Let's look at the key logic in Provision():

func (p *csiProvisioner) Provision(ctx context.Context, options controller.ProvisionOptions) (*v1.PersistentVolume, controller.ProvisioningState, error) {
    pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", options.PVC.ObjectMeta.UID), p.volumeNameUUIDLength)
    req := csi.CreateVolumeRequest{
        Name:               pvName,
        Parameters:         options.StorageClass.Parameters,
        VolumeCapabilities: volumeCaps,
        CapacityRange: &csi.CapacityRange{
            RequiredBytes: int64(volSizeBytes),
        },
    }
    // fetch the provision secret credentials
    provisionerSecretRef, err := getSecretReference(provisionerSecretParams, options.StorageClass.Parameters, pvName, &v1.PersistentVolumeClaim{
        ObjectMeta: metav1.ObjectMeta{
            Name:      options.PVC.Name,
            Namespace: options.PVC.Namespace,
        },
    })
    provisionerCredentials, err := getCredentials(ctx, p.client, provisionerSecretRef)
    req.Secrets = provisionerCredentials
    // ...

    // key logic: make a gRPC call to the CreateVolume method of the controller-service in our in-house csi-plugin, creating a real volume in the backend storage service
    // csiClient here is the controller-service client; for the controller-service RPC spec see https://github.com/container-storage-interface/spec/blob/master/spec.md#controller-service-rpc
    rep, err = p.csiClient.CreateVolume(createCtx, &req)
    // ...
    pv := &v1.PersistentVolume{
        ObjectMeta: metav1.ObjectMeta{
            Name: pvName,
        },
        Spec: v1.PersistentVolumeSpec{
            AccessModes:  options.PVC.Spec.AccessModes,
            MountOptions: options.StorageClass.MountOptions,
            Capacity: v1.ResourceList{
                v1.ResourceName(v1.ResourceStorage): bytesToGiQuantity(respCap),
            },
            // TODO wait for CSI VolumeSource API
            PersistentVolumeSource: v1.PersistentVolumeSource{
                CSI: &v1.CSIPersistentVolumeSource{
                    Driver:                     p.driverName,
                    VolumeHandle:               p.volumeIdToHandle(rep.Volume.VolumeId),
                    VolumeAttributes:           volumeAttributes,
                    ControllerPublishSecretRef: controllerPublishSecretRef,
                    NodeStageSecretRef:         nodeStageSecretRef,
                    NodePublishSecretRef:       nodePublishSecretRef,
                    ControllerExpandSecretRef:  controllerExpandSecretRef,
                },
            },
        },
    }

    return pv, controller.ProvisioningFinished, nil
}

Again, this code is fairly clear: the key step is the gRPC call to our in-house csi-plugin's controller-service CreateVolume method, which creates a real volume in the external storage service.
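
On the plugin side, the controller-service receiving that gRPC call implements the csi.ControllerServer interface. A minimal sketch of what such a CreateVolume handler might look like (sunnyfsCreateVolume is a hypothetical helper around our storage service API; only this one RPC is shown):

package main

import (
    "context"

    "github.com/container-storage-interface/spec/lib/go/csi"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// controllerServer implements csi.ControllerServer for the csi-plugin
// (only CreateVolume is shown; the other RPCs are omitted)
type controllerServer struct{}

func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
    if req.GetName() == "" {
        return nil, status.Error(codes.InvalidArgument, "volume name missing in request")
    }
    capacity := req.GetCapacityRange().GetRequiredBytes()

    // create the real volume in the backend storage engine;
    // sunnyfsCreateVolume is a hypothetical helper around our storage service API
    volumeID, err := sunnyfsCreateVolume(ctx, req.GetName(), capacity, req.GetParameters())
    if err != nil {
        return nil, status.Errorf(codes.Internal, "create volume failed: %v", err)
    }

    // the returned VolumeId becomes spec.csi.volumeHandle in the PV object
    return &csi.CreateVolumeResponse{
        Volume: &csi.Volume{
            VolumeId:      volumeID,
            CapacityBytes: capacity,
            VolumeContext: req.GetParameters(),
        },
    }, nil
}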

Similarly, the external-provisioner sidecar container also watches PV objects. When a PV is deleted, it first checks whether the real volume in the backend storage service should be deleted too, and if so
calls provisioner.Delete(), i.e. our in-house csi-plugin's controller-service DeleteVolume method, to delete the volume. For the deletion path, see deleteVolumeOperation, sketched below.
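
Condensed, the deletion path looks roughly like this (a sketch of deleteVolumeOperation with event recording and most error paths elided):

// condensed sketch of deleteVolumeOperation: remove the backend volume first,
// then delete the PV object from the API server
func (ctrl *ProvisionController) deleteVolumeOperationSketch(ctx context.Context, volume *v1.PersistentVolume) error {
    // only volumes provisioned by this provisioner, with reclaim policy
    // Delete, should have their backend volume removed
    if !ctrl.shouldDelete(ctx, volume) {
        return nil
    }

    // gRPC call into the csi-plugin controller-service DeleteVolume
    if err := ctrl.provisioner.Delete(ctx, volume); err != nil {
        return err // the worker requeues and retries
    }

    // the backend volume is gone; now delete the PV object itself
    return ctrl.client.CoreV1().PersistentVolumes().Delete(ctx, volume.Name, metav1.DeleteOptions{})
}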

This explains the full flow: when we create a PVC with a storage class, the external-provisioner sidecar container watches PVCs and calls provisioner.Provision to
create the volume; Provision in turn makes a gRPC call to the in-house csi-plugin controller-service's CreateVolume() to actually create a volume, then builds the PV object
parameters from that volume and writes a new PV object to the k8s API server. The whole process is dynamic and automated, with no manual steps; that is exactly what storage class provides.

Summary

This article walked through the logic of the external-provisioner sidecar container, explaining how, when a PVC with a storage class is created, a new k8s PV object is created
along with a real volume in the backend storage service.

At this point we have a PVC object bound to a PV that is backed by a real volume in the backend storage service, and the PVC can be used inside a pod: a mount path in the pod's
containers can use the volume path just like a local directory. But how does that volume path actually get mounted into the pod's containers? To be covered in a future update.

References

Understanding the K8s persistent storage flow in one article (一文讀懂 K8s 持久化存儲流程)

Getting started with K8s from scratch: Kubernetes storage architecture and plugin usage (從零開始入門 K8s | Kubernetes 存儲架構及插件使用)

Kubernetes Container Storage Interface (CSI) Documentation

node-driver-registrar
