在K8s中將Pod調度到某一臺Node節點以後,後續的狀態維護信息則是由對應機器上的kubelet進行維護,如何實時反饋本地運行狀態,並通知apiserver則是設計的難點, 本節主要是經過感知Pod狀態變化和探測狀態改變兩個流程來實際分析其核心數據結構,來了解內部設計後端
靜態Pod主要是指的那些不是經過感知apiserver建立的pod, 由於apiserver上並不包含,可是同時也須要維護和獲取這類Pod的狀態, k8s中就設計了一個鏡像Pod的概念,其實就是爲靜態Pod鏡像出來一個Pod該Pod的主要信息與靜態Pod一致,而且在apiserver中進行建立,經過apiserver能夠感知的這個鏡像Pod來反映真實的靜態Pod的狀態, api
statusManager是進行狀態同步的關鍵組件其須要綜合當前Pod運行中的數據和apiserver存儲的數據,從而決定最終的狀態轉換, 這裏先關注圖上畫出來的,更多的狀態等後續會一一介紹緩存
type versionedPodStatus struct { status v1.PodStatus // 單調遞增的版本號(每一個pod) version uint64 // Pod name & namespace, for sending updates to API server. podName string podNamespace string }
在Kubelet中爲保證與apiserver端信息的同步,在本地保存了一個Pod狀態版本信息,其裏面除了保存當前Pod的狀態數據還有一個版本版本號,經過單調遞增的版本號的對比來肯定是否進行狀態的同步數據結構
statusManager的流程其實仍是蠻複雜的,今天咱們就只講一個場景,即kubelet經過apiserver感知到一個Pod更新,而後順着該功能的數據流來進行梳理statusMangaer裏面的數據流轉app
manager中的核心狀態相關的數據結構能夠主要分爲兩大類:映射數據維護(podManager、podStatuses、apiStatusVersions)數據通訊管道(podStatusChannel), 剩餘的則是對與apiserver通訊的kublet和進行pod刪除檢查的 podDeletionSafetyide
type manager struct { kubeClient clientset.Interface // 管理緩存Pod,包含鏡像pod和靜態pod的映射 podManager kubepod.Manager // 從pod UID映射到相應pod的版本狀態信息 。 podStatuses map[types.UID]versionedPodStatus podStatusesLock sync.RWMutex podStatusChannel chan podStatusSyncRequest // 存儲鏡像pod的版本 apiStatusVersions map[kubetypes.MirrorPodUID]uint64 podDeletionSafety PodDeletionSafetyProvider }
設置Pod狀態主要是位於kubelet中的syncPod中,在接收到pod事件變動以後,會與apiserver進行 Pod最新數據的同步從而獲取當前pod在apiserver端的最新狀態ui
func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() for _, c := range pod.Status.Conditions { if !kubetypes.PodConditionByKubelet(c.Type) { klog.Errorf("Kubelet is trying to update pod condition %q for pod %q. "+ "But it is not owned by kubelet.", string(c.Type), format.Pod(pod)) } } // Make sure we're caching a deep copy. status = *status.DeepCopy() // 若是Pod被刪除了則須要強制與apiserver進行信息的同步 m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil) }
var oldStatus v1.PodStatus // 檢測以前的本地緩存數據 cachedStatus, isCached := m.podStatuses[pod.UID] if isCached { oldStatus = cachedStatus.status } else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok { oldStatus = mirrorPod.Status } else { oldStatus = pod.Status }
檢測容器狀態主要是針對容器終止狀態轉發的合法性進行檢測,其實就是根據設定的Pod的RestartPolicy來檢測針對一個終止的容器是否能夠進行重啓spa
if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil { klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err) return false } if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil { klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err) return false }
經過最新的status裏面的condition設定對應PodCondition的LastTransitionTime更新時間未當前時間線程
// Set ContainersReadyCondition.LastTransitionTime. updateLastTransitionTime(&status, &oldStatus, v1.ContainersReady) // Set ReadyCondition.LastTransitionTime. updateLastTransitionTime(&status, &oldStatus, v1.PodReady) // Set InitializedCondition.LastTransitionTime. updateLastTransitionTime(&status, &oldStatus, v1.PodInitialized) // Set PodScheduledCondition.LastTransitionTime. updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled)
首先會根據當前容器的個數,從而決定每一個容器最大的字節數大小,而後對容器裏面的終止狀態裏面的Message信息,進行截斷,同時進行時間的校對設計
normalizeStatus(pod, &status)
若是以前已經緩存了對應的數據,而且緩存的數據與當前的狀態未發生改變,也不須要強制更新,就直接返回
if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate { // 若是不強制更新 ,默認是true此處不會成立 klog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status) return false // No new status. }
生成最新的狀態緩存數據,而且遞增本地的版本信息
// 構建新的狀態 newStatus := versionedPodStatus{ status: status, version: cachedStatus.version + 1, // 更新器緩存 podName: pod.Name, podNamespace: pod.Namespace, } // 更新新的緩存狀態 m.podStatuses[pod.UID] = newStatus select { case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}: // 構建一個新的同步請求 klog.V(5).Infof("Status Manager: adding pod: %q, with status: (%d, %v) to podStatusChannel", pod.UID, newStatus.version, newStatus.status) return true default: // Let the periodic syncBatch handle the update if the channel is full. // We can't block, since we hold the mutex lock. klog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v", format.Pod(pod), status) return false }
探測狀態其實就是Pod內容器的運行狀態,好比若是設置了Readiness探測,當某個容器探測失敗的時候,就會通知對應的service從後端的enpoint中移除該Pod, 讓咱們一塊兒看看Kubelet是如何將運行狀態通知到apiserver端的
func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() // 獲取本地的容器 pod, ok := m.podManager.GetPodByUID(podUID) if !ok { klog.V(4).Infof("Pod %q has been deleted, no need to update readiness", string(podUID)) return } // 獲取當前的狀態 oldStatus, found := m.podStatuses[pod.UID] if !found { klog.Warningf("Container readiness changed before pod has synced: %q - %q", format.Pod(pod), containerID.String()) return } // 獲取當前的容器狀態 containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String()) if !ok { klog.Warningf("Container readiness changed for unknown container: %q - %q", format.Pod(pod), containerID.String()) return }
// 檢測先後的就緒狀態是否發生改變 if containerStatus.Ready == ready { klog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready, format.Pod(pod), containerID.String()) return }
獲取容器的狀態,修改就緒爲當前的狀態
status := *oldStatus.status.DeepCopy() containerStatus, _, _ = findContainerStatus(&status, containerID.String()) containerStatus.Ready = ready
會根據當前運行時的容器探測的狀態,來修改對應PodCondition裏面的狀態,最後調用內部的更新邏輯
updateConditionFunc := func(conditionType v1.PodConditionType, condition v1.PodCondition) { conditionIndex := -1 // 獲取Pod對應的PodCondition狀態 for i, condition := range status.Conditions { if condition.Type == conditionType { conditionIndex = i break } } // 修改或追加Pod對應的PodCondition狀態 if conditionIndex != -1 { status.Conditions[conditionIndex] = condition } else { klog.Warningf("PodStatus missing %s type condition: %+v", conditionType, status) status.Conditions = append(status.Conditions, condition) } } // 計算Ready狀態 updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(&pod.Spec, status.Conditions, status.ContainerStatuses, status.Phase)) // 計算容器Ready狀態 updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase)) m.updateStatusInternal(pod, status, false)
statusManager會啓動一個後臺的線程來進行更新管道里面同步請求的消費
func (m *manager) Start() { // 省略非核心代碼 go wait.Forever(func() { select { case syncRequest := <-m.podStatusChannel: // 獲取最新的狀態信息,更新apiserver klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel", syncRequest.podUID, syncRequest.status.version, syncRequest.status.status) m.syncPod(syncRequest.podUID, syncRequest.status) case <-syncTicker: m.syncBatch() } }, 0) }
同步條件檢測主要是檢測鏡像Pod的版本是否發送變化、Pod當前是否被刪除,若是pod沒有被刪除則返回false,即對一個沒有刪除的Pod咱們仍是須要繼續更新其狀態的
if !m.needsUpdate(uid, status) { klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid) return }
若是沒有獲取到Pod信息,則直接進行退出便可
pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(status.podName, metav1.GetOptions{}) if errors.IsNotFound(err) { klog.V(3).Infof("Pod %q does not exist on the server", format.PodDesc(status.podName, status.podNamespace, uid)) // 若是Pod已經被刪除了,就直接退出就行 return } if err != nil { klog.Warningf("Failed to get status for pod %q: %v", format.PodDesc(status.podName, status.podNamespace, uid), err) return }
這裏面會經過將最小的狀態與以前的狀態來進行merge合併,而後調用kubeClient進行apiserver端狀態的修改
oldStatus := pod.Status.DeepCopy() // 更新服務端的狀態 newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status)) klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes) if err != nil { klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err) return }
// 當前是最新的狀態 pod = newPod klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status) m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
這裏主要是最後階段,即Pod對應的資源都已經釋放了,則才最終刪除apiserver端的Pod
// 若是pod的DeletionTimestamp被設置,則對應的Pod須要被刪除 if m.canBeDeleted(pod, status.status) { deleteOptions := metav1.NewDeleteOptions(0) deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID)) // 調用apiserver對Pod進行刪除 err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, deleteOptions) if err != nil { klog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err) return } klog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod)) m.deletePodStatus(uid) }
探活總體的設計大概就是這樣,但願大佬們多多關注,一塊兒交流。
k8s源碼閱讀電子書地址: https://www.yuque.com/baxiaoshi/tyado3