Kubernetes源碼分析之kubelet

本節全部的代碼基於1.13.4版本。node

啓動分析

Kubelet的啓動參數有兩種,kubeletFlagskubeletConfig。其中,kubeletFlags與咱們使用的kubelet的--參數命令保持一致;kubeletConfig經過解析特定的配置文件完成參數的配置,它們共同構成kubelet啓動參數的配置。如圖 docker

基本參數配置完成以後,接下來就是配置啓動的 Run方法。Kubelet啓動的Run方法代較長,以下

Run: func(cmd *cobra.Command, args []string) {
			// initial flag parse, since we disable cobra's flag parsing
			if err := cleanFlagSet.Parse(args); err != nil {
				cmd.Usage()
				klog.Fatal(err)
			}

			// check if there are non-flag arguments in the command line
			cmds := cleanFlagSet.Args()
			if len(cmds) > 0 {
				cmd.Usage()
				klog.Fatalf("unknown command: %s", cmds[0])
			}

			// short-circuit on help
			help, err := cleanFlagSet.GetBool("help")
			if err != nil {
				klog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
			}
			if help {
				cmd.Help()
				return
			}

			// short-circuit on verflag
			verflag.PrintAndExitIfRequested()
			utilflag.PrintFlags(cleanFlagSet)

			// set feature gates from initial flags-based config
			if err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
				klog.Fatal(err)
			}

			// validate the initial KubeletFlags
			if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
				klog.Fatal(err)
			}

			if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
				klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
			}

			// load kubelet config file, if provided
			if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
				kubeletConfig, err = loadConfigFile(configFile)
				if err != nil {
					klog.Fatal(err)
				}
				// We must enforce flag precedence by re-parsing the command line into the new object.
				// This is necessary to preserve backwards-compatibility across binary upgrades.
				// See issue #56171 for more details.
				if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
					klog.Fatal(err)
				}
				// update feature gates based on new config
				if err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
					klog.Fatal(err)
				}
			}

			// We always validate the local configuration (command line + config file).
			// This is the default "last-known-good" config for dynamic config, and must always remain valid.
			if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
				klog.Fatal(err)
			}

			// use dynamic kubelet config, if enabled
			var kubeletConfigController *dynamickubeletconfig.Controller
			if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
				var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
				dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
					func(kc *kubeletconfiginternal.KubeletConfiguration) error {
						// Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
						// so that we get a complete validation at the same point where we can decide to reject dynamic config.
						// This fixes the flag-precedence component of issue #63305.
						// See issue #56171 for general details on flag precedence.
						return kubeletConfigFlagPrecedence(kc, args)
					})
				if err != nil {
					klog.Fatal(err)
				}
				// If we should just use our existing, local config, the controller will return a nil config
				if dynamicKubeletConfig != nil {
					kubeletConfig = dynamicKubeletConfig
					// Note: flag precedence was already enforced in the controller, prior to validation,
					// by our above transform function. Now we simply update feature gates from the new config.
					if err := utilfeature.DefaultFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
						klog.Fatal(err)
					}
				}
			}

			// construct a KubeletServer from kubeletFlags and kubeletConfig
			kubeletServer := &options.KubeletServer{
				KubeletFlags:         *kubeletFlags,
				KubeletConfiguration: *kubeletConfig,
			}

			// use kubeletServer to construct the default KubeletDeps
			kubeletDeps, err := UnsecuredDependencies(kubeletServer)
			if err != nil {
				klog.Fatal(err)
			}

			// add the kubelet config controller to kubeletDeps
			kubeletDeps.KubeletConfigController = kubeletConfigController

			// start the experimental docker shim, if enabled
			if kubeletServer.KubeletFlags.ExperimentalDockershim {
				if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
					klog.Fatal(err)
				}
				return
			}

			// run the kubelet
			klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
			if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
				klog.Fatal(err)
			}
複製代碼

主要包括如下步驟:
一、解析參數,對參數的合法性進行判斷;
二、根據kubeletConfig解析一些特殊的特性所須要配置的參數;
三、配置kubeletServer,包括KubeletFlagsKubeletConfiguration兩個參數;
四、構造kubeletDeps結構體;
五、啓動最終的Run方法。
除了最終的Run方法,其他的步驟仍是爲kubelet的啓動構建初始化的參數,無非就是換一個名稱,換一個不一樣的結構體,並配置相依賴的參數。
windows

啓動

啓動調用的是Run方法,如圖 api

initForOS經過對操做系統的判斷,若是是windows系統須要作一些預先的特殊處理; run方法即經過傳入的 kubeDeps參數開始執行啓動操做。
進入 run方法,開始主要執行對參數的再一次驗證,以及新的結構體的初始化。後續開始構建一些重要的客戶端,包括 eventClient主要處理事件的上報,與apiserver打交道; heartbeatClient主要處理心跳操做,與以後的PLEG相關; csiClient主要與CSI接口相關。配置完成以後,最終進入 RunKubelet方法。
RunKubelet方法最重要的方法有兩個: CreateAndInitKubeletstartKubelet,能夠理解爲 CreateAndInitKubelet爲參數的配置, startKubelet爲最終的啓動(說來講去仍是把參數封裝一遍,從新構造新的結構體)。
CreateAndInitKubelet方法經過調用 NewMainKubelet返回 Kubelet結構體。在 NewMainKubelet中,主要的配置有:
一、PodConfig。經過 makePodSourceConfig能夠發現kubelet獲取Pod的來源有如下途徑: 靜態Pod靜態Pod的URL地址以及 kube-apiserver
二、容器與鏡像的GC參數。
三、驅逐Pod策略。
最終經過參數填充 Kubelet結構體,完成kubelet結構體參數的最終配置。
接下來就是啓動了,不過在啓動以前會有一個判斷
判斷是之後臺daemon進程一直運行仍是隻啓動一次,即runOnce,基本上都是之後臺daemon啓動的方式,因此大部分調用的是 startKubelet方法。
startKubelet方法內部調用了最終的 Run方法,以下

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.kubeClient == nil {
		klog.Warning("No api server defined - no node status update will be sent.")
	}

	// Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}

	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.Fatal(err)
	}

	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		go kl.fastStatusUpdateOnce()

		// start syncing lease
		if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
			go kl.nodeLeaseController.Run(wait.NeverStop)
		}
	}
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Start loop to sync iptables util rules
	if kl.makeIPTablesUtilChains {
		go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
	}

	// Start a goroutine responsible for killing pods (that are not properly
	// handled by pod workers).
	go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

	// Start component sync loops.
	kl.statusManager.Start()
	kl.probeManager.Start()

	// Start syncing RuntimeClasses if enabled.
	if kl.runtimeClassManager != nil {
		go kl.runtimeClassManager.Run(wait.NeverStop)
	}

	// Start the pod lifecycle event generator.
	kl.pleg.Start()
	kl.syncLoop(updates, kl)
}
複製代碼

能夠看到,在該方法內,完成的就是最終的kubelet的任務,經過多個goroutine完成。包括如下系列:
一、volumeManager,volume相關管理;
二、syncNodeStatus,定時同步Node狀態;
三、updateRuntimeUp,定時更新Runtime狀態;
四、syncNetworkUtil,定時同步網絡狀態;
五、podKiller,定時清理死亡的pod;
六、statusManager,pod狀態管理;
七、probeManager,pod探針管理;
八、啓動PLEG;
九、syncLoop,最重要的主進程,不停監聽外部數據的變化執行pod的相應操做。
至此,kubelet啓動過程完成。啓動主要完成的任務就是參數的配置和多個任務的啓動,經過構造一個循環進程不停監聽外部事件的變化,執行對應的pod處理工做,這也就是kubelet所須要負責的任務。緩存

Pod啓動流程

Pod的啓動在syncLoop方法下調用的syncLoopIteration方法開始。在syncLoopIteration方法內,有5個重要的參數 網絡

一、configCh:獲取Pod信息的channel,關於Pod相關的事件都從該channel獲取;
二、handler:處理Pod的handler;
三、syncCh:同步全部等待同步的Pod;
四、houseKeepingCh:清理Pod的channel;
五、plegCh:獲取PLEG信息,同步Pod。
每一個參數都是一個channel,經過select判斷某個channel獲取到信息,處理相應的操做。Pod的啓動顯然與configCh相關。
經過獲取configCh信息,獲取Pod整個生命週期中的多種狀態
相對應的,每一個狀態對應相應的處理方法
其中, ADD操做對應Pod的建立,其對應的處理方法爲 HandlePodAdditions
進入 HandlePodAdditions方法,主要如下幾個步驟:
一、根據Pod的建立時間對Pod進行排序;
二、podManager添加Pod;(對Pod的管理依賴於podManager)
三、處理mirrorPod,即靜態Pod的處理;
四、經過 dispatchWork方法分發任務,處理Pod的建立;
五、probeManager添加Pod。(readiness和liveness探針)
dispatchWork方法內,最核心的是調用了 kl.podWorkers.UpdatePod方法對Pod進行建立。 UpdatePod方法經過 podUpdates的map類型獲取相對應的Pod,map的key爲Pod的UID,value爲 UpdatePodOptions的結構體channel。經過獲取到須要建立的Pod以後,單獨起一個goroutine調用 managePodLoop方法完成Pod的建立, managePodLoop方法最終調用 syncPodFn完成Pod的建立, syncPodFn對應的就是Kubelet的 syncPod方法,位於 kubernetes/pkg/kubelet/kubelet.go下。通過層層環繞, syncPod就是最終處理Pod建立的方法。
syncPod主要的工做流如註釋
一、更新Pod的狀態,對應 generateAPIPodStatusstatusManager.SetPodStatus方法;
二、建立Pod存儲的目錄,對應 makePodDataDirs方法;
三、掛載對應的volume,對應 volumeManager.WaitForAttachAndMount方法;
四、獲取ImagePullSecrets,對應 getPullSecretsForPod方法;
五、建立容器,對應 containerRuntime.SyncPod方法,以下
至此,Pod的啓動到建立過程完成。經過 kubectl describe pod命令能夠查看Pod建立的整個生命週期。

PLEG分析

PLEG,即PodLifecycleEventGenerator,用來記錄Pod生命週期中對應的各類事件。在kubelet中,啓動主進程的syncLoop以前,先啓動pleg,如圖 ide

Start方法經過啓動一個定時的任務執行 relist方法
relist主要的工做就是經過比對Pod的原始狀態和如今的狀態,判斷Pod當前所處的生命週期,核心代碼以下
對每個Pod,比對Pod內的容器,經過 computeEvents-->generateEvents生成事件。在 generateEvents內,生成如下事件:
一、newState爲 plegContainerRunning,對應 ContainerStarted事件;
二、newState爲 plegContainerExited,對應 ContainerDied事件;
三、newState爲 plegContainerUnknown,對應 ContainerChanged事件;
四、newState爲 plegContainerNonExistent,查找oldState,若是對應 plegContainerExited,則生成的事件爲 ContainerRemoved,不然事件爲 ContainerDiedContainerRemoved

生成完事件以後,將事件一一通知到 eventChannel,該channel對應的就是 syncLoopIteration方法下的plegCh
syncLoopIteration方法下,接收到plegCh channel傳輸過來的消息以後,執行 HandlePodSyncs同步方法,最終調用到 dispatchWork這個Pod的處理方法,對Pod的生命進行管理。

GC管理

Kubelet會定時去清理多餘的container和image,完成ContainerGC和ImageGC。Kubelet在啓動的Run方法裏,會先去調用imageManager的Start方法,代碼位於kubernetes/pkg/kubelet/kubelet.go下,調用了initializeModules方法。imageManager.Start方法主要執行兩個步驟:
一、detectImages:主要用來監控images,判斷鏡像是可被發現的;
二、ListImages:主要用來獲取鏡像信息,寫入到緩存imageCache中。
在啓動的CreateAndInitKubelet方法中,開始執行鏡像與容器的回收 oop

StartGarbageCollection方法啓用兩個goroutine,一個用來作ContainerGC,一個用來作ImageGC,代碼以下

func (kl *Kubelet) StartGarbageCollection() {
	loggedContainerGCFailure := false
	go wait.Until(func() {
		if err := kl.containerGC.GarbageCollect(); err != nil {
			klog.Errorf("Container garbage collection failed: %v", err)
			kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
			loggedContainerGCFailure = true
		} else {
			var vLevel klog.Level = 4
			if loggedContainerGCFailure {
				vLevel = 1
				loggedContainerGCFailure = false
			}

			klog.V(vLevel).Infof("Container garbage collection succeeded")
		}
	}, ContainerGCPeriod, wait.NeverStop)

	// when the high threshold is set to 100, stub the image GC manager
	if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
		klog.V(2).Infof("ImageGCHighThresholdPercent is set 100, Disable image GC")
		return
	}

	prevImageGCFailed := false
	go wait.Until(func() {
		if err := kl.imageManager.GarbageCollect(); err != nil {
			if prevImageGCFailed {
				klog.Errorf("Image garbage collection failed multiple times in a row: %v", err)
				// Only create an event for repeated failures
				kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
			} else {
				klog.Errorf("Image garbage collection failed once. Stats initialization may not have completed yet: %v", err)
			}
			prevImageGCFailed = true
		} else {
			var vLevel klog.Level = 4
			if prevImageGCFailed {
				vLevel = 1
				prevImageGCFailed = false
			}

			klog.V(vLevel).Infof("Image garbage collection succeeded")
		}
	}, ImageGCPeriod, wait.NeverStop)
}
複製代碼

能夠看到容器的GC默認是每分鐘執行一次,鏡像的GC默認是每5分鐘執行一次,經過定時執行GC的清理完成容器與鏡像的回收。
容器的GC主要完成的任務包括刪除被驅除的容器、刪除sandboxes以及清理Pod的sandbox的日誌目錄,代碼位於kubernetes/pkg/kubelet/kuberuntime/kuberuntime_gc.go下,調用了GarbageCollect方法;鏡像的GC主要完成多餘鏡像的刪除和存儲空間的釋放,代碼位於kubernetes/pkg/kubelet/images/image_gc_manager.go下,調用了GarbageCollect方法。ui

相關文章
相關標籤/搜索