Kubernetes源碼分析之Node Controller

本節全部的代碼基於1.13.4版本。node

啓動過程

以前在分析controller-manager中說到,controller對於每一個controller的控制格式基本一致,都是以start***Controller的方式封裝成一個獨立的方法,NodeController也不例外。在1.13.4的版本中,Node的控制器分紅了兩種(很早以前的版本只有一種),分別是NodeIpamControllerNodeLifecycleController。其中,NodeIpamController主要處理Node的IPAM地址相關,NodeLifecycleController處理Node的整個生命週期,本文主要分析NodeLifecycleController。
api

如圖,NodeLifecycleController以 startNodeLifecycleController方法開始它的生命週期的管理流程。主要關注兩個方法: NewNodeLifecycleControllerRun。NewNodeLifecycleController負責建立資源對象,Run負責啓動,完成任務的執行。

NewNodeLifecycleController分析

NewNodeLifecycleController主要完成如下任務:
一、根據給定的配置構造Controller大結構體,完成部分參數的配置任務;
二、爲podInformernodeInformerleaseInformer以及daemonSetInformer配置相應的回調方法,包括AddFuncUpdateFunc以及DeleteFunc。這樣,當相應的Node發生變化時,關聯的controller可以及時監聽到,並調用相應的處理方法;
三、返回構造完的結構體。
在配置的時候有幾個須要注意的變量,後面會常常用到。 緩存

runTaintManager:表示啓動一個TaintManager去從Node上驅逐Pod;
useTaintBasedEvictions:經過給Node添加 TaintNodeNotReadyTaintNodeUnreachable污點的方式替換以前的直接驅逐Pod的方式,經過流控刪除Pod。主要爲了防止Pod在某一時間點忽然被大量驅逐;
taintNodeByCondition:經過Node的狀態給Node添加相應的污點。
另外,與以前的版本不一樣的是,添加了leaseInformer,它的主要做用是用來判斷Node的健康。

Run方法分析

Run方法主要包含如下方法,每一個方法都是以單獨的goroutine運行:
一、go nc.taintManager.Run(stopCh):TaintManager,主要完成Pod的驅逐任務;
二、doNoScheduleTaintingPassWorker:完成NoSchedule的污點更新任務;
三、doNoExecuteTaintingPassdoEvictionPass:完成NoExecute的污點更新任務;
四、monitorNodeHealth:檢查Node的狀態,而且處理Node的增刪改查等任務,同時也會處理Pod的驅逐工做。
代碼以下網絡

// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	klog.Infof("Starting node controller")
	defer klog.Infof("Shutting down node controller")

	if !controller.WaitForCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
		return
	}

	if nc.runTaintManager {
		go nc.taintManager.Run(stopCh)
	}

	if nc.taintNodeByCondition {
		// Close node update queue to cleanup go routine.
		defer nc.nodeUpdateQueue.ShutDown()

		// Start workers to update NoSchedule taint for nodes.
		for i := 0; i < scheduler.UpdateWorkerSize; i++ {
			// Thanks to "workqueue", each worker just need to get item from queue, because
			// the item is flagged when got from queue: if new event come, the new item will
			// be re-queued until "Done", so no more than one worker handle the same item and
			// no event missed.
			go wait.Until(nc.doNoScheduleTaintingPassWorker, time.Second, stopCh)
		}
	}

	if nc.useTaintBasedEvictions {
		// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
		// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
		go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
	} else {
		// Managing eviction of nodes:
		// When we delete pods off a node, if the node was not empty at the time we then
		// queue an eviction watcher. If we hit an error, retry deletion.
		go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
	}

	// Incorporate the results of node health signal pushed from kubelet to master.
	go wait.Until(func() {
		if err := nc.monitorNodeHealth(); err != nil {
			klog.Errorf("Error monitoring node health: %v", err)
		}
	}, nc.nodeMonitorPeriod, stopCh)

	<-stopCh
}
複製代碼

執行過程

NodeLifecycleController的執行過程主要就是各個goroutine對應的任務,一一分析。app

TaintManager

TaintManager經過Run方法開始啓動。在Run方法內,主要作了幾個工做:
一、初始化nodeUpdateChannelspodUpdateChannels,大小爲8個channel,後面能夠並行處理;
二、啓動兩個goroutine,分別監聽nodeUpdateQueue和podUpdateQueue的消息;
三、並行啓動8個工做任務,處理監聽到的nodeUpdate和podUpdate的消息。
async

// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
	klog.V(0).Infof("Starting NoExecuteTaintManager")

	for i := 0; i < UpdateWorkerSize; i++ {
		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
		tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
	}

	// Functions that are responsible for taking work items out of the workqueues and putting them
	// into channels.
	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.nodeUpdateQueue.Get()
			if shutdown {
				break
			}
			nodeUpdate := item.(nodeUpdateItem)
			hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
			select {
			case <-stopCh:
				tc.nodeUpdateQueue.Done(item)
				return
			case tc.nodeUpdateChannels[hash] <- nodeUpdate:
				// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
			}
		}
	}(stopCh)

	go func(stopCh <-chan struct{}) {
		for {
			item, shutdown := tc.podUpdateQueue.Get()
			if shutdown {
				break
			}
			podUpdate := item.(podUpdateItem)
			hash := hash(podUpdate.nodeName, UpdateWorkerSize)
			select {
			case <-stopCh:
				tc.podUpdateQueue.Done(item)
				return
			case tc.podUpdateChannels[hash] <- podUpdate:
				// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
			}
		}
	}(stopCh)

	wg := sync.WaitGroup{}
	wg.Add(UpdateWorkerSize)
	for i := 0; i < UpdateWorkerSize; i++ {
		go tc.worker(i, wg.Done, stopCh)
	}
	wg.Wait()
}
複製代碼

在並行啓動的work任務中,優先處理nodeUpdate的事件,等到nodeUpdate處理完成以後,再去處理podUpdate。處理nodeUpdate的方法對應handleNodeUpdate,podUpdate對應handlePodUpdate
handleNodeUpdate主要的做用就是經過監聽到的nodeName獲取node信息,經過node信息獲取該node上對應的taints。而後對該node上全部的pod,依次執行processPodOnNode方法。方法以下:ide

func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
	node, err := tc.getNode(nodeUpdate.nodeName)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Delete
			klog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName)
			tc.taintedNodesLock.Lock()
			defer tc.taintedNodesLock.Unlock()
			delete(tc.taintedNodes, nodeUpdate.nodeName)
			return
		}
		utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
		return
	}

	// Create or Update
	klog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
	taints := getNoExecuteTaints(node.Spec.Taints)
	func() {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		klog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
		if len(taints) == 0 {
			delete(tc.taintedNodes, node.Name)
		} else {
			tc.taintedNodes[node.Name] = taints
		}
	}()
	pods, err := getPodsAssignedToNode(tc.client, node.Name)
	if err != nil {
		klog.Errorf(err.Error())
		return
	}
	if len(pods) == 0 {
		return
	}
	// Short circuit, to make this controller a bit faster.
	if len(taints) == 0 {
		klog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
		for i := range pods {
			tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
		}
		return
	}

	now := time.Now()
	for i := range pods {
		pod := &pods[i]
		podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
		tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
	}
}
複製代碼

handlePodUpdate經過獲取到單一的pod信息與node信息,也是最終執行processPodOnNode方法。方法以下:oop

func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) {
	pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Delete
			podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
			klog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
			tc.cancelWorkWithEvent(podNamespacedName)
			return
		}
		utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
		return
	}

	// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object.
	if pod.Spec.NodeName != podUpdate.nodeName {
		return
	}

	// Create or Update
	podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
	klog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
	nodeName := pod.Spec.NodeName
	if nodeName == "" {
		return
	}
	taints, ok := func() ([]v1.Taint, bool) {
		tc.taintedNodesLock.Lock()
		defer tc.taintedNodesLock.Unlock()
		taints, ok := tc.taintedNodes[nodeName]
		return taints, ok
	}()
	// It's possible that Node was deleted, or Taints were removed before, which triggered
	// eviction cancelling if it was needed.
	if !ok {
		return
	}
	tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}
複製代碼

processPodOnNode方法主要將須要刪除的Pod按照預約好的格式添加到taintEvictionQueue,該queue內的任務都是設置好定時任務時間的,在相應的時間內調用deletePodHandler方法去刪除pod,該方法位於pkg/controller/nodelifecycle/scheduler/taint_manager.go下。方法以下:ui

func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
	return func(args *WorkArgs) error {
		ns := args.NamespacedName.Namespace
		name := args.NamespacedName.Name
		klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
		if emitEventFunc != nil {
			emitEventFunc(args.NamespacedName)
		}
		var err error
		for i := 0; i < retries; i++ {
			err = c.CoreV1().Pods(ns).Delete(name, &metav1.DeleteOptions{})
			if err == nil {
				break
			}
			time.Sleep(10 * time.Millisecond)
		}
		return err
	}
}
複製代碼

因此,TaintManager的主要做用就是將須要驅逐的Pod配置好定時刪除的任務,而後從相應的Node上一一刪除。this

doNoScheduleTaintingPassWorker

當開啓taintNodeByCondition特性的時候,則會調用doNoScheduleTaintingPassWorker去對Node作NoSchedule的污點更新。調用的是doNoScheduleTaintingPass方法。方法以下:

func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error {
	node, err := nc.nodeLister.Get(nodeName)
	if err != nil {
		// If node not found, just ignore it.
		if apierrors.IsNotFound(err) {
			return nil
		}
		return err
	}

	// Map node's condition to Taints.
	var taints []v1.Taint
	for _, condition := range node.Status.Conditions {
		if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
			if taintKey, found := taintMap[condition.Status]; found {
				taints = append(taints, v1.Taint{
					Key:    taintKey,
					Effect: v1.TaintEffectNoSchedule,
				})
			}
		}
	}
	if node.Spec.Unschedulable {
		// If unschedulable, append related taint.
		taints = append(taints, v1.Taint{
			Key:    schedulerapi.TaintNodeUnschedulable,
			Effect: v1.TaintEffectNoSchedule,
		})
	}

	// Get exist taints of node.
	nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
		// only NoSchedule taints are candidates to be compared with "taints" later
		if t.Effect != v1.TaintEffectNoSchedule {
			return false
		}
		// Find unschedulable taint of node.
		if t.Key == schedulerapi.TaintNodeUnschedulable {
			return true
		}
		// Find node condition taints of node.
		_, found := taintKeyToNodeConditionMap[t.Key]
		return found
	})
	taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
	// If nothing to add not delete, return true directly.
	if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
		return nil
	}
	if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
		return fmt.Errorf("failed to swap taints of node %+v", node)
	}
	return nil
}
複製代碼

doNoScheduleTaintingPass主要作了如下工做:
一、根據nodeName獲取node信息;
二、根據node.Status.Conditions字段,判斷node是否須要添加NoSchedule污點,判斷的標準以下:

只要處於任一狀態,即須要添加NoSchedule污點;
三、若是Node爲Unschedulable狀態,一樣添加NoSchedule的污點;
四、根據Node上已有的污點,判斷以前添加的哪些污點是須要添加的,哪些是須要刪除的;
五、調用 SwapNodeControllerTaint對Node進行污點的狀態更新。

doNoExecuteTaintingPass與doEvictionPass

doNoExecuteTaintingPassdoEvictionPass二者只會執行其一。

當開啓useTaintBasedEvictions特性的時候,調用 doNoExecuteTaintingPass方法爲Node添加 NoExecute污點;而 doEvictionPass則是直接判斷哪些Pod須要驅逐,直接去作刪除工做。
doNoExecuteTaintingPass方法中,經過獲取 zoneNoExecuteTainter內的數據對Node狀態進行判斷,若是須要則添加上NoExecute污點,並調用 SwapNodeControllerTaint方法更新該Node上的污點。zoneNoExecuteTainter的信息是經過 monitorNodeHealth方法獲取到的,後面再分析。 doNoExecuteTaintingPass的方法以下:
doEvictionPass則是直接經過獲取 zonePodEvictor內的數據,判斷哪些Pod須要被驅除,則直接調用Pod的DELETE接口,完成Pod的驅逐任務。zonePodEvictor的信息也是經過 monitorNodeHealth方法獲取到的。 doEvictionPass方法以下:
兩種方法的不一樣在於, doNoExecuteTaintingPass只是對Node打上污點,而 doEvictionPass則是完成了最終的刪除工做。 doEvictionPass的這種方式會致使某一個時間段內,大量的Pod須要被刪除,會產生很大的流量;而 doNoExecuteTaintingPass經過給Node打上污點,讓TaintManager去作最終的Pod刪除工做,TaintManager的刪除任務是分時間段定時執行的,因此不會產生這種大流量的問題。所以建議開啓這個特性,在kube-controller-manager的啓動參數加上 --feature-gates=TaintBasedEvictions=true便可。

monitorNodeHealth

前面幾個goroutine的任務主要圍繞着Taint來展開,而monitorNodeHealth則是定時更新Node的信息,併產生數據的來源。
monitorNodeHealth的主要任務能夠分爲如下步驟:
一、獲取全部的Node信息,按照哪些是新增的、哪些是須要刪除的以及哪些是須要從新規劃的返回節點的相應信息;

二、對新增Node、刪除Node以及待規劃Node作相應的處理操做;
三、遍歷全部的Node,更新Node狀態,調用 tryUpdateNodeHealth方法;
四、根據獲取到的Node狀態,和原先的Node狀態做對比,對Node作相應的污點標記。此段代碼較長,基本結構以下
五、針對Node網絡中斷問題,根據不一樣的Node狀態配置相應的驅逐速率,調用 handleDisruption方法。
接下來,針對每一個步驟一一分析。

步驟1

首先經過List接口獲取全部的Node信息,經過classifyNodes完成Node的劃分。classifyNodes規則劃分很簡單,比對knownNodeSetallNodes,能夠理解爲knownNodeSet爲上一次的數據,allNodes爲新的數據,則:
一、若是在allNodes存在,在knownNodeSet不存在,爲新增的Node;
二、若是在knownNodeSet存在,在allNodes不存在,爲刪除的Node;
三、若是在knownNodeSetallNodes都存在,可是沒有zone states,爲newZoneRepresentatives的Node。每一個Node都要歸屬於一個Zone。

步驟2

在步驟1完成節點的劃分以後,步驟2針對每種類型的節點作相應的處理操做。
一、待新增的Node,將其加入到knownNodeSet內緩存,經過addPodEvictorForNewZone爲其歸屬一個Zone,經過useTaintBasedEvictions的開關控制,判斷是標記Node爲Reachable或是取消Pod的驅逐工做。總之就是表示這個Node能夠開始正常使用了;
二、待刪除的Node,將其從knownNodeSet內刪除;
三、未劃分Zone的Node,將其添加到Zone緩存中去。

步驟3

對獲取到的全部的Node,調用PollImmediate方法,每20ms,重試5次,去更新Node的狀態,主要調用了tryUpdateNodeHealth方法。tryUpdateNodeHealth方法值中,主要關注observedReadyConditioncurrentReadyCondition。能夠理解爲observedReadyCondition表示上一次的Node狀態,currentReadyCondition表示當前的Node狀態。如下的多重if-else都是根據這兩個值來操做的。

步驟4

整個大的語句從currentReadyCondition不爲空開始,分如下幾種狀況:
一、observedReadyCondition的值爲False,即Node未Ready,給Node打上node.kubernetes.io/not-ready:NoExecute的污點或是直接驅逐Node上的Pod;
二、observedReadyCondition的值爲Unknown,給Node打上node.kubernetes.io/unreachable:NoExecute的污點或是直接驅除Node上的Pod;
三、observedReadyCondition的值爲True,表示Node是正常工做的狀態,標記Node爲Reachable或是中止驅逐Pod的操做;
四、currentReadyCondition不爲True而observedReadyCondition爲True,表示Node處於Not Ready的狀態,標記Node爲Not Ready,並更新Node上的Pod狀態;
五、currentReadyCondition不爲True而且配置了cloudprovider,作刪除Node的操做。
整個大的循環主要的任務就是對Node的狀態進行判斷,作Node的污點標記或是驅逐相關操做。zoneNoExecuteTainterzonePodEvictor兩個數據集的信息都是在此作相應的更新的。

步驟5

最終調用handleDisruption作網絡中斷的一些相關處理操做。
中斷主要有如下幾種狀態:

handleDisruption中,經過 allAreFullyDisruptedallWasFullyDisrupted標記如今的zone狀態和以前緩存的zone狀態,分別表示最新的結果和上一次的結果信息。而後作三種處理操做:
一、若是新的狀態爲 fullDisruption,即全中斷,表示全部Node都處於Not Ready的狀態,此時恢復正常的驅逐速率,並中止作驅逐操做;
二、若是新的狀態爲 partialDisruption,即部分中斷,表示部分Node處於Not Ready的狀態,此時設置用戶定義的驅逐速率;
三、若是新的狀態爲 normal,恢復到默認的驅逐速率。
主要就是根據不一樣的中斷狀態控制驅逐速率,維持系統的穩定性。 至此,NodeLifecycleController的任務完成。主要流程就是針對Node的不一樣狀態更新Node信息,打上/刪除Node上相應的污點保證Node是否可被調度,並定時作Pod的驅逐操做。
相關文章
相關標籤/搜索