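While maintaining our team's K8s CSI plugin code recently, I kept wondering how it works internally, so I took the opportunity to study the principles behind K8s CSI in depth.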
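When deploying a K8s persistent storage plugin, the CSI documentation calls for a DaemonSet pod that registers the plugin; this pod includes the node-driver-registrar container. The deployment YAML looks roughly like this: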
```yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  annotations:
    deprecated.daemonset.template.generation: "7"
  name: sunnyfs-csi-share-node
  namespace: sunnyfs
spec:
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: sunnyfs-csi-share-node
  template:
    metadata:
      labels:
        app: sunnyfs-csi-share-node
    spec:
      containers:
      - args:
        - --csi-address=/csi/sunnyfs-csi-share.sock
        - --kubelet-registration-path=/csi/sunnyfs-csi-share.sock
        env:
        - name: KUBE_NODE_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: spec.nodeName
        image: quay.io/k8scsi/csi-node-driver-registrar:v2.1.0
        imagePullPolicy: IfNotPresent
        name: node-driver-registrar
        resources:
          limits:
            cpu: "2"
            memory: 4000Mi
          requests:
            cpu: "1"
            memory: 4000Mi
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /registration
          name: registration-dir
        - mountPath: /csi
          name: socket-dir
      - args:
        - --v=5
        - --endpoint=unix:///csi/sunnyfs-csi-share/sunnyfs-csi-share.sock
        - --nodeid=$(NODE_ID)
        - --drivername=csi.sunnyfs.share.com
        - --version=v1.0.0
        env:
        - name: NODE_ID
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: spec.nodeName
        image: sunnyfs-csi-driver:v1.0.4
        imagePullPolicy: IfNotPresent
        lifecycle:
          preStop:
            exec:
              command:
              - /bin/sh
              - -c
              - rm -rf /csi/sunnyfs-csi-share.sock /registration/csi.sunnyfs.share.com-reg.sock
        name: sunnyfs-csi-driver
        resources:
          limits:
            cpu: "2"
            memory: 4000Mi
          requests:
            cpu: "1"
            memory: 4000Mi
        securityContext:
          privileged: true
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /registration
          name: registration-dir
        - mountPath: /csi
          name: socket-dir
        - mountPath: /var/lib/kubelet/pods
          mountPropagation: Bidirectional
          name: mountpoint-dir
      dnsPolicy: ClusterFirstWithHostNet
      hostNetwork: true
      imagePullSecrets:
      - name: regcred
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      tolerations:
      - operator: Exists
      volumes:
      - hostPath:
          path: /var/lib/kubelet/plugins/csi.sunnyfs.share.com
          type: DirectoryOrCreate
        name: socket-dir
      - hostPath:
          path: /var/lib/kubelet/plugins_registry
          type: Directory
        name: registration-dir
      - hostPath:
          path: /var/lib/kubelet/pods
          type: Directory
        name: mountpoint-dir
  updateStrategy:
    rollingUpdate:
      maxUnavailable: 1
    type: RollingUpdate
```
The pod runs our custom csi-plugin, sunnyfs-csi-driver, whose backend storage engine is an in-house file storage system, plus a sidecar container, node-driver-registrar, which mainly takes care of registering the custom csi-plugin.
The key question is: how does the csi-plugin actually get registered?
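The short answer: node-driver-registrar, running as a sidecar container in the DaemonSet, is called by kubelet's plugin-manager module, and node-driver-registrar in turn calls our in-house csi-plugin, the sunnyfs-csi-driver container. At startup, kubelet registers a CSI plugin handler with plugin-manager; once that handler has obtained the basic information about the sunnyfs-csi-driver container, it performs a few operations, such as updating the Node annotation and creating/updating the CSINode object.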
The node-driver-registrar code is simple and does two main things. First, it calls the GetPluginInfo RPC on our csi-plugin and takes response.GetName as csiDriverName. Second, it starts a gRPC server listening on /var/lib/kubelet/plugins_registry/${csiDriverName}-reg.sock on the host, for the CSI plugin handler to call.
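Inside the container the registrar never touches /var/lib/kubelet directly; it writes to /registration, and the registration-dir hostPath volume in the YAML above makes that directory the host's /var/lib/kubelet/plugins_registry. The sketch below illustrates that translation with a simplified, prefix-based mount table. The mount type and hostPathFor helper are hypothetical, written only for illustration, and do not come from Kubernetes.

```go
package main

import (
	"fmt"
	"strings"
)

// mount is a hypothetical, simplified view of a volumeMount backed by a hostPath volume.
type mount struct {
	containerPath string // e.g. /registration
	hostPath      string // e.g. /var/lib/kubelet/plugins_registry
}

// hostPathFor translates a path seen inside the container into the path kubelet sees
// on the host, using the longest matching mount prefix. ok is false if no mount matches.
func hostPathFor(p string, mounts []mount) (string, bool) {
	bestIdx := -1
	for i, m := range mounts {
		// Match the mount point itself or anything below it (not "/registrationX").
		if p == m.containerPath || strings.HasPrefix(p, m.containerPath+"/") {
			if bestIdx == -1 || len(m.containerPath) > len(mounts[bestIdx].containerPath) {
				bestIdx = i
			}
		}
	}
	if bestIdx == -1 {
		return "", false
	}
	m := mounts[bestIdx]
	return m.hostPath + strings.TrimPrefix(p, m.containerPath), true
}

func main() {
	mounts := []mount{
		{"/registration", "/var/lib/kubelet/plugins_registry"},
		{"/csi", "/var/lib/kubelet/plugins/csi.sunnyfs.share.com"},
	}
	if hp, ok := hostPathFor("/registration/csi.sunnyfs.share.com-reg.sock", mounts); ok {
		fmt.Println(hp) // /var/lib/kubelet/plugins_registry/csi.sunnyfs.share.com-reg.sock
	}
}
```

The same mapping matters for --kubelet-registration-path, which node-driver-registrar hands to kubelet as the driver endpoint: kubelet resolves it on the host, where the container path /csi/sunnyfs-csi-share.sock corresponds to /var/lib/kubelet/plugins/csi.sunnyfs.share.com/sunnyfs-csi-share.sock.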
Let's take a quick look at the code for these two steps.
First, the RPC call to our csi-plugin to obtain csiDriverName (L137-L152):
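```go
func main() {
	// ...
	// 1. Call GetPluginInfo on our csi-plugin over RPC; response.GetName() is csiDriverName
	csiConn, err := connection.Connect(*csiAddress, cmm)
	// ... (error handling elided)
	csiDriverName, err := csirpc.GetDriverName(ctx, csiConn)
	// ... (error handling elided)

	// 2. Start the registration gRPC server; this call blocks and runs forever
	nodeRegister(csiDriverName, addr)
}
```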
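Note that the two containers in the YAML describe the driver socket differently: the registrar's --csi-address is a bare path, while the driver's --endpoint is a unix:// URL. Many CSI drivers normalize such an endpoint before calling net.Listen. The parseEndpoint helper below is hypothetical (it is not part of node-driver-registrar or csi-lib-utils) and only sketches that normalization with the standard library.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parseEndpoint splits a CSI endpoint such as "unix:///csi/x.sock" into the network and
// address expected by net.Listen/net.Dial. A bare absolute path is treated as a unix socket.
func parseEndpoint(ep string) (network, addr string, err error) {
	if strings.HasPrefix(ep, "/") {
		return "unix", ep, nil
	}
	u, err := url.Parse(ep)
	if err != nil {
		return "", "", err
	}
	switch strings.ToLower(u.Scheme) {
	case "unix":
		// "unix:///abs/x.sock" parses to Host "" and Path "/abs/x.sock".
		a := u.Host + u.Path
		if a == "" {
			return "", "", fmt.Errorf("empty unix socket path in %q", ep)
		}
		return "unix", a, nil
	case "tcp":
		return "tcp", u.Host, nil
	default:
		return "", "", fmt.Errorf("unsupported endpoint %q", ep)
	}
}

func main() {
	n, a, err := parseEndpoint("unix:///csi/sunnyfs-csi-share/sunnyfs-csi-share.sock")
	fmt.Println(n, a, err) // unix /csi/sunnyfs-csi-share/sunnyfs-csi-share.sock <nil>
}
```

Normalized this way, the YAML's --csi-address (/csi/sunnyfs-csi-share.sock) and --endpoint (/csi/sunnyfs-csi-share/sunnyfs-csi-share.sock) resolve to different files as written; for the registrar's GetPluginInfo call to reach the driver, both flags need to name the same socket.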
GetDriverName is shown below; it simply calls GetPluginInfo on the identity service of our csi-plugin:
```go
import (
	"context"
	"fmt"

	"github.com/container-storage-interface/spec/lib/go/csi"
	"google.golang.org/grpc"
)

// GetDriverName returns name of CSI driver.
func GetDriverName(ctx context.Context, conn *grpc.ClientConn) (string, error) {
	client := csi.NewIdentityClient(conn)

	req := csi.GetPluginInfoRequest{}
	rsp, err := client.GetPluginInfo(ctx, &req)
	if err != nil {
		return "", err
	}
	name := rsp.GetName()
	if name == "" {
		return "", fmt.Errorf("driver name is empty")
	}
	return name, nil
}
```
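node-driver-registrar first calls GetPluginInfo on the identity service of our csi-plugin. The CSI (Container Storage Interface) spec describes in detail that a csi-plugin mainly needs to implement three services: the identity service, the controller service and the node service. GetPluginInfo belongs to the identity service and returns basic information about our plugin. For example, my identity service's GetPluginInfo implementation mainly returns our csi plugin's name (plus its vendor version):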
```go
// GetPluginInfo implements the CSI Identity service; node-driver-registrar uses the returned
// Name as csiDriverName.
func (ids *DefaultIdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
	klog.Infof("Using default GetPluginInfo")

	// Both name and vendor version are required fields of the response.
	if ids.Driver.name == "" {
		return nil, status.Error(codes.Unavailable, "Driver name not configured")
	}
	if ids.Driver.version == "" {
		return nil, status.Error(codes.Unavailable, "Driver is missing version")
	}

	return &csi.GetPluginInfoResponse{
		Name:          ids.Driver.name,
		VendorVersion: ids.Driver.version,
	}, nil
}
```
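The CSI spec also constrains what the name may contain: it must follow domain-name notation, be at most 63 characters long, begin and end with an alphanumeric character, and contain only dashes, dots and alphanumerics in between. A stricter variant of the checks above could look like this sketch; validatePluginInfo and driverNameRE are hypothetical helpers, not part of the CSI Go bindings.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// driverNameRE encodes the CSI rule for GetPluginInfoResponse.name: alphanumeric at both
// ends, with '-', '.' or alphanumerics in between.
var driverNameRE = regexp.MustCompile(`^[a-zA-Z0-9]([-.a-zA-Z0-9]*[a-zA-Z0-9])?$`)

// validatePluginInfo checks the two required fields of GetPluginInfoResponse.
func validatePluginInfo(name, vendorVersion string) error {
	if name == "" {
		return errors.New("driver name not configured")
	}
	if len(name) > 63 {
		return fmt.Errorf("driver name %q is longer than 63 characters", name)
	}
	if !driverNameRE.MatchString(name) {
		return fmt.Errorf("driver name %q does not follow domain-name notation", name)
	}
	if vendorVersion == "" {
		return errors.New("driver is missing version")
	}
	return nil
}

func main() {
	fmt.Println(validatePluginInfo("csi.sunnyfs.share.com", "v1.0.0")) // <nil>
	fmt.Println(validatePluginInfo("-bad-name-", "v1.0.0") != nil)   // true
}
```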
Next, node-driver-registrar starts a gRPC server listening on /var/lib/kubelet/plugins_registry/${csiDriverName}-reg.sock on the host. This RPC server follows the kubelet plugin registration API: the registrationServer service exposes GetInfo and NotifyRegistrationStatus to clients, which in practice means the kubelet plugin manager. The code:
```go
// Start a gRPC server listening on /var/lib/kubelet/plugins_registry/${csiDriverName}-reg.sock
func nodeRegister(csiDriverName, httpEndpoint string) {
	// The endpoint reported to kubelet is --kubelet-registration-path,
	// i.e. the driver socket path as seen from the host
	registrar := newRegistrationServer(csiDriverName, *kubeletRegistrationPath, supportedVersions)
	socketPath := buildSocketPath(csiDriverName)
	// ...
	lis, err := net.Listen("unix", socketPath)
	// ... (error handling elided)
	grpcServer := grpc.NewServer()
	registerapi.RegisterRegistrationServer(grpcServer, registrar)
	grpcServer.Serve(lis)
	// ...
}

// The socket path is ${pluginRegistrationPath}/${csiDriverName}-reg.sock; the flag defaults to
// /registration in the container, i.e. /var/lib/kubelet/plugins_registry on the host
func buildSocketPath(csiDriverName string) string {
	return fmt.Sprintf("%s/%s-reg.sock", *pluginRegistrationPath, csiDriverName)
}

func newRegistrationServer(driverName string, endpoint string, versions []string) registerapi.RegistrationServer {
	return &registrationServer{
		driverName: driverName,
		endpoint:   endpoint,
		version:    versions,
	}
}

// GetInfo is the RPC invoked by plugin watcher
func (e registrationServer) GetInfo(ctx context.Context, req *registerapi.InfoRequest) (*registerapi.PluginInfo, error) {
	return &registerapi.PluginInfo{
		Type:              registerapi.CSIPlugin,
		Name:              e.driverName,
		Endpoint:          e.endpoint,
		SupportedVersions: e.version,
	}, nil
}

// NotifyRegistrationStatus is called by kubelet with the registration result;
// on failure the registrar exits so that the container gets restarted
func (e registrationServer) NotifyRegistrationStatus(ctx context.Context, status *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) {
	if !status.PluginRegistered {
		os.Exit(1)
	}
	return &registerapi.RegistrationStatusResponse{}, nil
}
```
In short, the main logic of the node-driver-registrar sidecar container is simple: it first calls our csi-plugin to get csiDriverName, then starts a gRPC server on /var/lib/kubelet/plugins_registry/${csiDriverName}-reg.sock that implements registrationServer according to the kubelet plugin registration API, so that the kubelet plugin manager can call it over RPC.
The next key question: how does the kubelet plugin manager call the node-driver-registrar sidecar container over RPC?
At startup, kubelet instantiates the pluginManager object; the socket dir it watches is /var/lib/kubelet/plugins_registry/:
```go
const (
	DefaultKubeletPluginsRegistrationDirName = "plugins_registry"
)

klet.pluginManager = pluginmanager.NewPluginManager(
	klet.getPluginsRegistrationDir(), /* sockDir */
	kubeDeps.Recorder,
)

// Resolves to <root-dir>/plugins_registry, i.e. /var/lib/kubelet/plugins_registry by default
func (kl *Kubelet) getPluginsRegistrationDir() string {
	return filepath.Join(kl.getRootDir(), config.DefaultKubeletPluginsRegistrationDirName)
}
```
It also registers a csi.RegistrationHandler{} object for the CSIPlugin type and starts the pluginManager (L1385-L1391):
```go
// Adding Registration Callback function for CSI Driver
kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))

// Start the plugin manager
klog.V(4).Infof("starting plugin manager")
go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
```
Although the pluginmanager package contains a fair amount of code, it essentially implements two pieces of logic.
First, the plugin watcher in pluginmanager recursively watches the socket dir /var/lib/kubelet/plugins_registry, using the github.com/fsnotify/fsnotify package under the hood. Whenever a socket file is added to or removed from that dir, the change is recorded in the socketFileToInfo map[string]PluginInfo of the desiredStateOfWorld cache. Here is the main watch code (L50-L98):
```go
func (w *Watcher) Start(stopCh <-chan struct{}) error {
	// ...
	fsWatcher, err := fsnotify.NewWatcher()
	// ... (error handling elided)
	w.fsWatcher = fsWatcher

	// Walk the socket dir and start watching it (and its subdirectories)
	if err := w.traversePluginDir(w.path); err != nil {
		klog.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
	}

	// Start a goroutine that handles socket files being created in or removed from the socket dir
	go func(fsWatcher *fsnotify.Watcher) {
		defer close(w.stopped)
		for {
			select {
			case event := <-fsWatcher.Events:
				if event.Op&fsnotify.Create == fsnotify.Create {
					if err := w.handleCreateEvent(event); err != nil {
						klog.Errorf("error %v when handling create event: %s", err, event)
					}
				} else if event.Op&fsnotify.Remove == fsnotify.Remove {
					w.handleDeleteEvent(event)
				}
				continue
			case err := <-fsWatcher.Errors:
				if err != nil {
					klog.Errorf("fsWatcher received error: %v", err)
				}
				continue
			case <-stopCh:
				// ...
				return
			}
		}
	}(fsWatcher)

	return nil
}
```
When the DaemonSet deploys the node-driver-registrar sidecar container, a new socket file ${csiDriverName}-reg.sock appears in /var/lib/kubelet/plugins_registry, and the plugin watcher writes its data into the desiredStateOfWorld cache for the second piece of logic, the reconciler, to consume.
The reconciler is simply a periodic task that runs once every rc.loopSleepDuration (L84-L90):
```go
func (rc *reconciler) Run(stopCh <-chan struct{}) {
	wait.Until(func() {
		rc.reconcile()
	},
		rc.loopSleepDuration,
		stopCh)
}
```
Each reconcile pass diffs two cache maps: desiredStateOfWorld, the desired state, and actualStateOfWorld, the actual state. If a plugin is in actualStateOfWorld but not in desiredStateOfWorld (the plugin has been removed), or is in desiredStateOfWorld but with a mismatched plugin.Timestamp (the plugin has re-registered), it must be deregistered (DeRegisterPlugin) and removed from the actualStateOfWorld cache. If a plugin is in desiredStateOfWorld but not in actualStateOfWorld, it is a new plugin: it must be registered (RegisterPlugin) and added to the actualStateOfWorld cache.
The main reconcile logic (L110-L164):
```go
func (rc *reconciler) reconcile() {
	// Diff actualStateOfWorld against desiredStateOfWorld to find plugins that must be
	// deregistered (DeRegisterPlugin) and removed from the actualStateOfWorld cache
	for _, registeredPlugin := range rc.actualStateOfWorld.GetRegisteredPlugins() {
		unregisterPlugin := false
		if !rc.desiredStateOfWorld.PluginExists(registeredPlugin.SocketPath) {
			unregisterPlugin = true
		} else {
			// The socket still exists; check whether it was re-created (new timestamp)
			for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() {
				if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.Timestamp != registeredPlugin.Timestamp {
					unregisterPlugin = true
					break
				}
			}
		}
		if unregisterPlugin {
			err := rc.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld)
			// ... (error handling elided)
		}
	}

	// Diff desiredStateOfWorld against actualStateOfWorld to find plugins that must be
	// registered (RegisterPlugin) and added to the actualStateOfWorld cache
	for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
		if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
			err := rc.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
			// ... (error handling elided)
		}
	}
}
```
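Let's focus on how a new plugin is registered. The reconciler calls GetInfo on the RPC server inside the node-driver-registrar sidecar container, then uses the Type field of the response to look up the matching handler among the pluginHandlers registered earlier (here, the csi.RegistrationHandler{} object registered for the CSIPlugin type), and calls that handler's ValidatePlugin and RegisterPlugin. "Registering the plugin" here really means setting the Node annotation and creating/updating the CSINode object. Finally, the reconciler calls NotifyRegistrationStatus over RPC to report the result.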
The registration code (L74-L134):
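```go
// Establish a gRPC connection to /var/lib/kubelet/plugins_registry/${csiDriverName}-reg.sock
func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
	// ...
	c, err := grpc.DialContext(ctx, unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
		grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
			return (&net.Dialer{}).DialContext(ctx, "unix", addr)
		}),
	)
	// ...
	return registerapi.NewRegistrationClient(c), c, nil
}

func (og *operationGenerator) GenerateRegisterPluginFunc(/*...*/) func() error {
	registerPluginFunc := func() error {
		client, conn, err := dial(socketPath, dialTimeoutDuration)
		if err != nil {
			return fmt.Errorf("RegisterPlugin error -- dial failed at socket %s, err: %v", socketPath, err)
		}
		defer conn.Close()

		// Call GetInfo on the RPC server inside node-driver-registrar
		infoResp, err := client.GetInfo(ctx, &registerapi.InfoRequest{})
		// ...

		// Look up the handler by plugin type; for CSI this is the csi.RegistrationHandler{}
		// registered at kubelet startup
		handler, ok := pluginHandlers[infoResp.Type]
		if !ok {
			// ... notify the registrar that no handler exists for this type
		}

		// Let the handler validate the plugin first
		if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
			// ... notify the registrar of the failure and return an error
		}

		// Record the plugin in the actualStateOfWorld cache
		err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
			SocketPath: socketPath,
			Timestamp:  timestamp,
			Handler:    handler,
			Name:       infoResp.Name,
		})
		// ...

		// The key step: call handler.RegisterPlugin to register the plugin.
		// infoResp.Endpoint is the socket path our own csi-plugin listens on.
		if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
			return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
		}

		// Finally, report success back through NotifyRegistrationStatus
		if err := og.notifyPlugin(client, true, ""); err != nil {
			return fmt.Errorf("RegisterPlugin error -- failed to send registration status at socket %s, err: %v", socketPath, err)
		}
		return nil
	}
	return registerPluginFunc
}
```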
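The ordering inside registerPluginFunc (GetInfo, handler lookup by type, ValidatePlugin, recording the plugin in actualStateOfWorld, RegisterPlugin, and finally NotifyRegistrationStatus) can be exercised with in-memory stubs. In this sketch gRPC is removed, and the registrar and pluginHandler interfaces, the stubs, and register are hypothetical simplifications of the kubelet types, not the kubelet code itself.

```go
package main

import (
	"errors"
	"fmt"
)

type pluginInfo struct {
	Type, Name, Endpoint string
	Versions             []string
}

// registrar stands in for the RegistrationClient talking to node-driver-registrar.
type registrar interface {
	GetInfo() (pluginInfo, error)
	NotifyRegistrationStatus(registered bool, msg string) error
}

// pluginHandler mirrors the shape of kubelet's PluginHandler (simplified).
type pluginHandler interface {
	ValidatePlugin(name, endpoint string, versions []string) error
	RegisterPlugin(name, endpoint string, versions []string) error
}

// register follows the same order of steps as GenerateRegisterPluginFunc.
func register(r registrar, handlers map[string]pluginHandler, actual map[string]string, socketPath string) error {
	// fail reports a negative status to the registrar and returns the error to the caller.
	fail := func(msg string) error {
		if err := r.NotifyRegistrationStatus(false, msg); err != nil {
			return err
		}
		return errors.New(msg)
	}
	info, err := r.GetInfo()
	if err != nil {
		return err
	}
	h, ok := handlers[info.Type]
	if !ok {
		return fail("no handler registered for plugin type " + info.Type)
	}
	if err := h.ValidatePlugin(info.Name, info.Endpoint, info.Versions); err != nil {
		return fail("plugin validation failed: " + err.Error())
	}
	actual[socketPath] = info.Name // record in the actualStateOfWorld stand-in
	if err := h.RegisterPlugin(info.Name, info.Endpoint, info.Versions); err != nil {
		return fail("plugin registration failed: " + err.Error())
	}
	return r.NotifyRegistrationStatus(true, "")
}

type fakeRegistrar struct {
	info     pluginInfo
	notified []bool
}

func (f *fakeRegistrar) GetInfo() (pluginInfo, error) { return f.info, nil }

func (f *fakeRegistrar) NotifyRegistrationStatus(ok bool, _ string) error {
	f.notified = append(f.notified, ok)
	return nil
}

// csiHandler is a stub: a real handler would dial endpoint and call NodeGetInfo.
type csiHandler struct{ endpoints map[string]string }

func (h *csiHandler) ValidatePlugin(name, _ string, versions []string) error {
	if len(versions) == 0 {
		return errors.New("no supported versions")
	}
	return nil
}

func (h *csiHandler) RegisterPlugin(name, endpoint string, _ []string) error {
	h.endpoints[name] = endpoint
	return nil
}

func main() {
	r := &fakeRegistrar{info: pluginInfo{"CSIPlugin", "csi.sunnyfs.share.com", "/csi/sunnyfs-csi-share.sock", []string{"1.0.0"}}}
	h := &csiHandler{endpoints: map[string]string{}}
	actual := map[string]string{}
	err := register(r, map[string]pluginHandler{"CSIPlugin": h}, actual, "csi.sunnyfs.share.com-reg.sock")
	fmt.Println(err, r.notified, h.endpoints["csi.sunnyfs.share.com"]) // <nil> [true] /csi/sunnyfs-csi-share.sock
}
```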
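In short, the kubelet plugin manager module is clear and fairly simple, with two main pieces of logic. The plugin watcher watches the socket dir /var/lib/kubelet/plugins_registry and puts plugin data into the desiredStateOfWorld cache. The reconciler reconciles the desiredStateOfWorld and actualStateOfWorld caches: it calls node-driver-registrar to obtain the plugin info, looks up the plugin handler based on that info, and then calls the handler's RegisterPlugin. Using the socket path our csi-plugin listens on, which is passed in as an argument, the plugin handler then talks to our csi-plugin directly. node-driver-registrar really just acts as an intermediary, relaying one key piece of information: the socket path that our csi-plugin's gRPC server listens on.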
The next key question: how does the csi.RegistrationHandler{} object register the plugin?
Registration by csi.RegistrationHandler{} mainly consists of updating the Node annotation and creating/updating the CSINode object (L112-L154):
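```go
import (
	csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
)

func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
	// ...
	// Store the endpoint of the newly registered driver under its name, so that other CSI
	// components can later find the driver's actual socket by name
	csiDrivers.Set(pluginName, Driver{
		endpoint:                endpoint,
		highestSupportedVersion: highestSupportedVersion,
	})

	// Open a gRPC connection to our csi-plugin and call NodeGetInfo() on its node service;
	// the result is used to update the Node annotation and create the CSINode object
	csi, err := newCsiDriverClient(csiDriverName(pluginName))
	// ...
	driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx)
	// ...

	// Main logic: update the Node annotation and create/update the CSINode object
	err = nim.InstallCSIDriver(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology)
	// ...
	return nil
}

// Create a client for our csi-plugin, using the endpoint stored in csiDrivers above
func newCsiDriverClient(driverName csiDriverName) (*csiDriverClient, error) {
	// ...
	existingDriver, driverExists := csiDrivers.Get(string(driverName))
	if !driverExists {
		return nil, fmt.Errorf("driver name %s not found in the list of registered CSI drivers", driverName)
	}

	nodeV1ClientCreator := newV1NodeClient
	return &csiDriverClient{
		driverName:          driverName,
		addr:                csiAddr(existingDriver.endpoint),
		nodeV1ClientCreator: nodeV1ClientCreator,
	}, nil
}

// Create a gRPC client with csipbv1.NewNodeClient(conn)
// For the CSI spec, see https://github.com/container-storage-interface/spec/blob/master/spec.md#rpc-interface
func newV1NodeClient(addr csiAddr) (nodeClient csipbv1.NodeClient, closer io.Closer, err error) {
	var conn *grpc.ClientConn
	conn, err = newGrpcConn(addr)
	if err != nil {
		return nil, nil, err
	}
	nodeClient = csipbv1.NewNodeClient(conn)
	return nodeClient, conn, nil
}

// Dial the driver's Unix domain socket
func newGrpcConn(addr csiAddr) (*grpc.ClientConn, error) {
	network := "unix"
	return grpc.Dial(
		string(addr),
		grpc.WithInsecure(),
		grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
			return (&net.Dialer{}).DialContext(ctx, network, target)
		}),
		/*...*/
	)
}
```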
The code above does two main things: it updates the Node annotation, and it creates/updates the CSINode object.
Updating the Node annotation is simple: kubelet adds the annotation csi.volume.kubernetes.io/nodeid: {"$csiDriverName":"$driverNodeID"} to the current Node, where $csiDriverName is the name obtained earlier via RPC from the node-driver-registrar sidecar container, and $driverNodeID is obtained by calling NodeGetInfo on our csi-plugin's node service directly (see L237-L273).
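Because several CSI drivers can run on the same node, the annotation value is a JSON object keyed by driver name, and each registration merges its own entry into it. A stdlib sketch of that merge follows; mergeNodeIDAnnotation is a hypothetical helper modeled on the behavior described above, not the kubelet function.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const nodeIDAnnotationKey = "csi.volume.kubernetes.io/nodeid"

// mergeNodeIDAnnotation returns a copy of annotations in which the JSON map stored under
// nodeIDAnnotationKey contains driverName -> nodeID, keeping other drivers' entries.
func mergeNodeIDAnnotation(annotations map[string]string, driverName, nodeID string) (map[string]string, error) {
	ids := map[string]string{}
	if v, ok := annotations[nodeIDAnnotationKey]; ok && v != "" {
		if err := json.Unmarshal([]byte(v), &ids); err != nil {
			return nil, fmt.Errorf("parse %s: %w", nodeIDAnnotationKey, err)
		}
	}
	ids[driverName] = nodeID
	b, err := json.Marshal(ids) // encoding/json writes map keys in sorted order
	if err != nil {
		return nil, err
	}
	out := make(map[string]string, len(annotations)+1)
	for k, v := range annotations {
		out[k] = v
	}
	out[nodeIDAnnotationKey] = string(b)
	return out, nil
}

func main() {
	ann := map[string]string{nodeIDAnnotationKey: `{"other.csi.example.com":"n1"}`}
	out, err := mergeNodeIDAnnotation(ann, "csi.sunnyfs.share.com", "node-a")
	fmt.Println(out[nodeIDAnnotationKey], err)
	// {"csi.sunnyfs.share.com":"node-a","other.csi.example.com":"n1"} <nil>
}
```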
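Then kubelet creates/updates the CSINode object in the apiserver; see CreateCSINode for creation and installDriverToCSINode for updates. The resulting CSINode object can be inspected with kubectl, e.g. kubectl get csinode <node-name> -o yaml.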
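Roughly, the update path replaces this driver's entry in the CSINode spec.drivers list when its node ID, topology keys or volume limit changed, keeps other drivers' entries, and skips the API call when nothing changed. The sketch below uses local structs as a trimmed-down stand-in for the k8s.io/api/storage/v1 CSINodeDriver type; the field subset, the MaxVolumes field and upsertDriver are assumptions for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// csiNodeDriver is a trimmed-down stand-in for storagev1.CSINodeDriver.
type csiNodeDriver struct {
	Name         string
	NodeID       string
	TopologyKeys []string // kept sorted
	MaxVolumes   int64    // 0 means "no limit reported"
}

// sameKeys compares two sorted key lists.
func sameKeys(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// upsertDriver returns the new drivers list and whether an update is needed.
func upsertDriver(drivers []csiNodeDriver, name, nodeID string, topology map[string]string, maxVolumes int64) ([]csiNodeDriver, bool) {
	// Only topology keys are stored in CSINode; the values end up as node labels.
	keys := make([]string, 0, len(topology))
	for k := range topology {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, d := range drivers {
		if d.Name == name && d.NodeID == nodeID && sameKeys(d.TopologyKeys, keys) && d.MaxVolumes == maxVolumes {
			return drivers, false // already up to date
		}
	}
	out := make([]csiNodeDriver, 0, len(drivers)+1)
	for _, d := range drivers {
		if d.Name != name {
			out = append(out, d) // keep other drivers' entries
		}
	}
	out = append(out, csiNodeDriver{Name: name, NodeID: nodeID, TopologyKeys: keys, MaxVolumes: maxVolumes})
	return out, true
}

func main() {
	drivers := []csiNodeDriver{{Name: "other.csi.example.com", NodeID: "n1"}}
	drivers, changed := upsertDriver(drivers, "csi.sunnyfs.share.com", "node-a", nil, 0)
	fmt.Println(len(drivers), changed) // 2 true
	_, changed = upsertDriver(drivers, "csi.sunnyfs.share.com", "node-a", nil, 0)
	fmt.Println(changed) // false
}
```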
In short, registering a plugin in csi.RegistrationHandler{} mostly amounts to updating the Node annotation and creating/updating the plugin's CSINode object.
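This post covered the principles behind the CSI plugin registration mechanism. The main components involved are: the DaemonSet pod made up of the node-driver-registrar sidecar container and our in-house csi-plugin; the kubelet plugin manager framework package; and the CSI plugin handler module. The kubelet plugin manager acts as the bridge: it calls the node-driver-registrar sidecar container over RPC to obtain information about our csi-plugin, such as the RPC socket address it listens on, and then calls the CSI plugin handler with that socket address. The handler talks to the csi-plugin directly over RPC and carries out the actual work of updating the Node annotation and creating/updating the CSINode object.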
With these components working together, our in-house csi-plugin gets registered.
However, our csi-plugin also provides core features such as creating and deleting volumes. How do those work? I'll cover that in a follow-up post when I have time.