圖解kubernetes容器探活機制核心實現

在k8s中經過kubelet拉起一個容器以後,用戶能夠指定探活的方式用於實現容器的健康性檢查,目前支持TCP、Http和命令三種方式,今天介紹其整個探活模塊的實現, 瞭解其週期性探測、計數器、延遲等設計的具體實現docker

1. 探活的總體設計

1.1 線程模型

image.png探活的線程模型設計相對簡單一些,其經過worker來進行底層探活任務的執行,並經過Manager來負責worker的管理, 同時緩存探活的結果api

1.2 週期性探活

image.png根據每一個探活任務的週期,來生成定時器,則只須要監聽定時器事件便可緩存

1.3 探活機制的實現

image.png探活機制的實現除了命令Http和Tcp都相對簡單,Tcp只須要直接經過net.DialTimeout連接便可,而Http則是經過構建一個http.Transport構造Http請求執行Do操做便可微信

相對複雜的則是exec, 其首先要根據當前container的環境變量生成command,而後經過容器、命令、超時時間等構建一個Command最後纔是調用runtimeService調用csi執行命令 dom

2.探活接口實現

2.1 核心成員結構

type prober struct {
    exec execprobe.Prober
    // 咱們能夠看到針對readiness/liveness會分別啓動一個http Transport來進行連接
    readinessHTTP httpprobe.Prober
    livenessHTTP  httpprobe.Prober
    startupHTTP   httpprobe.Prober
    tcp           tcpprobe.Prober
    runner        kubecontainer.ContainerCommandRunner

    // refManager主要是用於獲取成員的引用對象
    refManager *kubecontainer.RefManager
    // recorder會負責探測結果事件的構建,並最終傳遞迴 apiserver
    recorder   record.EventRecorder
}複製代碼

2.2 探活主流程

探活的主流程主要是位於prober的probe方法中,其核心流程分爲三段tcp

2.2.1 獲取探活的目標配置

func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
var probeSpec *v1.Probe
// 根據探活的類型來獲取對應位置的探活配置
    switch probeType {
    case readiness:
        probeSpec = container.ReadinessProbe
    case liveness:
        probeSpec = container.LivenessProbe
    case startup:
        probeSpec = container.StartupProbe
    default:
        return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
    }複製代碼

2.2.2 執行探活記錄錯誤信息

若是返回的錯誤,或者不是成功或者警告的狀態,則會獲取對應的引用對象,而後經過 recorder進行事件的構造,發送結果返回apiserveride

// 執行探活流程    
result, output, err := pb.runProbeWithRetries(probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
    
    if err != nil || (result != probe.Success && result != probe.Warning) {
        // // 若是返回的錯誤,或者不是成功或者警告的狀態
        // 則會獲取對應的引用對象,而後經過 
        ref, hasRef := pb.refManager.GetRef(containerID)
        if !hasRef {
            klog.Warningf("No ref for container %q (%s)", containerID.String(), ctrName)
        }
        if err != nil {
            klog.V(1).Infof("%s probe for %q errored: %v", probeType, ctrName, err)
            recorder進行事件的構造,發送結果返回apiserver
            if hasRef {
                pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
            }
        } else { // result != probe.Success
            klog.V(1).Infof("%s probe for %q failed (%v): %s", probeType, ctrName, result, output)
            // recorder進行事件的構造,發送結果返回apiserver
            if hasRef {
                pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %s", probeType, output)
            }
        }
        return results.Failure, err
    }複製代碼

2.2.3 探活重試實現

func (pb *prober) runProbeWithRetries(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID, retries int) (probe.Result, string, error) {
    var err error
    var result probe.Result
    var output string
    for i := 0; i < retries; i++ {
        result, output, err = pb.runProbe(probeType, p, pod, status, container, containerID)
        if err == nil {
            return result, output, nil
        }
    }
    return result, output, err
}複製代碼

2.2.4 根據探活類型執行探活

func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
    timeout := time.Duration(p.TimeoutSeconds) * time.Second
    if p.Exec != nil {
        klog.V(4).Infof("Exec-Probe Pod: %v, Container: %v, Command: %v", pod, container, p.Exec.Command)
        command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
        return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
    }
    if p.HTTPGet != nil {
        // 獲取協議類型與 http參數信息
        scheme := strings.ToLower(string(p.HTTPGet.Scheme))
        host := p.HTTPGet.Host
        if host == "" {
            host = status.PodIP
        }
        port, err := extractPort(p.HTTPGet.Port, container)
        if err != nil {
            return probe.Unknown, "", err
        }
        path := p.HTTPGet.Path
        klog.V(4).Infof("HTTP-Probe Host: %v://%v, Port: %v, Path: %v", scheme, host, port, path)
        url := formatURL(scheme, host, port, path)
        headers := buildHeader(p.HTTPGet.HTTPHeaders)
        klog.V(4).Infof("HTTP-Probe Headers: %v", headers)
        switch probeType {
        case liveness:
            return pb.livenessHTTP.Probe(url, headers, timeout)
        case startup:
            return pb.startupHTTP.Probe(url, headers, timeout)
        default:
            return pb.readinessHTTP.Probe(url, headers, timeout)
        }
    }
    if p.TCPSocket != nil {
        port, err := extractPort(p.TCPSocket.Port, container)
        if err != nil {
            return probe.Unknown, "", err
        }
        host := p.TCPSocket.Host
        if host == "" {
            host = status.PodIP
        }
        klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
        return pb.tcp.Probe(host, port, timeout)
    }
    klog.Warningf("Failed to find probe builder for container: %v", container)
    return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
}複製代碼

3. worker工做線程

Worker工做線程執行探測,要考慮幾個問題:1.容器剛啓動的時候可能須要等待一段時間,好比應用程序可能要作一些初始化的工做,尚未準備好2.若是發現容器探測失敗後從新啓動,則在啓動以前重複的探測也是沒有意義的3.不管是成功或者失敗,可能須要一些閾值來進行輔助,避免單次小几率失敗,重啓容器oop

3.1 核心成員 

其中關鍵參數除了探測配置相關,則主要是onHold參數,該參數用於決定是否延緩對容器的探測,即當容器重啓的時候,須要延緩探測,resultRun則是一個計數器,不管是連續成功或者連續失敗,都經過該計數器累加,後續會判斷是否超過給定閾值源碼分析

type worker struct {
    // 中止channel
    stopCh chan struct{}

    // 包含探針的pod
    pod *v1.Pod

    // 容器探針
    container v1.Container

    // 探針配置
    spec *v1.Probe

    // 探針類型
    probeType probeType

    // The probe value during the initial delay.
    initialValue results.Result

    // 存儲探測結果
    resultsManager results.Manager
    probeManager   *manager

    // 此工做進程的最後一個已知容器ID。
    containerID kubecontainer.ContainerID
    // 最後一次探測結果
    lastResult results.Result
    // 探測連續返回相同結果的此時
    resultRun int

    // 探測失敗會設置爲true不會進行探測 
    onHold bool

    // proberResultsMetricLabels holds the labels attached to this worker
    // for the ProberResults metric by result.
    proberResultsSuccessfulMetricLabels metrics.Labels
    proberResultsFailedMetricLabels     metrics.Labels
    proberResultsUnknownMetricLabels    metrics.Labels
}複製代碼

3.2 探測實現核心流程

image.png

3.2.1 失敗容器探測中斷

若是當前容器的狀態已經被終止了,則就不須要對其進行探測了,直接返回便可ui

// 獲取當前worker對應pod的狀態
    status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
    if !ok {
        // Either the pod has not been created yet, or it was already deleted.
        klog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
        return true
    }
    // 若是pod終止worker應該終止
    if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
        klog.V(3).Infof("Pod %v %v, exiting probe worker",
            format.Pod(w.pod), status.Phase)
        return false
    }複製代碼

3.2.2 延緩探測恢復

延緩探測恢復主要是指的在發生探測失敗的狀況下,會進行重啓操做,在此期間不會進行探測,恢復的邏輯則是經過判斷對應容器的id是否改變,經過修改onHold實現

// 經過容器名字獲取最新的容器信息    
c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
    if !ok || len(c.ContainerID) == 0 {
        // Either the container has not been created yet, or it was deleted.
        klog.V(3).Infof("Probe target container not found: %v - %v",
            format.Pod(w.pod), w.container.Name)
        return true // Wait for more information.
    }

    if w.containerID.String() != c.ContainerID {
        // 若是容器改變,則代表從新啓動了一個容器
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }
        w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
        w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
        // 獲取到一個新的容器,則就須要從新開啓探測 
        w.onHold = false
    }

    if w.onHold {
        //若是當前設置延緩狀態爲true,則不進行探測
        return true
    }複製代碼

3.2.3 初始化延遲探測

初始化延遲探測主要是指的容器的Running的運行時間小於配置的InitialDelaySeconds則直接返回

if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
        return true
    }複製代碼

3.2.4 執行探測邏輯

result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
    if err != nil {
        // Prober error, throw away the result.
        return true
    }

    switch result {
    case results.Success:
        ProberResults.With(w.proberResultsSuccessfulMetricLabels).Inc()
    case results.Failure:
        ProberResults.With(w.proberResultsFailedMetricLabels).Inc()
    default:
        ProberResults.With(w.proberResultsUnknownMetricLabels).Inc()
    }複製代碼

3.2.5 累加探測計數

在累加探測計數以後,會判斷累加後的計數是否超過設定的閾值,若是未超過則不進行狀態變動

if w.lastResult == result {
        w.resultRun++
    } else {
        w.lastResult = result
        w.resultRun = 1
    }

    if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
        (result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
        // Success or failure is below threshold - leave the probe state unchanged.
        // 成功或失敗低於閾值-保持探測器狀態不變。
        return true
    }複製代碼

3.2.6 修改探測狀態

若是探測狀態發送改變,則須要先進行狀態的保存,同時若是是探測失敗,則須要修改onHOld狀態爲true即延緩探測,同時將計數器歸0

// 這裏會修改對應的狀態信息    
w.resultsManager.Set(w.containerID, result, w.pod)

    if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
        // 容器運行liveness/starup檢測失敗,他們須要重啓, 中止探測,直到有新的containerID
        // 這是爲了減小命中#21751的機會,其中在容器中止時運行 docker exec可能會致使容器狀態損壞
        w.onHold = true
        w.resultRun = 0
    }複製代碼

3.3 探測主循環流程

主流程就很簡答了執行上面的探測流程

func (w *worker) run() {
    // 根據探活週期來構建定時器
    probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second

    // If kubelet restarted the probes could be started in rapid succession.
    // Let the worker wait for a random portion of tickerPeriod before probing.
    time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))

    probeTicker := time.NewTicker(probeTickerPeriod)

    defer func() {
        // Clean up.
        probeTicker.Stop()
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }

        w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
        ProberResults.Delete(w.proberResultsSuccessfulMetricLabels)
        ProberResults.Delete(w.proberResultsFailedMetricLabels)
        ProberResults.Delete(w.proberResultsUnknownMetricLabels)
    }()

probeLoop:
    for w.doProbe() {
        // Wait for next probe tick.
        select {
        case <-w.stopCh:
            break probeLoop
        case <-probeTicker.C:
            // continue
        }
    }
}
複製代碼

今天就先到這裏面,明天再聊proberManager的實現,你們分享轉發,就算對個人支持了,動動手就緒

微信號:baxiaoshi2020

關注公告號閱讀更多源碼分析文章 21天大棚

更多文章關注 www.sreguide.com

本文由博客一文多發平臺 OpenWrite 發佈

相關文章
相關標籤/搜索