本節全部的代碼基於1.13.4版本。node
以前在分析controller-manager中說到,controller對於每一個controller的控制格式基本一致,都是以start***Controller
的方式封裝成一個獨立的方法,NodeController也不例外。在1.13.4的版本中,Node的控制器分紅了兩種(很早以前的版本只有一種),分別是NodeIpamController與NodeLifecycleController。其中,NodeIpamController主要處理Node的IPAM地址相關,NodeLifecycleController處理Node的整個生命週期,本文主要分析NodeLifecycleController。
api
startNodeLifecycleController
方法開始它的生命週期的管理流程。主要關注兩個方法:
NewNodeLifecycleController與
Run。NewNodeLifecycleController負責建立資源對象,Run負責啓動,完成任務的執行。
NewNodeLifecycleController主要完成如下任務:
一、根據給定的配置構造Controller大結構體,完成部分參數的配置任務;
二、爲podInformer
、nodeInformer
、leaseInformer
以及daemonSetInformer
配置相應的回調方法,包括AddFunc
、UpdateFunc
以及DeleteFunc
。這樣,當相應的Node發生變化時,關聯的controller可以及時監聽到,並調用相應的處理方法;
三、返回構造完的結構體。
在配置的時候有幾個須要注意的變量,後面會常常用到。 緩存
TaintNodeNotReady
和
TaintNodeUnreachable
污點的方式替換以前的直接驅逐Pod的方式,經過流控刪除Pod。主要爲了防止Pod在某一時間點忽然被大量驅逐;
Run方法主要包含如下方法,每一個方法都是以單獨的goroutine運行:
一、go nc.taintManager.Run(stopCh)
:TaintManager,主要完成Pod的驅逐任務;
二、doNoScheduleTaintingPassWorker
:完成NoSchedule的污點更新任務;
三、doNoExecuteTaintingPass
、doEvictionPass
:完成NoExecute的污點更新任務;
四、monitorNodeHealth
:檢查Node的狀態,而且處理Node的增刪改查等任務,同時也會處理Pod的驅逐工做。
代碼以下網絡
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
klog.Infof("Starting node controller")
defer klog.Infof("Shutting down node controller")
if !controller.WaitForCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
return
}
if nc.runTaintManager {
go nc.taintManager.Run(stopCh)
}
if nc.taintNodeByCondition {
// Close node update queue to cleanup go routine.
defer nc.nodeUpdateQueue.ShutDown()
// Start workers to update NoSchedule taint for nodes.
for i := 0; i < scheduler.UpdateWorkerSize; i++ {
// Thanks to "workqueue", each worker just need to get item from queue, because
// the item is flagged when got from queue: if new event come, the new item will
// be re-queued until "Done", so no more than one worker handle the same item and
// no event missed.
go wait.Until(nc.doNoScheduleTaintingPassWorker, time.Second, stopCh)
}
}
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
}
// Incorporate the results of node health signal pushed from kubelet to master.
go wait.Until(func() {
if err := nc.monitorNodeHealth(); err != nil {
klog.Errorf("Error monitoring node health: %v", err)
}
}, nc.nodeMonitorPeriod, stopCh)
<-stopCh
}
複製代碼
NodeLifecycleController的執行過程主要就是各個goroutine對應的任務,一一分析。app
TaintManager經過Run方法開始啓動。在Run方法內,主要作了幾個工做:
一、初始化nodeUpdateChannels
和podUpdateChannels
,大小爲8個channel,後面能夠並行處理;
二、啓動兩個goroutine,分別監聽nodeUpdateQueue和podUpdateQueue的消息;
三、並行啓動8個工做任務,處理監聽到的nodeUpdate和podUpdate的消息。
async
// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
klog.V(0).Infof("Starting NoExecuteTaintManager")
for i := 0; i < UpdateWorkerSize; i++ {
tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
}
// Functions that are responsible for taking work items out of the workqueues and putting them
// into channels.
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.nodeUpdateQueue.Get()
if shutdown {
break
}
nodeUpdate := item.(nodeUpdateItem)
hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.nodeUpdateQueue.Done(item)
return
case tc.nodeUpdateChannels[hash] <- nodeUpdate:
// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
}
}
}(stopCh)
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.podUpdateQueue.Get()
if shutdown {
break
}
podUpdate := item.(podUpdateItem)
hash := hash(podUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.podUpdateQueue.Done(item)
return
case tc.podUpdateChannels[hash] <- podUpdate:
// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
}
}
}(stopCh)
wg := sync.WaitGroup{}
wg.Add(UpdateWorkerSize)
for i := 0; i < UpdateWorkerSize; i++ {
go tc.worker(i, wg.Done, stopCh)
}
wg.Wait()
}
複製代碼
在並行啓動的work任務中,優先處理nodeUpdate的事件,等到nodeUpdate處理完成以後,再去處理podUpdate。處理nodeUpdate的方法對應handleNodeUpdate
,podUpdate對應handlePodUpdate
。
handleNodeUpdate
主要的做用就是經過監聽到的nodeName獲取node信息,經過node信息獲取該node上對應的taints。而後對該node上全部的pod,依次執行processPodOnNode
方法。方法以下:ide
func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
node, err := tc.getNode(nodeUpdate.nodeName)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
klog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName)
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
delete(tc.taintedNodes, nodeUpdate.nodeName)
return
}
utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
return
}
// Create or Update
klog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
taints := getNoExecuteTaints(node.Spec.Taints)
func() {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
klog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
if len(taints) == 0 {
delete(tc.taintedNodes, node.Name)
} else {
tc.taintedNodes[node.Name] = taints
}
}()
pods, err := getPodsAssignedToNode(tc.client, node.Name)
if err != nil {
klog.Errorf(err.Error())
return
}
if len(pods) == 0 {
return
}
// Short circuit, to make this controller a bit faster.
if len(taints) == 0 {
klog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
for i := range pods {
tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
}
return
}
now := time.Now()
for i := range pods {
pod := &pods[i]
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
}
}
複製代碼
handlePodUpdate
經過獲取到單一的pod信息與node信息,也是最終執行processPodOnNode
方法。方法以下:oop
func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) {
pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
klog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
tc.cancelWorkWithEvent(podNamespacedName)
return
}
utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
return
}
// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object.
if pod.Spec.NodeName != podUpdate.nodeName {
return
}
// Create or Update
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
klog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
nodeName := pod.Spec.NodeName
if nodeName == "" {
return
}
taints, ok := func() ([]v1.Taint, bool) {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
taints, ok := tc.taintedNodes[nodeName]
return taints, ok
}()
// It's possible that Node was deleted, or Taints were removed before, which triggered
// eviction cancelling if it was needed.
if !ok {
return
}
tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}
複製代碼
processPodOnNode
方法主要將須要刪除的Pod按照預約好的格式添加到taintEvictionQueue
,該queue內的任務都是設置好定時任務時間的,在相應的時間內調用deletePodHandler
方法去刪除pod,該方法位於pkg/controller/nodelifecycle/scheduler/taint_manager.go
下。方法以下:ui
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
return func(args *WorkArgs) error {
ns := args.NamespacedName.Namespace
name := args.NamespacedName.Name
klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
if emitEventFunc != nil {
emitEventFunc(args.NamespacedName)
}
var err error
for i := 0; i < retries; i++ {
err = c.CoreV1().Pods(ns).Delete(name, &metav1.DeleteOptions{})
if err == nil {
break
}
time.Sleep(10 * time.Millisecond)
}
return err
}
}
複製代碼
因此,TaintManager的主要做用就是將須要驅逐的Pod配置好定時刪除的任務,而後從相應的Node上一一刪除。this
當開啓taintNodeByCondition特性的時候,則會調用doNoScheduleTaintingPassWorker
去對Node作NoSchedule的污點更新。調用的是doNoScheduleTaintingPass
方法。方法以下:
func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error {
node, err := nc.nodeLister.Get(nodeName)
if err != nil {
// If node not found, just ignore it.
if apierrors.IsNotFound(err) {
return nil
}
return err
}
// Map node's condition to Taints.
var taints []v1.Taint
for _, condition := range node.Status.Conditions {
if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
if taintKey, found := taintMap[condition.Status]; found {
taints = append(taints, v1.Taint{
Key: taintKey,
Effect: v1.TaintEffectNoSchedule,
})
}
}
}
if node.Spec.Unschedulable {
// If unschedulable, append related taint.
taints = append(taints, v1.Taint{
Key: schedulerapi.TaintNodeUnschedulable,
Effect: v1.TaintEffectNoSchedule,
})
}
// Get exist taints of node.
nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
// only NoSchedule taints are candidates to be compared with "taints" later
if t.Effect != v1.TaintEffectNoSchedule {
return false
}
// Find unschedulable taint of node.
if t.Key == schedulerapi.TaintNodeUnschedulable {
return true
}
// Find node condition taints of node.
_, found := taintKeyToNodeConditionMap[t.Key]
return found
})
taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
// If nothing to add not delete, return true directly.
if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
return nil
}
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
return fmt.Errorf("failed to swap taints of node %+v", node)
}
return nil
}
複製代碼
doNoScheduleTaintingPass
主要作了如下工做:
一、根據nodeName獲取node信息;
二、根據node.Status.Conditions字段,判斷node是否須要添加NoSchedule污點,判斷的標準以下:
SwapNodeControllerTaint
對Node進行污點的狀態更新。
doNoExecuteTaintingPass
和doEvictionPass
二者只會執行其一。
doNoExecuteTaintingPass
方法爲Node添加
NoExecute污點;而
doEvictionPass
則是直接判斷哪些Pod須要驅逐,直接去作刪除工做。
doNoExecuteTaintingPass
方法中,經過獲取
zoneNoExecuteTainter內的數據對Node狀態進行判斷,若是須要則添加上NoExecute污點,並調用
SwapNodeControllerTaint
方法更新該Node上的污點。zoneNoExecuteTainter的信息是經過
monitorNodeHealth
方法獲取到的,後面再分析。
doNoExecuteTaintingPass
的方法以下:
doEvictionPass
則是直接經過獲取
zonePodEvictor內的數據,判斷哪些Pod須要被驅除,則直接調用Pod的DELETE接口,完成Pod的驅逐任務。zonePodEvictor的信息也是經過
monitorNodeHealth
方法獲取到的。
doEvictionPass
方法以下:
兩種方法的不一樣在於,
doNoExecuteTaintingPass
只是對Node打上污點,而
doEvictionPass
則是完成了最終的刪除工做。
doEvictionPass
的這種方式會致使某一個時間段內,大量的Pod須要被刪除,會產生很大的流量;而
doNoExecuteTaintingPass
經過給Node打上污點,讓TaintManager去作最終的Pod刪除工做,TaintManager的刪除任務是分時間段定時執行的,因此不會產生這種大流量的問題。所以建議開啓這個特性,在kube-controller-manager的啓動參數加上
--feature-gates=TaintBasedEvictions=true
便可。
前面幾個goroutine的任務主要圍繞着Taint來展開,而monitorNodeHealth
則是定時更新Node的信息,併產生數據的來源。
monitorNodeHealth
的主要任務能夠分爲如下步驟:
一、獲取全部的Node信息,按照哪些是新增的、哪些是須要刪除的以及哪些是須要從新規劃的返回節點的相應信息;
tryUpdateNodeHealth
方法;
四、根據獲取到的Node狀態,和原先的Node狀態做對比,對Node作相應的污點標記。此段代碼較長,基本結構以下
五、針對Node網絡中斷問題,根據不一樣的Node狀態配置相應的驅逐速率,調用
handleDisruption
方法。
接下來,針對每一個步驟一一分析。
首先經過List接口獲取全部的Node信息,經過classifyNodes
完成Node的劃分。classifyNodes
規則劃分很簡單,比對knownNodeSet
和allNodes
,能夠理解爲knownNodeSet
爲上一次的數據,allNodes
爲新的數據,則:
一、若是在allNodes
存在,在knownNodeSet
不存在,爲新增的Node;
二、若是在knownNodeSet
存在,在allNodes
不存在,爲刪除的Node;
三、若是在knownNodeSet
和allNodes
都存在,可是沒有zone states,爲newZoneRepresentatives的Node。每一個Node都要歸屬於一個Zone。
在步驟1完成節點的劃分以後,步驟2針對每種類型的節點作相應的處理操做。
一、待新增的Node,將其加入到knownNodeSet內緩存,經過addPodEvictorForNewZone
爲其歸屬一個Zone,經過useTaintBasedEvictions的開關控制,判斷是標記Node爲Reachable或是取消Pod的驅逐工做。總之就是表示這個Node能夠開始正常使用了;
二、待刪除的Node,將其從knownNodeSet內刪除;
三、未劃分Zone的Node,將其添加到Zone緩存中去。
對獲取到的全部的Node,調用PollImmediate
方法,每20ms,重試5次,去更新Node的狀態,主要調用了tryUpdateNodeHealth
方法。tryUpdateNodeHealth
方法值中,主要關注observedReadyCondition
和currentReadyCondition
。能夠理解爲observedReadyCondition
表示上一次的Node狀態,currentReadyCondition
表示當前的Node狀態。如下的多重if-else都是根據這兩個值來操做的。
整個大的語句從currentReadyCondition
不爲空開始,分如下幾種狀況:
一、observedReadyCondition
的值爲False,即Node未Ready,給Node打上node.kubernetes.io/not-ready:NoExecute的污點或是直接驅逐Node上的Pod;
二、observedReadyCondition
的值爲Unknown,給Node打上node.kubernetes.io/unreachable:NoExecute的污點或是直接驅除Node上的Pod;
三、observedReadyCondition
的值爲True,表示Node是正常工做的狀態,標記Node爲Reachable或是中止驅逐Pod的操做;
四、currentReadyCondition
不爲True而observedReadyCondition
爲True,表示Node處於Not Ready的狀態,標記Node爲Not Ready,並更新Node上的Pod狀態;
五、currentReadyCondition
不爲True而且配置了cloudprovider,作刪除Node的操做。
整個大的循環主要的任務就是對Node的狀態進行判斷,作Node的污點標記或是驅逐相關操做。zoneNoExecuteTainter
和zonePodEvictor
兩個數據集的信息都是在此作相應的更新的。
最終調用handleDisruption
作網絡中斷的一些相關處理操做。
中斷主要有如下幾種狀態:
handleDisruption
中,經過
allAreFullyDisrupted
和
allWasFullyDisrupted
標記如今的zone狀態和以前緩存的zone狀態,分別表示最新的結果和上一次的結果信息。而後作三種處理操做:
fullDisruption
,即全中斷,表示全部Node都處於Not Ready的狀態,此時恢復正常的驅逐速率,並中止作驅逐操做;
partialDisruption
,即部分中斷,表示部分Node處於Not Ready的狀態,此時設置用戶定義的驅逐速率;
normal
,恢復到默認的驅逐速率。