k8s爲實現容器探活worker的管理構建了一個Manager組件,該組件負責底層探活worker的管理,而且緩存當前的容器的狀態,並對外同步容器的當前狀態,今天咱們就來分析下其部分核心組件api
Manager緩存的狀態主要是會被kubelet、狀態組件消費,而且在Pod同步狀態的時候,會經過當前Manager裏面的探測狀態來更新Pod的容器的就緒與啓動狀態的更新,讓咱們一塊兒看看Manager自身的一些關鍵實現吧緩存
即prober/results/results_manager組件,其主要做用是:存儲探測結果和通知探測結果微信
cache負責容器的探測結果的保存,updates則負責對外更新狀態的訂閱,其經過新的結果和cache中的狀態進行對比,從而決定是否對外通知數據結構
// Manager implementation. type manager struct { // 保護cache sync.RWMutex // 容器ID->探測結果 cache map[kubecontainer.ContainerID]Result // 更新管道 updates chan Update }
更新緩存的時候回經過對比先後狀態來進行是否發佈變動事件,從而通知到外部訂閱容器變動的kubelet核心流程ide
func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *v1.Pod) { // 修改內部狀態 if m.setInternal(id, result) { // 同步更新事件 m.updates <- Update{id, result, pod.UID} } }
內部狀態修改與判斷是否進行同步實現源碼分析
// 若是以前的緩存不存在,或者先後狀態不一致則會返回true觸發更新 func (m *manager) setInternal(id kubecontainer.ContainerID, result Result) bool { m.Lock() defer m.Unlock() prev, exists := m.cache[id] if !exists || prev != result { m.cache[id] = result return true } return false }
func (m *manager) Updates() <-chan Update { return m.updates }
探測管理器是指的prober/prober)manager的Manager組件,其負責當前kubelet上面探活組件的管理,而且進行探測狀態結果的緩存與同步,而且內部還經過statusManager來進行apiserver狀態的同步ui
每一個探測Key包含要探測的目標信息:pod的ID、容器名、探測類型設計
type probeKey struct { podUID types.UID containerName string probeType probeType }
statusManager組件在後續章節裏面會進行詳細分析,說下livenessManager該組件即探活的結果,因此當一個容器探測失敗,則會由kubelet本地先進行處理,而readlinessManager和startupManager則須要經過statusManager同步給apiserver進行同步code
type manager struct { //探測Key與worker映射 workers map[probeKey]*worker // 讀寫鎖 workerLock sync.RWMutex //statusManager緩存爲探測提供pod IP和容器id。 statusManager status.Manager // 存儲readiness探測結果 readinessManager results.Manager // 存儲liveness探測結果 livenessManager results.Manager // 存儲startup探測結果 startupManager results.Manager // 執行探測操做 prober *prober }
func (m *manager) updateStartup() { // 從管道獲取數據進行同步 update := <-m.startupManager.Updates() started := update.Result == results.Success m.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started) }
func (m *manager) updateReadiness() { update := <-m.readinessManager.Updates() ready := update.Result == results.Success m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready) }
func (m *manager) Start() { // Start syncing readiness. go wait.Forever(m.updateReadiness, 0) // Start syncing startup. go wait.Forever(m.updateStartup, 0) }
添加 Pod的時候會遍歷Pod的全部容器,並根據探測類型來進行對應探測worker的構建orm
func (m *manager) AddPod(pod *v1.Pod) { m.workerLock.Lock() defer m.workerLock.Unlock() key := probeKey{podUID: pod.UID} for _, c := range pod.Spec.Containers { key.containerName = c.Name // 針對startupProbe的探測任務的構建 if c.StartupProbe != nil && utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) { key.probeType = startup if _, ok := m.workers[key]; ok { klog.Errorf("Startup probe already exists! %v - %v", format.Pod(pod), c.Name) return } // 構建新的worker w := newWorker(m, startup, pod, c) m.workers[key] = w go w.run() } // 針對ReadinessProbe的探測任務的構建 if c.ReadinessProbe != nil { key.probeType = readiness if _, ok := m.workers[key]; ok { klog.Errorf("Readiness probe already exists! %v - %v", format.Pod(pod), c.Name) return } w := newWorker(m, readiness, pod, c) m.workers[key] = w go w.run() } // 針對LivenessProbe的探測任務的構建 if c.LivenessProbe != nil { key.probeType = liveness if _, ok := m.workers[key]; ok { klog.Errorf("Liveness probe already exists! %v - %v", format.Pod(pod), c.Name) return } w := newWorker(m, liveness, pod, c) m.workers[key] = w go w.run() } } }
更新Pod狀態主要是根據當前Manager裏面緩存的以前的狀態信息來更新Pod裏面對應容器的狀態,這些狀態是Pod裏面容器最新的探測狀態,獲取這些狀態則是檢測當前的容器是否已經就緒和啓動,爲後續更新流程作基礎數據
for i, c := range podStatus.ContainerStatuses { var ready bool // 檢測容器狀態 if c.State.Running == nil { ready = false } else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok { // 檢測readinessMnager裏面的狀態,若是是成功則就是已經就緒 ready = result == results.Success } else { // 檢查是否有還沒有運行的探測器。只要存在則認爲就緒 _, exists := m.getWorker(podUID, c.Name, readiness) ready = !exists } podStatus.ContainerStatuses[i].Ready = ready var started bool if c.State.Running == nil { started = false } else if !utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) { // 容器正在運行,若是StartupProbe功能被禁用,則假定它已啓動 started = true } else if result, ok := m.startupManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok { // 若是startupManager裏面的狀態是成功的則認爲是已經啓動的 started = result == results.Success } else { // 檢查是否有還沒有運行的探測器。 _, exists := m.getWorker(podUID, c.Name, startup) started = !exists } podStatus.ContainerStatuses[i].Started = &started }
針對初始化容器主要容器已經終止而且退出的狀態碼爲0,則認爲初始化容器已經就緒
for i, c := range podStatus.InitContainerStatuses { var ready bool if c.State.Terminated != nil && c.State.Terminated.ExitCode == 0 { // 容器狀態 ready = true } podStatus.InitContainerStatuses[i].Ready = ready }
存活狀態通知主要是在kubelet的核心流程循環中進行的,若是檢測到容器的狀態失敗,會馬上進行對應pod的容器狀態的同步,從而決定下一步的操做是作什麼
case update := <-kl.livenessManager.Updates(): // 若是探測狀態失敗 if update.Result == proberesults.Failure { // 省略代碼 handler.HandlePodSyncs([]*v1.Pod{pod}) }
探活總體的設計大概就是這樣,接下來會分期其statusManager組件,即將將探測的狀態與apiserver的同步的實現, k8s源碼閱讀電子書地址: https://www.yuque.com/baxiaoshi/tyado3
> 微信號:baxiaoshi2020 > 關注公告號閱讀更多源碼分析文章
> 更多文章關注 www.sreguide.com > 本文由博客一文多發平臺 OpenWrite 發佈