Kubernetes Study Notes: Source Code Analysis of the CSI Plugin Registration Mechanism

Overview

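While maintaining my team's K8s CSI plugin code recently, I kept wondering how it works internally, so I took the opportunity to study the K8s CSI internals in depth.
When deploying a K8s persistent storage plugin, the official CSI documentation requires deploying a DaemonSet pod that registers the plugin. One of the containers in that pod is node-driver-registrar. The deployment YAML looks roughly like this: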

apiVersion: apps/v1
kind: DaemonSet
metadata:
  annotations:
    deprecated.daemonset.template.generation: "7"
  name: sunnyfs-csi-share-node
  namespace: sunnyfs
spec:
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: sunnyfs-csi-share-node
  template:
    metadata:
      labels:
        app: sunnyfs-csi-share-node
    spec:
      containers:
        - args:
            - --csi-address=/csi/sunnyfs-csi-share.sock
            - --kubelet-registration-path=/csi/sunnyfs-csi-share.sock
          env:
            - name: KUBE_NODE_NAME
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: spec.nodeName
          image: quay.io/k8scsi/csi-node-driver-registrar:v2.1.0
          imagePullPolicy: IfNotPresent
          name: node-driver-registrar
          resources:
            limits:
              cpu: "2"
              memory: 4000Mi
            requests:
              cpu: "1"
              memory: 4000Mi
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /registration
              name: registration-dir
            - mountPath: /csi
              name: socket-dir
        - args:
            - --v=5
            - --endpoint=unix:///csi/sunnyfs-csi-share/sunnyfs-csi-share.sock
            - --nodeid=$(NODE_ID)
            - --drivername=csi.sunnyfs.share.com
            - --version=v1.0.0
          env:
            - name: NODE_ID
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: spec.nodeName
          image: sunnyfs-csi-driver:v1.0.4
          imagePullPolicy: IfNotPresent
          lifecycle:
            preStop:
              exec:
                command:
                  - /bin/sh
                  - -c
                  - rm -rf /csi/sunnyfs-csi-share.sock /registration/csi.sunnyfs.share.com-reg.sock
          name: sunnyfs-csi-driver
          resources:
            limits:
              cpu: "2"
              memory: 4000Mi
            requests:
              cpu: "1"
              memory: 4000Mi
          securityContext:
            privileged: true
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /registration
              name: registration-dir
            - mountPath: /csi
              name: socket-dir
            - mountPath: /var/lib/kubelet/pods
              mountPropagation: Bidirectional
              name: mountpoint-dir
      dnsPolicy: ClusterFirstWithHostNet
      hostNetwork: true
      imagePullSecrets:
        - name: regcred
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      tolerations:
        - operator: Exists
      volumes:
        - hostPath:
            path: /var/lib/kubelet/plugins/csi.sunnyfs.share.com
            type: DirectoryOrCreate
          name: socket-dir
        - hostPath:
            path: /var/lib/kubelet/plugins_registry
            type: Directory
          name: registration-dir
        - hostPath:
            path: /var/lib/kubelet/pods
            type: Directory
          name: mountpoint-dir
  updateStrategy:
    rollingUpdate:
      maxUnavailable: 1
    type: RollingUpdate

The pod runs a custom csi-plugin, here sunnyfs-csi-driver, whose backing storage engine is an in-house file storage system, together with a sidecar container, node-driver-registrar, whose main job is to register the custom csi-plugin.
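
One detail worth checking in a manifest like this is which side of the hostPath mount each socket path refers to. As far as I understand the node-driver-registrar README, --csi-address is resolved inside the container, while --kubelet-registration-path is dialed by the kubelet on the host, so it normally has to be the host-side path of the driver socket. The driver's --endpoint also has to resolve to the same socket file as --csi-address. The sketch below is only an illustration under those assumptions: the mount type and the hostPathFor helper are hypothetical names, and the mount table is copied from the volumeMounts above.

```go
package main

import (
	"path"
	"strings"
)

// mount pairs a container mountPath with the hostPath volume behind it.
type mount struct {
	containerPath string
	hostPath      string
}

// registrarMounts mirrors the volumeMounts of the node-driver-registrar
// container in the DaemonSet above.
var registrarMounts = []mount{
	{containerPath: "/csi", hostPath: "/var/lib/kubelet/plugins/csi.sunnyfs.share.com"},
	{containerPath: "/registration", hostPath: "/var/lib/kubelet/plugins_registry"},
}

// hostPathFor translates a path as seen inside the container into the path
// the kubelet sees on the node. ok is false when no mount covers the path.
func hostPathFor(mounts []mount, containerFile string) (hostFile string, ok bool) {
	clean := path.Clean(containerFile)
	for _, m := range mounts {
		if clean == m.containerPath || strings.HasPrefix(clean, m.containerPath+"/") {
			return path.Join(m.hostPath, strings.TrimPrefix(clean, m.containerPath)), true
		}
	}
	return "", false
}
```

Calling hostPathFor(registrarMounts, "/csi/sunnyfs-csi-share.sock") yields the path under /var/lib/kubelet/plugins/csi.sunnyfs.share.com/, which is the location the kubelet would have to dial to reach the driver socket.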

The key question is: how does csi-plugin registration actually work?

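The short answer: node-driver-registrar, running as a sidecar container in the DaemonSet, is called by the kubelet's plugin manager module, and node-driver-registrar in turn calls our in-house csi-plugin, the sunnyfs-csi-driver container. When the kubelet starts, it registers a CSI plugin handler with the plugin manager module. Once that handler has obtained basic information about the sunnyfs-csi-driver container, it performs a few operations, such as updating the node's annotation and creating or updating the CSINode object.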

Source code analysis

node-driver-registrar source code analysis

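The node-driver-registrar sidecar container's logic is simple. It does two things. First, it makes an RPC call to our in-house csi-plugin, invoking GetPluginInfo and reading response.GetName to obtain csiDriverName.
Second, it starts a gRPC server listening on /var/lib/kubelet/plugins_registry/${csiDriverName}-reg.sock on the host, for the kubelet plugin manager to call.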

Let's take a quick look at the code behind these two steps.

First, the RPC call to our in-house csi-plugin to obtain csiDriverName (L137-L152):

func main() {
    // ...
    
    // 1. Call GetPluginInfo on our in-house csi-plugin over gRPC; response.GetName() is the csiDriverName
    csiConn, err := connection.Connect(*csiAddress, cmm)
    csiDriverName, err := csirpc.GetDriverName(ctx, csiConn)
    
    // Run forever
    nodeRegister(csiDriverName, addr)
}

GetDriverName is shown below. It makes an RPC call to the GetPluginInfo method of the identity server in our in-house csi-plugin:

import (
    "github.com/container-storage-interface/spec/lib/go/csi"
)

// GetDriverName returns name of CSI driver.
func GetDriverName(ctx context.Context, conn *grpc.ClientConn) (string, error) {
    client := csi.NewIdentityClient(conn)
    req := csi.GetPluginInfoRequest{}
    rsp, err := client.GetPluginInfo(ctx, &req)
    // ...
    name := rsp.GetName()
    //...
    return name, nil
}

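node-driver-registrar first calls the GetPluginInfo method of the identity server in our in-house csi-plugin. The CSI (Container Storage Interface) design document
specifies that a csi-plugin needs to implement three services: the identity service, the controller service and the node service. Of these, the identity service must implement the GetPluginInfo method, which returns basic information about the plugin.
For example, my identity service's GetPluginInfo implementation mainly returns the name of our in-house csi plugin: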

func (ids *DefaultIdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
    klog.Infof("Using default GetPluginInfo")

    if ids.Driver.name == "" {
        return nil, status.Error(codes.Unavailable, "Driver name not configured")
    }

    if ids.Driver.version == "" {
        return nil, status.Error(codes.Unavailable, "Driver is missing version")
    }

    return &csi.GetPluginInfoResponse{
        Name:          ids.Driver.name,
        VendorVersion: ids.Driver.version,
    }, nil
}

Next, the node-driver-registrar sidecar container starts a gRPC server listening on /var/lib/kubelet/plugins_registry/${csiDriverName}-reg.sock on the host.
This server follows the kubelet plugin registration standard: the *registrationServer service exposes the GetInfo and NotifyRegistrationStatus methods for clients to call.
In practice the client is the kubelet plugin manager module. The code looks like this:

// Start a gRPC server listening on the socket /var/lib/kubelet/plugins_registry/${csiDriverName}-reg.sock
func nodeRegister(csiDriverName, httpEndpoint string) {
    registrar := newRegistrationServer(csiDriverName, *kubeletRegistrationPath, supportedVersions)
    socketPath := buildSocketPath(csiDriverName)
    // ...
    lis, err := net.Listen("unix", socketPath)
    
    grpcServer := grpc.NewServer()
    registerapi.RegisterRegistrationServer(grpcServer, registrar)
    grpcServer.Serve(lis)
    // ...
}

// The socket path is /var/lib/kubelet/plugins_registry/${csiDriverName}-reg.sock
func buildSocketPath(csiDriverName string) string {
    return fmt.Sprintf("%s/%s-reg.sock", *pluginRegistrationPath, csiDriverName)
}

func newRegistrationServer(driverName string, endpoint string, versions []string) registerapi.RegistrationServer {
    return &registrationServer{
        driverName: driverName,
        endpoint:   endpoint,
        version:    versions,
    }
}
// GetInfo is the RPC invoked by plugin watcher
func (e registrationServer) GetInfo(ctx context.Context, req *registerapi.InfoRequest) (*registerapi.PluginInfo, error) {
    return &registerapi.PluginInfo{
        Type:              registerapi.CSIPlugin,
        Name:              e.driverName,
        Endpoint:          e.endpoint,
        SupportedVersions: e.version,
    }, nil
}
func (e registrationServer) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) {
    if !status.PluginRegistered {
        os.Exit(1)
    }
    
    return &registerapi.RegistrationStatusResponse{}, nil
}

In short, the main logic of the node-driver-registrar sidecar container is simple: it first calls our in-house csi-plugin to obtain csiDriverName, then starts a gRPC server on /var/lib/kubelet/plugins_registry/${csiDriverName}-reg.sock and, following the kubelet plugin registration standard,
exposes a registrationServer for the kubelet plugin manager to call over RPC.
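
The DaemonSet's preStop hook removes the socket files, which hints at a practical concern: a socket file left behind by a previous run makes a fresh net.Listen on the same path fail. Below is a minimal sketch of building the registration socket path and clearing a stale file before listening. It uses only the standard library and leaves out gRPC; listenOnRegistrationSocket and simulateRestart are hypothetical helper names, not functions from node-driver-registrar.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"net"
	"os"
	"path/filepath"
)

// registrationSocketPath mirrors buildSocketPath: <dir>/<driver>-reg.sock.
func registrationSocketPath(registrationDir, driverName string) string {
	return filepath.Join(registrationDir, driverName+"-reg.sock")
}

// listenOnRegistrationSocket removes a leftover socket file, if any, and then
// listens on the registration socket.
func listenOnRegistrationSocket(registrationDir, driverName string) (net.Listener, error) {
	socketPath := registrationSocketPath(registrationDir, driverName)
	if err := os.Remove(socketPath); err != nil && !errors.Is(err, fs.ErrNotExist) {
		return nil, fmt.Errorf("remove stale socket %s: %w", socketPath, err)
	}
	return net.Listen("unix", socketPath)
}

// simulateRestart listens, leaves the socket file on disk as a crashed
// process would, listens again, and returns the socket file name.
func simulateRestart(driverName string) (string, error) {
	dir, err := os.MkdirTemp("", "reg")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	first, err := listenOnRegistrationSocket(dir, driverName)
	if err != nil {
		return "", err
	}
	// Keep the socket file on close to imitate an unclean exit.
	if ul, ok := first.(*net.UnixListener); ok {
		ul.SetUnlinkOnClose(false)
	}
	first.Close()

	second, err := listenOnRegistrationSocket(dir, driverName)
	if err != nil {
		return "", err
	}
	defer second.Close()
	return filepath.Base(second.Addr().String()), nil
}
```

Without the os.Remove step, the second listen in simulateRestart would fail because the path is still occupied by the old socket file.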

The next question is: how does the kubelet plugin manager make RPC calls to the node-driver-registrar sidecar container?

kubelet plugin manager source code analysis

When the kubelet starts, it instantiates a pluginManager object. The socket dir here is the /var/lib/kubelet/plugins_registry/ directory:

const (
    DefaultKubeletPluginsRegistrationDirName = "plugins_registry"
)

klet.pluginManager = pluginmanager.NewPluginManager(
        klet.getPluginsRegistrationDir(), /* sockDir */
        kubeDeps.Recorder,
    )

func (kl *Kubelet) getPluginsRegistrationDir() string {
    return filepath.Join(kl.getRootDir(), config.DefaultKubeletPluginsRegistrationDirName)
}

It also registers a csi.RegistrationHandler{} object for the CSIPlugin type and starts the pluginManager object (see L1385-L1391):

// Adding Registration Callback function for CSI Driver
kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))

// Start the plugin manager
klog.V(4).Infof("starting plugin manager")
go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)

The pluginmanager package contains a fair amount of code, but it essentially implements just two pieces of logic.

plugin watcher

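The pluginmanager module uses a plugin watcher object to recursively watch the /var/lib/kubelet/plugins_registry socket dir. Under the hood the watcher relies on the github.com/fsnotify/fsnotify package.
Whenever a socket file is added to or removed from the socket dir, the change is reflected in the socketFileToInfo map[string]PluginInfo of the desiredStateOfWorld cache object. The main code that watches the socket dir is shown below (see L50-L98):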

func (w *Watcher) Start(stopCh <-chan struct{}) error {
    // ...
    fsWatcher, err := fsnotify.NewWatcher()
    w.fsWatcher = fsWatcher
    // Walk the socket dir and start watching it
    if err := w.traversePluginDir(w.path); err != nil {
        klog.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
    }

    // Start a goroutine that watches for socket files being created in or removed from the socket dir
    go func(fsWatcher *fsnotify.Watcher) {
        defer close(w.stopped)
        for {
            select {
            case event := <-fsWatcher.Events:
                if event.Op&fsnotify.Create == fsnotify.Create {
                    err := w.handleCreateEvent(event)
                } else if event.Op&fsnotify.Remove == fsnotify.Remove {
                    w.handleDeleteEvent(event)
                }
                continue
            case err := <-fsWatcher.Errors: 
                // ...
                continue
            case <-stopCh:
                // ...
                return
            }
        }
    }(fsWatcher)

    return nil
}

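When we deploy the node-driver-registrar sidecar container through the DaemonSet, a new socket file ${csiDriverName}-reg.sock appears in the /var/lib/kubelet/plugins_registry socket dir.
The plugin watcher object then writes that entry into the desiredStateOfWorld cache, where the second piece of logic, the reconciler, picks it up.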
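
To make the watcher-to-cache step concrete, here is a toy model of it. It is a sketch under simplifying assumptions, not the kubelet's implementation: eventOp, socketEvent and desiredState are hypothetical stand-ins, and a logical counter replaces the time.Time timestamp that the real PluginInfo carries.

```go
package main

// eventOp is a simplified stand-in for the fsnotify operations used above.
type eventOp int

const (
	opCreate eventOp = iota
	opRemove
)

// socketEvent describes one change in the plugin registration directory.
type socketEvent struct {
	Op         eventOp
	SocketPath string
}

// desiredState is a toy version of the desiredStateOfWorld cache. It maps a
// socket path to a logical timestamp; the kubelet stores a time.Time instead.
type desiredState struct {
	clock   int64
	plugins map[string]int64
}

func newDesiredState() *desiredState {
	return &desiredState{plugins: make(map[string]int64)}
}

// apply records a create event, stamping it with a fresh timestamp, or
// forgets the socket on a remove event.
func (d *desiredState) apply(ev socketEvent) {
	switch ev.Op {
	case opCreate:
		d.clock++
		d.plugins[ev.SocketPath] = d.clock
	case opRemove:
		delete(d.plugins, ev.SocketPath)
	}
}
```

A second create event for the same socket path bumps the timestamp, which is what later lets the reconciler notice that a plugin has re-registered.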

reconciler

The reconciler is a periodic task that runs once every rc.loopSleepDuration (L84-L90):

func (rc *reconciler) Run(stopCh <-chan struct{}) {
    wait.Until(func() {
        rc.reconcile()
    },
        rc.loopSleepDuration,
        stopCh)
}
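
wait.Until comes from k8s.io/apimachinery. To show the shape of the loop without that dependency, here is a standard-library sketch of the same idea: run a function immediately, then again after each period, until the stop channel is closed. runUntil and countRuns are hypothetical names for illustration, and the sketch does not reproduce everything wait.Until does (for example, panic handling).

```go
package main

import "time"

// runUntil calls f immediately and then again after each period until
// stopCh is closed. The period starts counting after f returns.
func runUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		// Do not start another run once a stop has been requested.
		select {
		case <-stopCh:
			return
		default:
		}

		f()

		timer := time.NewTimer(period)
		select {
		case <-stopCh:
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}

// countRuns drives runUntil with a counter that closes the stop channel
// after limit invocations. limit must be at least 1.
func countRuns(limit int) int {
	stopCh := make(chan struct{})
	runs := 0
	runUntil(func() {
		runs++
		if runs == limit {
			close(stopCh)
		}
	}, time.Millisecond, stopCh)
	return runs
}
```

In the kubelet, the function passed in is rc.reconcile and the stop channel is the one handed to pluginManager.Run.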

Each reconcile pass diffs two cache maps: desiredStateOfWorld, the desired state, and actualStateOfWorld, the actual state.
If a plugin is in the actualStateOfWorld cache but not in desiredStateOfWorld (the plugin has been removed), or it is in desiredStateOfWorld but plugin.Timestamp does not match (the plugin has re-registered),
it is deregistered (DeRegisterPlugin) and removed from the actualStateOfWorld cache. If a plugin is in desiredStateOfWorld but not in the actualStateOfWorld cache, it is a new plugin: it is registered (RegisterPlugin) and added to the actualStateOfWorld cache.
The main reconcile logic is shown below (L110-L164):

func (rc *reconciler) reconcile() {

    // Diff actualStateOfWorld against desiredStateOfWorld to decide whether a plugin must be deregistered and removed from the actualStateOfWorld cache
    for _, registeredPlugin := range rc.actualStateOfWorld.GetRegisteredPlugins() {
        unregisterPlugin := false
        if !rc.desiredStateOfWorld.PluginExists(registeredPlugin.SocketPath) {
            unregisterPlugin = true
        } else {
            for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() {
                if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.Timestamp != registeredPlugin.Timestamp {
                    unregisterPlugin = true
                    break
                }
            }
        }
        if unregisterPlugin {
            err := rc.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld)
        }
    }

    // Diff desiredStateOfWorld against actualStateOfWorld to find plugins that must be registered and added to the actualStateOfWorld cache
    for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
        if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
            err := rc.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
        }
    }
}
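
The diff itself can be expressed as a small pure function, which makes the two loops easier to reason about. This is a sketch, not the kubelet code: reconcilePlan and planReconcile are hypothetical names, both states are reduced to maps from socket path to timestamp, and int64 values stand in for time.Time.

```go
package main

import "sort"

// reconcilePlan lists the socket paths one reconcile pass would act on.
type reconcilePlan struct {
	Unregister []string
	Register   []string
}

// planReconcile compares the desired and actual state, both keyed by socket
// path with the registration timestamp as value.
func planReconcile(desired, actual map[string]int64) reconcilePlan {
	var plan reconcilePlan

	// Registered plugins that vanished or whose timestamp changed are unregistered.
	for socketPath, actualStamp := range actual {
		desiredStamp, wanted := desired[socketPath]
		if !wanted || desiredStamp != actualStamp {
			plan.Unregister = append(plan.Unregister, socketPath)
		}
	}

	// Desired plugins that are missing, or registered with a different
	// timestamp, are registered (again).
	for socketPath, desiredStamp := range desired {
		actualStamp, registered := actual[socketPath]
		if !registered || actualStamp != desiredStamp {
			plan.Register = append(plan.Register, socketPath)
		}
	}

	// Map iteration order is random; sort for stable output.
	sort.Strings(plan.Unregister)
	sort.Strings(plan.Register)
	return plan
}
```

A plugin whose timestamp changed shows up in both lists: it is unregistered first and then registered again, matching the order of the two loops in reconcile.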

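Here we focus on how a newly created plugin is registered. The reconciler object makes an RPC call to GetInfo on the gRPC server in the node-driver-registrar sidecar container.
Then, based on the type field of the response, it looks up the matching handler among the pluginHandlers registered at startup, which here is the csi.RegistrationHandler{} object for the CSIPlugin type mentioned above, and calls that object's
ValidatePlugin and RegisterPlugin to register the plugin. Registering the plugin here really means setting the node annotation and creating or updating the CSINode object. Finally, it calls NotifyRegistrationStatus over RPC to report the registration result.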

The registration code is shown below (L74-L134):

// Establish a gRPC connection to /var/lib/kubelet/plugins_registry/${csiDriverName}-reg.sock
func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
    // ...
    c, err := grpc.DialContext(ctx, unixSocketPath, grpc.WithInsecure(), grpc.WithBlock())
    return registerapi.NewRegistrationClient(c), c, nil
}

func (og *operationGenerator) GenerateRegisterPluginFunc(/*...*/) func() error {
    registerPluginFunc := func() error {
        client, conn, err := dial(socketPath, dialTimeoutDuration)
        // Call GetInfo on the gRPC server in the node-driver-registrar sidecar container
        infoResp, err := client.GetInfo(ctx, &registerapi.InfoRequest{})
        // The handler here is the csi.RegistrationHandler{} object for the CSIPlugin type mentioned above
        handler, ok := pluginHandlers[infoResp.Type]
        // Call handler.ValidatePlugin
        if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
            // ...
        }
        // Add the plugin to the actualStateOfWorldUpdater cache
        err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
            SocketPath: socketPath,
            Timestamp:  timestamp,
            Handler:    handler,
            Name:       infoResp.Name,
        })
        // This is the key step: call handler.RegisterPlugin to register the plugin.
        // infoResp.Endpoint is the socket path our in-house csi-plugin listens on.
        if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
            return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
        }
        // ...
    }
    return registerPluginFunc
}

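In short, the kubelet plugin manager module's logic is fairly clear and simple, with two main parts. First, the plugin watcher object watches the socket dir, /var/lib/kubelet/plugins_registry, and puts plugin data into
the desiredStateOfWorld cache. Second, the reconciler reconciles the desiredStateOfWorld and actualStateOfWorld caches: it calls node-driver-registrar to get the plugin info, uses that info to look up the plugin handler,
and then calls the plugin handler's RegisterPlugin to register the plugin. Given the socket path that the csi-plugin listens on, the plugin handler talks to our in-house csi-plugin directly. node-driver-registrar thus acts as an intermediary that passes along
one key piece of information: the socket path our in-house csi-plugin's gRPC server listens on.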
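
The whole handshake can be modeled with plain Go interfaces, which makes the order of calls easy to see. The sketch below is an illustration under stated assumptions rather than the kubelet code: the registrar and pluginHandler interfaces are hypothetical simplifications of the gRPC registration service and the plugin handler, the step that adds the plugin to actualStateOfWorld is left out, and the fakes exist only to exercise the flow.

```go
package main

import "fmt"

// pluginInfo is a simplified stand-in for registerapi.PluginInfo.
type pluginInfo struct {
	Type              string
	Name              string
	Endpoint          string
	SupportedVersions []string
}

// registrar is the sidecar side of the handshake (a gRPC service in reality).
type registrar interface {
	GetInfo() (pluginInfo, error)
	NotifyRegistrationStatus(registered bool, message string)
}

// pluginHandler is the kubelet side, looked up by plugin type.
type pluginHandler interface {
	ValidatePlugin(name, endpoint string, versions []string) error
	RegisterPlugin(name, endpoint string, versions []string) error
}

// registerPlugin walks through GetInfo, handler lookup, validation,
// registration and the final status notification.
func registerPlugin(r registrar, handlers map[string]pluginHandler) error {
	info, err := r.GetInfo()
	if err != nil {
		return fmt.Errorf("GetInfo failed: %w", err)
	}
	// fail reports the error back to the sidecar before returning it.
	fail := func(err error) error {
		r.NotifyRegistrationStatus(false, err.Error())
		return err
	}
	handler, ok := handlers[info.Type]
	if !ok {
		return fail(fmt.Errorf("no handler registered for plugin type %q", info.Type))
	}
	if err := handler.ValidatePlugin(info.Name, info.Endpoint, info.SupportedVersions); err != nil {
		return fail(fmt.Errorf("validation failed: %w", err))
	}
	if err := handler.RegisterPlugin(info.Name, info.Endpoint, info.SupportedVersions); err != nil {
		return fail(fmt.Errorf("registration failed: %w", err))
	}
	r.NotifyRegistrationStatus(true, "")
	return nil
}

// fakeRegistrar and fakeHandler are in-memory doubles used to try the flow.
type fakeRegistrar struct {
	info       pluginInfo
	registered bool
	message    string
}

func (f *fakeRegistrar) GetInfo() (pluginInfo, error) { return f.info, nil }
func (f *fakeRegistrar) NotifyRegistrationStatus(registered bool, message string) {
	f.registered, f.message = registered, message
}

type fakeHandler struct{ endpoint string }

func (h *fakeHandler) ValidatePlugin(name, endpoint string, versions []string) error { return nil }
func (h *fakeHandler) RegisterPlugin(name, endpoint string, versions []string) error {
	h.endpoint = endpoint // the handler now knows where the csi-plugin listens
	return nil
}
```

The point to take away is that the endpoint travels from the sidecar's GetInfo response into the handler's RegisterPlugin call, and the sidecar only learns the outcome through NotifyRegistrationStatus.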

The remaining question is: how does the csi.RegistrationHandler{} object register the plugin?

csi.RegistrationHandler source code analysis

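The registration logic of the csi.RegistrationHandler{} object mainly consists of updating the node annotation and creating or updating the CSINode object. The code is shown below (L112-L154):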

import (
    csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
)

func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
    // ...
    // Open a gRPC connection to our in-house csi-plugin and call NodeGetInfo() on its node service; the result is used to update the node annotation and create the CSINode object
    csi, err := newCsiDriverClient(csiDriverName(pluginName))
    driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx)
    // ...
    // This is the main logic: update the node annotation and create or update the CSINode object
    err = nim.InstallCSIDriver(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology)
    // ...
    return nil
}

// Create a gRPC client for talking to our in-house csi-plugin
func newCsiDriverClient(driverName csiDriverName) (*csiDriverClient, error) {
    // ...
    nodeV1ClientCreator := newV1NodeClient
    return &csiDriverClient{
        driverName:          driverName,
        addr:                csiAddr(existingDriver.endpoint),
        nodeV1ClientCreator: nodeV1ClientCreator,
    }, nil
}
// Call csipbv1.NewNodeClient(conn) to create a gRPC client
// For the CSI specification, see https://github.com/container-storage-interface/spec/blob/master/spec.md#rpc-interface
func newV1NodeClient(addr csiAddr) (nodeClient csipbv1.NodeClient, closer io.Closer, err error) {
    var conn *grpc.ClientConn
    conn, err = newGrpcConn(addr)
    nodeClient = csipbv1.NewNodeClient(conn)
    return nodeClient, conn, nil
}
func newGrpcConn(addr csiAddr) (*grpc.ClientConn, error) {
    network := "unix"
    return grpc.Dial(string(addr), /*...*/)
}

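The code above involves two pieces of logic: updating the node annotation, and creating or updating the CSINode object.
Updating the node annotation is simple: it adds the annotation csi.volume.kubernetes.io/nodeid:{"$csiDriverName":"$driverNodeID"} to the current Node. $csiDriverName was obtained earlier through the RPC call to the node-driver-registrar sidecar container,
while $driverNodeID comes from calling NodeGetInfo on the node service of our custom csi-plugin directly (see L237-L273).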
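
Because the annotation value is a JSON object keyed by driver name, several CSI drivers on one node can share it. Here is a minimal sketch of that merge using only encoding/json. withDriverNodeID is a hypothetical helper for illustration, not the function the kubelet uses, and the second driver name in the usage note is made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// nodeIDAnnotationKey is the annotation key quoted in the text above.
const nodeIDAnnotationKey = "csi.volume.kubernetes.io/nodeid"

// withDriverNodeID returns the annotation value with driverName mapped to
// nodeID, keeping the entries of other drivers untouched.
func withDriverNodeID(current, driverName, nodeID string) (string, error) {
	ids := map[string]string{}
	if current != "" {
		if err := json.Unmarshal([]byte(current), &ids); err != nil {
			return "", fmt.Errorf("parse %s annotation: %w", nodeIDAnnotationKey, err)
		}
	}
	if ids == nil { // the stored value was the JSON literal null
		ids = map[string]string{}
	}
	ids[driverName] = nodeID

	// json.Marshal writes map keys in sorted order, so the result is stable.
	out, err := json.Marshal(ids)
	if err != nil {
		return "", err
	}
	return string(out), nil
}
```

Starting from an empty annotation, withDriverNodeID("", "csi.sunnyfs.share.com", "node-1") produces {"csi.sunnyfs.share.com":"node-1"}. Calling it again with a value that already holds another driver, such as a made-up other.csi.example.com, keeps both entries.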

Next, the CSINode object is created or updated in the apiserver: see CreateCSINode for the creation logic and installDriverToCSINode for the update logic.
The CSINode objects can then be inspected with kubectl:

[Figure: csinodes.png, kubectl output listing the CSINode objects]

In short, when the csi.RegistrationHandler{} object registers a plugin, it mainly updates the node annotation and creates or updates the CSINode object for that plugin.
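
The "create or update" part boils down to an upsert of one driver entry into the CSINode's driver list. The sketch below shows that idea with a local struct. It is an assumption-laden illustration: csiNodeDriver is a hypothetical, simplified mirror of a CSINode driver entry (I have not reproduced the real storage API types), and upsertDriver is not the kubelet's installDriverToCSINode.

```go
package main

// csiNodeDriver is a simplified, local mirror of one driver entry in a
// CSINode object. Field names are illustrative only.
type csiNodeDriver struct {
	Name         string
	NodeID       string
	TopologyKeys []string
	MaxVolumes   int64 // 0 means the driver reported no limit
}

// upsertDriver returns a copy of drivers in which the entry with the same
// name as d is replaced, or d is appended when no such entry exists.
func upsertDriver(drivers []csiNodeDriver, d csiNodeDriver) []csiNodeDriver {
	out := make([]csiNodeDriver, 0, len(drivers)+1)
	replaced := false
	for _, existing := range drivers {
		if existing.Name == d.Name {
			out = append(out, d)
			replaced = true
			continue
		}
		out = append(out, existing)
	}
	if !replaced {
		out = append(out, d)
	}
	return out
}
```

Registering the same driver twice therefore leaves a single entry carrying the latest node ID, and entries belonging to other drivers stay in place.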

Summary

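This article studied how the CSI plugin registration mechanism works. The main components involved are the DaemonSet pod made up of the node-driver-registrar sidecar container and our in-house csi-plugin,
the kubelet plugin manager framework package, and the csi plugin handler module. The kubelet plugin manager acts as a bridge: it calls the node-driver-registrar sidecar container over RPC to obtain
information about our in-house csi-plugin, such as the RPC socket address it listens on, and then calls the csi plugin handler module with that socket address. The handler talks to the csi-plugin directly over RPC
and carries out the actual work, such as updating the node annotation and creating or updating the CSINode object.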

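With these components working together, our in-house csi-plugin gets registered.
But how do the core functions that our in-house csi-plugin provides, such as create/delete volume, actually work? I will cover that in a later update when I have time.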

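References

Understanding the K8s Persistent Storage Flow in One Article (一文讀懂 K8s 持久化存儲流程)

Getting Started with K8s from Scratch | Kubernetes Storage Architecture and Plugin Usage (從零開始入門 K8s | Kubernetes 存儲架構及插件使用)

Kubernetes Container Storage Interface (CSI) Documentation

node-driver-registrar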
