kubernetes數據卷管理源碼分析

概述

volume是k8s中很重要的一個環節,主要用來存儲k8s中pod生產的一些系統或者業務數據。k8s在kubelet中提供了volume管理的邏輯node

源碼分析

首先是kubelet啓動方法git

func main() {
       s := options.NewKubeletServer()
       s.AddFlags(pflag.CommandLine)

       flag.InitFlags()
       logs.InitLogs()
       defer logs.FlushLogs()

       verflag.PrintAndExitIfRequested()

       if err := app.Run(s, nil); err != nil {
              fmt.Fprintf(os.Stderr, "error: %v\n", err)
              os.Exit(1)
       }
}

很容易發現run方法中包含了kubelet全部重要信息docker

func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) {
        
		//配置驗證
    	...

       if kubeDeps == nil {
              ...

              kubeDeps, err = UnsecuredKubeletDeps(s)

              ...
       }

       //初始化cAdvisor以及containerManager等管理器
       ...


       if err := RunKubelet(&s.KubeletConfiguration, kubeDeps, s.RunOnce, standaloneMode); err != nil {
              return err
       }

       ...
}

裏面有兩個與volume管理相關的重要方法express

  • UnsecuredKubeletDeps:會初始化docker client、網絡管理插件、數據管理插件等系統核心組件,由於不方便對外部開放,因此命名爲unsecure。其中咱們須要關注的是它對volume plugin的初始化操做apache

    func UnsecuredKubeletDeps(s *options.KubeletServer) (*kubelet.KubeletDeps, error) {
    
    	    ...
    
    		return &kubelet.KubeletDeps{
    			Auth:               nil, 
    			CAdvisorInterface:  nil, 
    			Cloud:              nil, 
    			ContainerManager:   nil,
    			DockerClient:       dockerClient,
    			KubeClient:         nil,
    			ExternalKubeClient: nil,
    			Mounter:            mounter,
    			NetworkPlugins:     ProbeNetworkPlugins(s.NetworkPluginDir, s.CNIConfDir, s.CNIBinDir),
    			OOMAdjuster:        oom.NewOOMAdjuster(),
    			OSInterface:        kubecontainer.RealOS{},
    			Writer:             writer,
    			VolumePlugins:      ProbeVolumePlugins(s.VolumePluginDir),
    			TLSOptions:         tlsOptions,
    		}, nil
    	}

    在初始化volume plugin的時候會傳遞VolumePluginDir做爲自定義plugin的路徑,默認路徑爲**/usr/libexec/kubernetes/kubelet-plugins/volume/exec/**json

    func ProbeVolumePlugins(pluginDir string) []volume.VolumePlugin {
    		allPlugins := []volume.VolumePlugin{}
    		allPlugins = append(allPlugins, aws_ebs.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, empty_dir.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, git_repo.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, host_path.ProbeVolumePlugins(volume.VolumeConfig{})...)
    		allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(volume.VolumeConfig{})...)
    		allPlugins = append(allPlugins, secret.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, glusterfs.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, cinder.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, quobyte.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, cephfs.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, downwardapi.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, flocker.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, flexvolume.ProbeVolumePlugins(pluginDir)...)
    		allPlugins = append(allPlugins, azure_file.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, configmap.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, vsphere_volume.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, azure_dd.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, photon_pd.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, projected.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, portworx.ProbeVolumePlugins()...)
    		allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...)
    		return allPlugins
    	}

    能夠觀察到衆多插件中,有一個名爲flexvolume,只有這個插件帶有參數pluginDir,說明只有這個插件支持自定義實現。具體kubelet怎麼和這些插件交互,以及這些插件提供哪些接口,還須要繼續閱讀代碼api

  • RunKubelet:這纔是kubelet服務的啓動方法,其中最重要的功能都藏在startKubelet中bash

    func RunKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, runOnce bool, standaloneMode bool) error {
    
    		//初始化啓動器
    		...
    
    		if runOnce {
    			if _, err := k.RunOnce(podCfg.Updates()); err != nil {
    				return fmt.Errorf("runonce failed: %v", err)
    			}
    			glog.Infof("Started kubelet %s as runonce", version.Get().String())
    		} else {
    			startKubelet(k, podCfg, kubeCfg, kubeDeps)
    			glog.Infof("Started kubelet %s", version.Get().String())
    		}
    		return nil
    	}

    startKubelet包含兩個環節網絡

    • 不斷同步apiserver的pod信息,根據新增、刪除的pod對volume狀態進行同步更新
    • 啓動服務,監聽controller manager的請求。其中controller manager能夠輔助kubelet管理volume,用戶也能夠選擇禁用controller manager的管理
    func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) {
    		// 同步pod信息
    		go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)
    
    		// 啓動kubelet服務
    		if kubeCfg.EnableServer {
    			go wait.Until(func() {
    				k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
    			}, 0, wait.NeverStop)
    		}
    		if kubeCfg.ReadOnlyPort > 0 {
    			go wait.Until(func() {
    				k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
    			}, 0, wait.NeverStop)
    		}
    	}

    跟蹤同步pod信息的Run方法,會追查到這段代碼app

    func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    
    	    ...
    
    		go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
    
    		if kl.kubeClient != nil {
    			//同步node信息
    			go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
    		}
    
    		// 同步pod信息
    		kl.pleg.Start()
    		kl.syncLoop(updates, kl)
    	}

    kl.volumeManager是kubelet進行數據卷管理的核心接口

    type VolumeManager interface {
    		Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
    
    		WaitForAttachAndMount(pod *v1.Pod) error
    
    		GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
    
    		GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
    
    		GetVolumesInUse() []v1.UniqueVolumeName
    
    		ReconcilerStatesHasBeenSynced() bool
    
    		VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
    
    		MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
    	}

    VolumeManager的Run會執行一個異步循環,當pod被調度到該node,它會檢查該pod所申請的全部volume,根據這些volume與pod的關係作attach/detach/mount/unmount操做

    func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
    		defer runtime.HandleCrash()
    
    		go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
    		glog.V(2).Infof("The desired_state_of_world populator starts")
    
    		glog.Infof("Starting Kubelet Volume Manager")
    		go vm.reconciler.Run(stopCh)
    
    		<-stopCh
    		glog.Infof("Shutting down Kubelet Volume Manager")
    	}

    其中重點關注的地方是vm.desiredStateOfWorldPopulator.Runvm.reconciler.Run這兩個方法。在介紹這兩個方法以前,須要補充一個關鍵信息,這也是理解這兩個方法的關鍵信息。

    kubelet管理volume的方式基於兩個不一樣的狀態:

    • DesiredStateOfWorld:預期中,pod對volume的使用狀況,簡稱預期狀態。當pod.yaml定製好volume,並提交成功,預期狀態就已經肯定
    • ActualStateOfWorld:實際中,pod對voluem的使用狀況,簡稱實際狀態。實際狀態是kubelet的後臺線程監控的結果

    理解了這兩個狀態,就能大概知道vm.desiredStateOfWorldPopulator.Run這個方法是幹什麼的呢。很明顯,它就是根據從apiserver同步到的pod信息,來更新DesiredStateOfWorld。另一個方法vm.reconciler.Run,是預期狀態和實際狀態的協調者,它負責將實際狀態調整成與預期狀態。預期狀態的更新實現,以及協調者具體如何協調,須要繼續閱讀代碼才能理解

    追蹤vm.desiredStateOfWorldPopulator.Run,咱們發現這段邏輯

    func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
    		for _, pod := range dswp.podManager.GetPods() {
    			if dswp.isPodTerminated(pod) {
    				continue
    			}
    			dswp.processPodVolumes(pod)
    		}
    	}

    kubelet會同步新增的pod到desiredStateOfWorldPopulator的podManager中。這段代碼就是輪詢其中非結束狀態的pod,並交給desiredStateOfWorldPopulator處理

    func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
    
    		...
    
    		for _, podVolume := range pod.Spec.Volumes {
    			volumeSpec, volumeGidValue, err :=
    				dswp.createVolumeSpec(podVolume, pod.Namespace)
    			if err != nil {
    				glog.Errorf(
    					"Error processing volume %q for pod %q: %v",
    					podVolume.Name,
    					format.Pod(pod),
    					err)
    				continue
    			}
    
    
    			_, err = dswp.desiredStateOfWorld.AddPodToVolume(
    				uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)
    			if err != nil {
    				glog.Errorf(
    					"Failed to add volume %q (specName: %q) for pod %q to desiredStateOfWorld. err=%v",
    					podVolume.Name,
    					volumeSpec.Name(),
    					uniquePodName,
    					err)
    			}
    
    			glog.V(10).Infof(
    				"Added volume %q (volSpec=%q) for pod %q to desired state.",
    				podVolume.Name,
    				volumeSpec.Name(),
    				uniquePodName)
    		}
    
    		dswp.markPodProcessed(uniquePodName)
    	}

    desiredStateOfWorldPopulator並不處理很重的邏輯,只是做爲一個代理,將控制某個pod預期狀態的邏輯交付給desiredStateOfWorld,並標記爲已處理

    func (dsw *desiredStateOfWorld) AddPodToVolume(
    		podName types.UniquePodName,
    		pod *v1.Pod,
    		volumeSpec *volume.Spec,
    		outerVolumeSpecName string,
    		volumeGidValue string) (v1.UniqueVolumeName, error) {
    
    		...
    
    		dsw.volumesToMount[volumeName].podsToMount[podName] = podToMount{
    			podName:             podName,
    			pod:                 pod,
    			spec:                volumeSpec,
    			outerVolumeSpecName: outerVolumeSpecName,
    		}
    
    		return volumeName, nil
    	}

    這段邏輯中,咱們忽略了前面一系列預處理操做,直接關注最核心的地方:肯定預期狀態的方式就是,用一個映射表結構,綁定volume到pod之間的關係,這個關係表就是綁定關係的參考依據

    看完了desiredStateOfWorldPopulator的處理邏輯,接着進入另外一個核心接口reconciler。它纔是volume manager中最重要的控制器

    追蹤reconciler的Run方法,咱們定位到最核心的一段代碼

    func (rc *reconciler) reconcile() {
    
    		//umount
    		for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
    			if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
    
    				...
    
    				err := rc.operationExecutor.UnmountVolume(
    					mountedVolume.MountedVolume, rc.actualStateOfWorld)
    
    				...
    			}
    		}
    
    		// attach/mount
    		for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
    			volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
    			volumeToMount.DevicePath = devicePath
    			if cache.IsVolumeNotAttachedError(err) {
    
    				...
    
    				err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
    
    				...
    
    			} else if !volMounted || cache.IsRemountRequiredError(err) {
    
    				...
    
    				err := rc.operationExecutor.MountVolume(
    					rc.waitForAttachTimeout,
    					volumeToMount.VolumeToMount,
    					rc.actualStateOfWorld)
    
    				...
    			}
    		}
    
    		//detach/unmount
    		for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
    			if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
    				!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {
    				if attachedVolume.GloballyMounted {
    
    					...
    
    					err := rc.operationExecutor.UnmountDevice(
    						attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
    					...
    
    				} else {
    
    					...
    
    					err := rc.operationExecutor.DetachVolume(
    							attachedVolume.AttachedVolume, false,rc.actualStateOfWorld)
    
    					...
    				}
    			}
    		}
    	}

    我略去了多餘的代碼,保留最核心的部分。這段控制邏輯就是一個協調器,具體要作的事情就是,根據實際狀態與預期狀態的差別,作協調操做

    • volume和pod的預期狀態不存在綁定關係,則detach volume,並對pod和volume執行unmount操做
    • volume和pod的預期狀態存在綁定關係,則attach volume,並對pod和volume執行mount操做

    若是採用自定義的flexvolume插件,上述這些方法會對插件中實現的方法進行系統調用

    • AttachVolume:調用attach
    • DetachVolume:調用detach
    • MountVolume:調用mountdevice,mount
    • UnmountVolume:調用unmount
    • UnmountDevice:調用umountdevice

    flex volume提供的lvm插件。若是須要支持mount和unmount操做,能夠在這個腳本中補充

    #!/bin/bash
    
    	# Copyright 2015 The Kubernetes Authors.
    	#
    	# Licensed under the Apache License, Version 2.0 (the "License");
    	# you may not use this file except in compliance with the License.
    	# You may obtain a copy of the License at
    	#
    	#     http://www.apache.org/licenses/LICENSE-2.0
    	#
    	# Unless required by applicable law or agreed to in writing, software
    	# distributed under the License is distributed on an "AS IS" BASIS,
    	# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    	# See the License for the specific language governing permissions and
    	# limitations under the License.
    
    	# Notes:
    	#  - Please install "jq" package before using this driver.
    	usage() {
    		err "Invalid usage. Usage: "
    		err "\t$0 init"
    		err "\t$0 attach <json params> <nodename>"
    		err "\t$0 detach <mount device> <nodename>"
    		err "\t$0 waitforattach <mount device> <json params>"
    		err "\t$0 mountdevice <mount dir> <mount device> <json params>"
    		err "\t$0 unmountdevice <mount dir>"
    		err "\t$0 isattached <json params> <nodename>"
    		exit 1
    	}
    
    	err() {
    		echo -ne $* 1>&2
    	}
    
    	log() {
    		echo -ne $* >&1
    	}
    
    	ismounted() {
    		MOUNT=`findmnt -n ${MNTPATH} 2>/dev/null | cut -d' ' -f1`
    		if [ "${MOUNT}" == "${MNTPATH}" ]; then
    			echo "1"
    		else
    			echo "0"
    		fi
    	}
    
    	getdevice() {
    		VOLUMEID=$(echo ${JSON_PARAMS} | jq -r '.volumeID')
    		VG=$(echo ${JSON_PARAMS}|jq -r '.volumegroup')
    
    		# LVM substitutes - with --
    		VOLUMEID=`echo $VOLUMEID|sed s/-/--/g`
    		VG=`echo $VG|sed s/-/--/g`
    
    		DMDEV="/dev/mapper/${VG}-${VOLUMEID}"
    		echo ${DMDEV}
    	}
    
    	attach() {
    		JSON_PARAMS=$1
    		SIZE=$(echo $1 | jq -r '.size')
    
    		DMDEV=$(getdevice)
    		if [ ! -b "${DMDEV}" ]; then
    			err "{\"status\": \"Failure\", \"message\": \"Volume ${VOLUMEID} does not exist\"}"
    			exit 1
    		fi
    		log "{\"status\": \"Success\", \"device\":\"${DMDEV}\"}"
    		exit 0
    	}
    
    	detach() {
    		log "{\"status\": \"Success\"}"
    		exit 0
    	}
    
    	waitforattach() {
    		shift
    		attach $*
    	}
    
    	domountdevice() {
    		MNTPATH=$1
    		DMDEV=$2
    		FSTYPE=$(echo $3|jq -r '.["kubernetes.io/fsType"]')
    
    		if [ ! -b "${DMDEV}" ]; then
    			err "{\"status\": \"Failure\", \"message\": \"${DMDEV} does not exist\"}"
    			exit 1
    		fi
    
    		if [ $(ismounted) -eq 1 ] ; then
    			log "{\"status\": \"Success\"}"
    			exit 0
    		fi
    
    		VOLFSTYPE=`blkid -o udev ${DMDEV} 2>/dev/null|grep "ID_FS_TYPE"|cut -d"=" -f2`
    		if [ "${VOLFSTYPE}" == "" ]; then
    			mkfs -t ${FSTYPE} ${DMDEV} >/dev/null 2>&1
    			if [ $? -ne 0 ]; then
    				err "{ \"status\": \"Failure\", \"message\": \"Failed to create fs ${FSTYPE} on device ${DMDEV}\"}"
    				exit 1
    			fi
    		fi
    
    		mkdir -p ${MNTPATH} &> /dev/null
    
    		mount ${DMDEV} ${MNTPATH} &> /dev/null
    		if [ $? -ne 0 ]; then
    			err "{ \"status\": \"Failure\", \"message\": \"Failed to mount device ${DMDEV} at ${MNTPATH}\"}"
    			exit 1
    		fi
    		log "{\"status\": \"Success\"}"
    		exit 0
    	}
    
    	unmountdevice() {
    		MNTPATH=$1
    		if [ ! -d ${MNTPATH} ]; then
    			log "{\"status\": \"Success\"}"
    			exit 0
    		fi
    
    		if [ $(ismounted) -eq 0 ] ; then
    			log "{\"status\": \"Success\"}"
    			exit 0
    		fi
    
    		umount ${MNTPATH} &> /dev/null
    		if [ $? -ne 0 ]; then
    			err "{ \"status\": \"Failed\", \"message\": \"Failed to unmount volume at ${MNTPATH}\"}"
    			exit 1
    		fi
    
    		log "{\"status\": \"Success\"}"
    		exit 0
    	}
    
    	isattached() {
    		log "{\"status\": \"Success\", \"attached\":true}"
    		exit 0
    	}
    
    	op=$1
    
    	if [ "$op" = "init" ]; then
    		log "{\"status\": \"Success\"}"
    		exit 0
    	fi
    
    	if [ $# -lt 2 ]; then
    		usage
    	fi
    
    	shift
    
    	case "$op" in
    		attach)
    			attach $*
    			;;
    		detach)
    			detach $*
    			;;
    		waitforattach)
    			waitforattach $*
    			;;
    		mountdevice)
    			domountdevice $*
    			;;
    		unmountdevice)
    			unmountdevice $*
    			;;
    		isattached)
    	                isattached $*
    	                ;;
    		*)
    			log "{ \"status\": \"Not supported\" }"
    			exit 0
    	esac
    
    	exit 1

    值得注意的是,爲何會有兩次mount操做,一次mountdevice,一次mount。分別是作什麼的?

    其實k8s提供的volume管理方式是,一個volume能夠被多個pod掛載,若是某個device須要做爲多個pod的volume,就須要屢次掛載。可是device只能被掛載一次。因此,k8s採用的方式是,先用mountdevice將device掛載到一個全局目錄,而後這個全局目錄就能夠被屢次掛載到pod的卷目錄。如此一來,就能完成多pod掛載同一個volume

總結

只有理解了volume manager的代碼,在使用它提供的volume plugin或者實現自定義flex volume plugin時才能得心應手。以上代碼,都是基於k8s v1.6.6版本

相關文章
相關標籤/搜索