在Kubernetes 中,系統和應用程序的健康檢查任務是由 kubelet 來完成的,本文主要討論kubelet中 probemanager 相關的實現原理。golang
若是你對k8s的各類probe如何使用還不瞭解,能夠看下我以前寫的這篇K8S 中的健康檢查機制,是從實踐的角度介紹的。api
在 kubelet 初始化的時候,會建立 statusManager 和 probeManager,這兩個都是和 pod 狀態相關的邏輯,在kubelet 原理解析一:pod管理文章中有提到,statusManager 負責維護狀態信息,並把Pod狀態及時更新到Api-Server,緩存
可是它並不負責監控 pod 狀態的變化,而是提供對應的接口供其餘組件調用,好比 probeManager。probeManager 會定時去監控 pod 中容器的健康情況,一旦發現狀態發生變化,就調用 statusManager 提供的方法更新 pod 的狀態。數據結構
klet.statusManager = status.NewManager(kubeClient, klet.podManager) klet.probeManager = prober.NewManager( klet.statusManager, klet.livenessManager, klet.runner, containerRefManager, kubeDeps.Recorder)
statusManager代碼位於:pkg/kubelet/status/status_manager.go併發
type PodStatusProvider interface { GetPodStatus(uid types.UID) (api.PodStatus, bool) } type Manager interface { PodStatusProvider Start() SetPodStatus(pod *api.Pod, status api.PodStatus) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) TerminatePod(pod *api.Pod) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) } SetPodStatus:若是 pod 的狀態發生了變化,會調用這個方法,把新狀態更新到 apiserver,通常在 kubelet 維護 pod 生命週期的時候會調用 SetContainerReadiness:若是健康檢查發現 pod 中容器的健康狀態發生變化,會調用這個方法,修改 pod 的健康狀態 TerminatePod:kubelet 在刪除 pod 的時候,會調用這個方法,把 pod 中全部的容器設置爲 terminated 狀態 RemoveOrphanedStatuses:刪除孤兒 pod,直接把對應的狀態數據從緩存中刪除便可
Start() 方法是在 kubelet 運行的時候調用的,它會啓動一個 goroutine 執行更新操做:app
const syncPeriod = 10 * time.Second func (m *manager) Start() { ...... glog.Info("Starting to sync pod status with apiserver") syncTicker := time.Tick(syncPeriod) // syncPod and syncBatch share the same go routine to avoid sync races. go wait.Forever(func() { select { case syncRequest := <-m.podStatusChannel: m.syncPod(syncRequest.podUID, syncRequest.status) case <-syncTicker: m.syncBatch() } }, 0) }
這個 goroutine 就能不斷地從兩個 channel 監聽數據進行處理:syncTicker 是個定時器,也就是說它會定時保證 apiserver 和本身緩存的最新 pod 狀態保持一致;podStatusChannel 是全部 pod 狀態更新發送到的地方,調用方不會直接操做這個 channel,而是經過調用上面提到的修改狀態的各類方法,這些方法內部會往這個 channel 寫數據。負載均衡
m.syncPod 根據參數中的 pod 和它的狀態信息對 apiserver 中的數據進行更新,若是發現 pod 已經被刪除也會把它從內部數據結構中刪除。異步
probeManager負責 檢測 pod 中容器的健康狀態,目前有三種 probe:socket
並非全部的 pod 中的容器都有健康檢查的探針,若是沒有,則不對容器進行檢測,默認認爲容器是正常的。在每次建立新 pod 的時候,kubelet 都會調用 probeManager.AddPod(pod) 方法,它對應的實如今 pkg/kubelet/prober/prober_manager.go 文件中:tcp
func (m *manager) AddPod(pod *v1.Pod) { m.workerLock.Lock() defer m.workerLock.Unlock() key := probeKey{podUID: pod.UID} for _, c := range pod.Spec.Containers { key.containerName = c.Name if c.ReadinessProbe != nil { key.probeType = readiness if _, ok := m.workers[key]; ok { klog.Errorf("Readiness probe already exists! %v - %v", format.Pod(pod), c.Name) return } w := newWorker(m, readiness, pod, c) m.workers[key] = w go w.run() } if c.LivenessProbe != nil { key.probeType = liveness if _, ok := m.workers[key]; ok { klog.Errorf("Liveness probe already exists! %v - %v", format.Pod(pod), c.Name) return } w := newWorker(m, liveness, pod, c) m.workers[key] = w go w.run() } } }
在這個方法裏,kubelet 會遍歷pod 中全部的 container,若是配置了 probe,就建立一個 worker,並異步處理此次探測
// Creates and starts a new probe worker. func newWorker( m *manager, probeType probeType, pod *v1.Pod, container v1.Container) *worker { w := &worker{ stopCh: make(chan struct{}, 1), // Buffer so stop() can be non-blocking. pod: pod, container: container, probeType: probeType, probeManager: m, } switch probeType { case readiness: w.spec = container.ReadinessProbe w.resultsManager = m.readinessManager w.initialValue = results.Failure case liveness: w.spec = container.LivenessProbe w.resultsManager = m.livenessManager w.initialValue = results.Success } w.proberResultsMetricLabels = prometheus.Labels{ "probe_type": w.probeType.String(), "container_name": w.container.Name, "pod_name": w.pod.Name, "namespace": w.pod.Namespace, "pod_uid": string(w.pod.UID), } return w }
worker 開始run以後,會調用doProbe方法
func (w *worker) doProbe() (keepGoing bool) { defer func() { recover() }() defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true }) // pod 沒有被建立,或者已經被刪除了,直接跳過檢測,可是會繼續檢測 status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID) if !ok { glog.V(3).Infof("No status for pod: %v", format.Pod(w.pod)) return true } // pod 已經退出(無論是成功仍是失敗),直接返回,並終止 worker if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded { glog.V(3).Infof("Pod %v %v, exiting probe worker", format.Pod(w.pod), status.Phase) return false } // 容器沒有建立,或者已經刪除了,直接返回,並繼續檢測,等待更多的信息 c, ok := api.GetContainerStatus(status.ContainerStatuses, w.container.Name) if !ok || len(c.ContainerID) == 0 { glog.V(3).Infof("Probe target container not found: %v - %v", format.Pod(w.pod), w.container.Name) return true } // pod 更新了容器,使用最新的容器信息 if w.containerID.String() != c.ContainerID { if !w.containerID.IsEmpty() { w.resultsManager.Remove(w.containerID) } w.containerID = kubecontainer.ParseContainerID(c.ContainerID) w.resultsManager.Set(w.containerID, w.initialValue, w.pod) w.onHold = false } if w.onHold { return true } if c.State.Running == nil { glog.V(3).Infof("Non-running container probed: %v - %v", format.Pod(w.pod), w.container.Name) if !w.containerID.IsEmpty() { w.resultsManager.Set(w.containerID, results.Failure, w.pod) } // 容器失敗退出,而且不會再重啓,終止 worker return c.State.Terminated == nil || w.pod.Spec.RestartPolicy != api.RestartPolicyNever } // 容器啓動時間過短,沒有超過配置的初始化等待時間 InitialDelaySeconds if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds { return true } // 調用 prober 進行檢測容器的狀態 result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID) if err != nil { return true } if w.lastResult == result { w.resultRun++ } else { w.lastResult = result w.resultRun = 1 } // 若是容器退出,而且沒有超過最大的失敗次數,則繼續檢測 if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) || (result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) { return true } // 保存最新的檢測結果 w.resultsManager.Set(w.containerID, result, w.pod) if w.probeType == liveness && result == results.Failure { // 容器 liveness 檢測失敗,須要刪除容器並從新建立,在新容器成功建立出來以前,暫停檢測 w.onHold = true } return true }
liveness檢測結果會存放在resultsManager,它把結果保存在緩存中,併發送到 m.updates 管道。而管道消費者是 kubelet 中的主循環syncLoopIteration。
case update := <-kl.livenessManager.Updates(): if update.Result == proberesults.Failure { // The liveness manager detected a failure; sync the pod. pod, ok := kl.podManager.GetPodByUID(update.PodUID) if !ok { // If the pod no longer exists, ignore the update. glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update) break } glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod)) handler.HandlePodSyncs([]*api.Pod{pod}) }
liveness檢測若是不經過,pod就會重啓,由 kubelet 的 sync 循環處理便可。但 readness檢測失敗不能重啓 pod,所以readness的邏輯是:
func (m *manager) updateReadiness() { update := <-m.readinessManager.Updates() ready := update.Result == results.Success m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready) }
proberManager 啓動的時候,會運行一個 goroutine 定時讀取 readinessManager 管道中的數據,並根據數據調用 statusManager 去更新 apiserver 中 pod 的狀態信息。
負責 Service 邏輯的組件獲取到了這個狀態,就能根據不一樣的值來決定是否須要更新 endpoints 的內容,也就是 service 的請求是否發送到這個 pod。
上面是 probemanager 的主要邏輯,咱們接下來看下真正執行探測任務的 probe方法
// probe probes the container. func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) { var probeSpec *v1.Probe switch probeType { case readiness: probeSpec = container.ReadinessProbe case liveness: probeSpec = container.LivenessProbe default: return results.Failure, fmt.Errorf("Unknown probe type: %q", probeType) } ... result, output, err := pb.runProbeWithRetries(probeType, probeSpec, pod, status, container, containerID, maxProbeRetries) ...
probe主方法調用pb.runProbeWithRetries 方法,傳入containerid、類型、重試次數等。
調用runtimeService的ExecSync方法進入容器執行命令,回收結果,若是退出碼爲 0 ,就認爲探測成功。
command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env) return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout)) .... func (pb *prober) newExecInContainer(container v1.Container, containerID kubecontainer.ContainerID, cmd []string, timeout time.Duration) exec.Cmd { return execInContainer{func() ([]byte, error) { return pb.runner.RunInContainer(containerID, cmd, timeout) }} } ... func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID, cmd []string, timeout time.Duration) ([]byte, error) { stdout, stderr, err := m.runtimeService.ExecSync(id.ID, cmd, timeout) return append(stdout, stderr...), err } func (pr execProber) Probe(e exec.Cmd) (probe.Result, string, error) { data, err := e.CombinedOutput() klog.V(4).Infof("Exec probe response: %q", string(data)) if err != nil { exit, ok := err.(exec.ExitError) if ok { if exit.ExitStatus() == 0 { return probe.Success, string(data), nil } return probe.Failure, string(data), nil } return probe.Unknown, "", err } return probe.Success, string(data), nil }
標準的 http 探測模板,若是400 > code >= 200,則認爲成功。不支持 https
func DoHTTPProbe(url *url.URL, headers http.Header, client GetHTTPInterface) (probe.Result, string, error) { req, err := http.NewRequest("GET", url.String(), nil) if err != nil { // Convert errors into failures to catch timeouts. return probe.Failure, err.Error(), nil } if _, ok := headers["User-Agent"]; !ok { if headers == nil { headers = http.Header{} } // explicitly set User-Agent so it's not set to default Go value v := version.Get() headers.Set("User-Agent", fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor)) } req.Header = headers if headers.Get("Host") != "" { req.Host = headers.Get("Host") } res, err := client.Do(req) if err != nil { // Convert errors into failures to catch timeouts. return probe.Failure, err.Error(), nil } defer res.Body.Close() b, err := ioutil.ReadAll(res.Body) if err != nil { return probe.Failure, "", err } body := string(b) if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusBadRequest { klog.V(4).Infof("Probe succeeded for %s, Response: %v", url.String(), *res) return probe.Success, body, nil } klog.V(4).Infof("Probe failed for %s with request headers %v, response body: %v", url.String(), headers, body) return probe.Failure, fmt.Sprintf("HTTP probe failed with statuscode: %d", res.StatusCode), nil }
gRPC或FTP服務通常會使用 TCP 探測,嘗試在指定端口上創建TCP鏈接。
若是socket鏈接能成功,則返回成功。
func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) { conn, err := net.DialTimeout("tcp", addr, timeout) if err != nil { // Convert errors to failures to handle timeouts. return probe.Failure, err.Error(), nil } err = conn.Close() if err != nil { klog.Errorf("Unexpected error closing TCP probe socket: %v (%#v)", err, err) } return probe.Success, "", nil }