一文讀懂 SuperEdge 分佈式健康檢查 (邊端)

做者:杜楊浩,騰訊雲高級工程師,熱衷於開源、容器和Kubernetes。目前主要從事鏡像倉庫、Kubernetes集羣高可用&備份還原,以及邊緣計算相關研發工做。node

前言

SuperEdge 是基於原生 Kubernetes 的邊緣容器管理系統。該系統把雲原生能力擴展到邊緣側,很好的實現了雲端對邊緣端的管理和控制,極大簡化了應用從雲端部署到邊緣端的過程。同時SuperEdge設計了分佈式健康檢查機制規避了雲邊網絡不穩定形成的大量pod遷移和重建,保證了服務的穩定。git

邊緣計算場景下,邊緣節點與雲端的網絡環境十分複雜,鏈接並不可靠,在原生 Kubernetes 集羣中,會形成 apiserver 和節點鏈接的中斷,節點狀態的異常,最終致使pod的驅逐和 endpoint 的缺失,形成服務的中斷和波動,具體來講原生 Kubernetes 處理以下:github

  • 失聯的節點被置爲 ConditionUnknown 狀態,並被添加 NoSchedule 和 NoExecute 的 taints
  • 失聯的節點上的 pod 被驅逐,並在其餘節點上進行重建
  • 失聯的節點上的 pod 從 Service 的 Endpoint 列表中移除

所以,邊緣計算場景僅僅依賴邊端和 apiserver 的鏈接狀況是不足以判斷節點是否異常的,會由於網絡的不可靠形成誤判,影響正常服務。而相較於雲端和邊緣端的鏈接,顯然邊端節點之間的鏈接更爲穩定,具備更高的參考價值,所以 superedge 提出了邊緣分佈式健康檢查機制。該機制中節點狀態斷定除了要考慮 apiserver 的因素外,還引入了節點的評估因素,進而對節點進行更爲全面的狀態判斷。經過這個功能,可以避免因爲雲邊網絡不可靠形成的大量的pod遷移和重建,保證服務的穩定web

具體來講,主要經過以下三個層面加強節點狀態判斷的準確性:json

  • 每一個節點按期探測其餘節點健康狀態
  • 集羣內全部節點按期投票決定各節點的狀態
  • 雲端和邊端節點共同決定節點狀態

而分佈式健康檢查最終的判斷處理以下:api

edge-health-daemon 源碼分析

在深刻源碼以前先介紹一下分佈式健康檢查的實現原理,其架構圖以下所示:網絡

Kubernetes 每一個 node 在 kube-node-lease namespace 下會對應一個 Lease object,kubelet 每隔 node-status-update-frequency 時間(默認10s)會更新對應node的 Lease object數據結構

node-controller 會每隔 node-monitor-period 時間(默認5s)檢查 Lease object 是否更新,若是超過 node-monitor-grace-period 時間(默認40s)沒有發生過更新,則認爲這個 node 不健康,會更新 NodeStatus(ConditionUnknown)架構

而當節點心跳超時(ConditionUnknown)以後,node controller 會給該 node 添加以下 taints:併發

spec:
  ...
  taints:
  - effect: NoSchedule
    key: node.kubernetes.io/unreachable
    timeAdded: "2020-07-02T03:50:47Z"
  - effect: NoExecute
    key: node.kubernetes.io/unreachable
    timeAdded: "2020-07-02T03:50:53Z"

同時,endpoint controller 會從 endpoint backend 中踢掉該母機上的全部 pod

對於打上 NoSchedule taint 的母機,Scheduler 不會調度新的負載在該 node 上了;而對於打上 NoExecute(node.kubernetes.io/unreachable) taint 的母機,node controller 會在節點心跳超時以後一段時間(默認5mins)驅逐該節點上的 pod

分佈式健康檢查邊端的 edge-health-daemon 組件會對同區域邊緣節點執行分佈式健康檢查,並向 apiserver 發送健康狀態投票結果(給 node 打 annotation)

此外,爲了實如今雲邊斷連且分佈式健康檢查狀態正常的狀況下:

  • 失聯的節點上的 pod 不會從 Service 的 Endpoint 列表中移除
  • 失聯的節點上的 pod 不會被驅逐

還須要在雲端運行 edge-health-admission(Kubernetes mutating admission webhook),不斷根據 node edge-health annotation 調整 kube-controller-manager 設置的 node taint(去掉NoExecute taint)以及 endpoints (將失聯節點上的 pods 從 endpoint subsets notReadyAddresses 移到 addresses中),從而實現雲端和邊端共同決定節點狀態

本章將主要介紹 edge-health-daemon 原理,以下爲 edge-health-daemon 的相關數據結構:

type EdgeHealthMetadata struct {
    *NodeMetadata
    *CheckMetadata
}
type NodeMetadata struct {
    NodeList []v1.Node
    sync.RWMutex
}
type CheckMetadata struct {
    CheckInfo            map[string]map[string]CheckDetail // Checker ip:{Checked ip:Check detail}
    CheckPluginScoreInfo map[string]map[string]float64     // Checked ip:{Plugin name:Check score}
    sync.RWMutex
}
type CheckDetail struct {
    Normal bool
    Time   time.Time
}
type CommunInfo struct {
    SourceIP    string                 // ClientIP,Checker ip
    CheckDetail map[string]CheckDetail // Checked ip:Check detail
    Hmac        string
}

含義以下:

  • NodeMetadata:爲了實現分區域分佈式健康檢查機制而維護的邊緣節點 cache,其中包含該區域內的全部邊緣節點列表 NodeList
  • CheckMetadata:存放健康檢查的結果,具體來講包括兩個數據結構:
    • CheckPluginScoreInfo:爲Checked ip:{Plugin name:Check score}組織形式。第一級 key 表示:被檢查的ip;第二級 key 表示:檢查插件的名稱;value 表示:檢查分數
    • CheckInfo:爲Checker ip:{Checked ip:Check detail}組織形式。第一級key表示:執行檢查的ip;第二級key表示:被檢查的ip;value表示檢查結果 CheckDetail
  • CheckDetail:表明健康檢查的結果
    • Normal:Normal 爲 true 表示檢查結果正常;false 表示異常
    • Time:表示得出該結果時的時間,用於結果有效性的判斷(超過一段時間沒有更新的結果將無效)
  • CommunInfo:邊緣節點向其它節點發送健康檢查結果時使用的數據,其中包括:
    • SourceIP:表示執行檢查的ip
    • CheckDetail:爲Checked ip:Check detail組織形式,包含被檢查的ip以及檢查結果
    • Hmac:SourceIP 以及 CheckDetail 進行 hmac 獲得,用於邊緣節點通訊過程當中判斷傳輸數據的有效性(是否被篡改)

edge-health-daemon 主體邏輯包括四部分功能:

  • SyncNodeList:根據邊緣節點所在的 zone 刷新 node cache,同時更新 CheckMetadata相關數據
  • ExecuteCheck:對每一個邊緣節點執行若干種類的健康檢查插件(ping,kubelet等),並將各插件檢查分數彙總,根據用戶設置的基準線得出節點是否健康的結果
  • Commun:將本節點對其它各節點健康檢查的結果發送給其它節點
  • Vote:對全部節點健康檢查的結果分類,若是某個節點被大多數(>1/2)節點斷定爲正常,則對該節點添加superedgehealth/node-health:true annotation,代表該節點分佈式健康檢查結果爲正常;不然,對該節點添加superedgehealth/node-health:false annotation,代表該節點分佈式健康檢查結果爲異常

下面依次對上述功能進行源碼分析:

一、SyncNodeList

SyncNodeList 每隔 HealthCheckPeriod 秒(health-check-period 選項)執行一次,會按照以下狀況分類刷新 node cache:

  • 若是 kube-system namespace 下不存在名爲 edge-health-zone-config的configmap,則沒有開啓多地域探測,所以會獲取全部邊緣節點列表並刷新 node cache
  • 不然,若是 edge-health-zone-config 的 configmap 數據部分 TaintZoneAdmission 爲 false,則沒有開啓多地域探測,所以會獲取全部邊緣節點列表並刷新 node cache
  • 若是 TaintZoneAdmission 爲 true,且 node 有"superedgehealth/topology-zone"標籤(標示區域),則獲取"superedgehealth/topology-zone" label value 相同的節點列表並刷新 node cache
  • 若是 node 沒有"superedgehealth/topology-zone" label,則只會將邊緣節點自己添加到分佈式健康檢查節點列表中並刷新 node cache(only itself)
func (ehd *EdgeHealthDaemon) SyncNodeList() {
    // Only sync nodes when self-located found
    var host *v1.Node
    if host = ehd.metadata.GetNodeByName(ehd.cfg.Node.HostName); host == nil {
        klog.Errorf("Self-hostname %s not found", ehd.cfg.Node.HostName)
        return
    }
    // Filter cloud nodes and retain edge ones
    masterRequirement, err := labels.NewRequirement(common.MasterLabel, selection.DoesNotExist, []string{})
    if err != nil {
        klog.Errorf("New masterRequirement failed %+v", err)
        return
    }
    masterSelector := labels.NewSelector()
    masterSelector = masterSelector.Add(*masterRequirement)
    if mrc, err := ehd.cmLister.ConfigMaps(metav1.NamespaceSystem).Get(common.TaintZoneConfigMap); err != nil {
        if apierrors.IsNotFound(err) { // multi-region configmap not found
            if NodeList, err := ehd.nodeLister.List(masterSelector); err != nil {
                klog.Errorf("Multi-region configmap not found and get nodes err %+v", err)
                return
            } else {
                ehd.metadata.SetByNodeList(NodeList)
            }
        } else {
            klog.Errorf("Get multi-region configmap err %+v", err)
            return
        }
    } else { // multi-region configmap found
        mrcv := mrc.Data[common.TaintZoneConfigMapKey]
        klog.V(4).Infof("Multi-region value is %s", mrcv)
        if mrcv == "false" { // close multi-region check
            if NodeList, err := ehd.nodeLister.List(masterSelector); err != nil {
                klog.Errorf("Multi-region configmap exist but disabled and get nodes err %+v", err)
                return
            } else {
                ehd.metadata.SetByNodeList(NodeList)
            }
        } else { // open multi-region check
            if hostZone, existed := host.Labels[common.TopologyZone]; existed {
                klog.V(4).Infof("Host %s has HostZone %s", host.Name, hostZone)
                zoneRequirement, err := labels.NewRequirement(common.TopologyZone, selection.Equals, []string{hostZone})
                if err != nil {
                    klog.Errorf("New masterZoneRequirement failed: %+v", err)
                    return
                }
                masterZoneSelector := labels.NewSelector()
                masterZoneSelector = masterZoneSelector.Add(*masterRequirement, *zoneRequirement)
                if nodeList, err := ehd.nodeLister.List(masterZoneSelector); err != nil {
                    klog.Errorf("TopologyZone label for hostname %s but get nodes err: %+v", host.Name, err)
                    return
                } else {
                    ehd.metadata.SetByNodeList(nodeList)
                }
            } else { // Only check itself if there is no TopologyZone label
                klog.V(4).Infof("Only check itself since there is no TopologyZone label for hostname %s", host.Name)
                ehd.metadata.SetByNodeList([]*v1.Node{host})
            }
        }
    }
    // Init check plugin score
    ipList := make(map[string]struct{})
    for _, node := range ehd.metadata.Copy() {
        for _, addr := range node.Status.Addresses {
            if addr.Type == v1.NodeInternalIP {
                ipList[addr.Address] = struct{}{}
                ehd.metadata.InitCheckPluginScore(addr.Address)
            }
        }
    }
    // Delete redundant check plugin score
    for _, checkedIp := range ehd.metadata.CopyCheckedIp() {
        if _, existed := ipList[checkedIp]; !existed {
            ehd.metadata.DeleteCheckPluginScore(checkedIp)
        }
    }
    // Delete redundant check info
    for checkerIp := range ehd.metadata.CopyAll() {
        if _, existed := ipList[checkerIp]; !existed {
            ehd.metadata.DeleteByIp(ehd.cfg.Node.LocalIp, checkerIp)
        }
    }
    klog.V(4).Infof("SyncNodeList check info %+v successfully", ehd.metadata)
}
...
func (cm *CheckMetadata) DeleteByIp(localIp, ip string) {
    cm.Lock()
    defer cm.Unlock()
    delete(cm.CheckInfo[localIp], ip)
    delete(cm.CheckInfo, ip)
}

在按照如上邏輯更新node cache以後,會初始化CheckMetadata.CheckPluginScoreInfo,將節點ip賦值給CheckPluginScoreInfo key(Checked ip:被檢查的ip)

另外,會刪除CheckMetadata.CheckPluginScoreInfo以及CheckMetadata.CheckInfo中多餘的items(不屬於該邊緣節點檢查範圍)

二、ExecuteCheck

ExecuteCheck也是每隔HealthCheckPeriod秒(health-check-period選項)執行一次,會對每一個邊緣節點執行若干種類的健康檢查插件(ping,kubelet等),並將各插件檢查分數彙總,根據用戶設置的基準線HealthCheckScoreLine(health-check-scoreline選項)得出節點是否健康的結果

func (ehd *EdgeHealthDaemon) ExecuteCheck() {
    util.ParallelizeUntil(context.TODO(), 16, len(ehd.checkPlugin.Plugins), func(index int) {
        ehd.checkPlugin.Plugins[index].CheckExecute(ehd.metadata.CheckMetadata)
    })
    klog.V(4).Infof("CheckPluginScoreInfo is %+v after health check", ehd.metadata.CheckPluginScoreInfo)
    for checkedIp, pluginScores := range ehd.metadata.CopyCheckPluginScore() {
        totalScore := 0.0
        for _, score := range pluginScores {
            totalScore += score
        }
        if totalScore >= ehd.cfg.Check.HealthCheckScoreLine {
            ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: true})
        } else {
            ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: false})
        }
    }
    klog.V(4).Infof("CheckInfo is %+v after health check", ehd.metadata.CheckInfo)
}

這裏會調用 ParallelizeUntil 併發執行各檢查插件,edge-health 目前支持 ping 以及 kubelet 兩種檢查插件,在 checkplugin 目錄(github.com/superedge/superedge/pkg/edge-health/checkplugin),經過 Register 註冊到 PluginInfo 單例(plugin列表)中,以下:

// TODO: handle flag parse errors
func (pcp *PingCheckPlugin) Set(s string) error {
    var err error
    for _, para := range strings.Split(s, ",") {
        if len(para) == 0 {
            continue
        }
        arr := strings.Split(para, "=")
        trimKey := strings.TrimSpace(arr[0])
        switch trimKey {
        case "timeout":
            timeout, _ := strconv.Atoi(strings.TrimSpace(arr[1]))
            pcp.HealthCheckoutTimeOut = timeout
        case "retries":
            retries, _ := strconv.Atoi(strings.TrimSpace(arr[1]))
            pcp.HealthCheckRetries = retries
        case "weight":
            weight, _ := strconv.ParseFloat(strings.TrimSpace(arr[1]), 64)
            pcp.Weight = weight
        case "port":
            port, _ := strconv.Atoi(strings.TrimSpace(arr[1]))
            pcp.Port = port
        }
    }
    PluginInfo = NewPlugin()
    PluginInfo.Register(pcp)
    return err
}
func (p *Plugin) Register(plugin CheckPlugin) {
    p.Plugins = append(p.Plugins, plugin)
    klog.V(4).Info("Register check plugin: %+v", plugin)
}
...
var (
    PluginOnce sync.Once
    PluginInfo Plugin
)
type Plugin struct {
    Plugins []CheckPlugin
}
func NewPlugin() Plugin {
    PluginOnce.Do(func() {
        PluginInfo = Plugin{
            Plugins: []CheckPlugin{},
        }
    })
    return PluginInfo
}

每種插件具體執行健康檢查的邏輯封裝在 CheckExecute 中,這裏以 ping plugin 爲例:

// github.com/superedge/superedge/pkg/edge-health/checkplugin/pingcheck.go
func (pcp *PingCheckPlugin) CheckExecute(checkMetadata *metadata.CheckMetadata) {
    copyCheckedIp := checkMetadata.CopyCheckedIp()
    util.ParallelizeUntil(context.TODO(), 16, len(copyCheckedIp), func(index int) {
        checkedIp := copyCheckedIp[index]
        var err error
        for i := 0; i < pcp.HealthCheckRetries; i++ {
            if _, err := net.DialTimeout("tcp", checkedIp+":"+strconv.Itoa(pcp.Port), time.Duration(pcp.HealthCheckoutTimeOut)*time.Second); err == nil {
                break
            }
        }
        if err == nil {
            klog.V(4).Infof("Edge ping health check plugin %s for ip %s succeed", pcp.Name(), checkedIp)
            checkMetadata.SetByPluginScore(checkedIp, pcp.Name(), pcp.GetWeight(), common.CheckScoreMax)
        } else {
            klog.Warning("Edge ping health check plugin %s for ip %s failed, possible reason %s", pcp.Name(), checkedIp, err.Error())
            checkMetadata.SetByPluginScore(checkedIp, pcp.Name(), pcp.GetWeight(), common.CheckScoreMin)
        }
    })
}
// CheckPluginScoreInfo relevant functions
func (cm *CheckMetadata) SetByPluginScore(checkedIp, pluginName string, weight float64, score int) {
    cm.Lock()
    defer cm.Unlock()
    if _, existed := cm.CheckPluginScoreInfo[checkedIp]; !existed {
        cm.CheckPluginScoreInfo[checkedIp] = make(map[string]float64)
    }
    cm.CheckPluginScoreInfo[checkedIp][pluginName] = float64(score) * weight
}

CheckExecute 會對同區域每一個節點執行 ping 探測(net.DialTimeout),若是失敗,則給該節點打 CheckScoreMin 分(0);不然,打 CheckScoreMax 分(100)

每種檢查插件會有一個 Weight 參數,表示了該檢查插件分數的權重值,全部權重參數之和應該爲1,對應基準分數線 HealthCheckScoreLine 範圍0-100。所以這裏在設置分數時,會乘以權重

回到 ExecuteCheck 函數,在調用各插件執行健康檢查得出權重分數(CheckPluginScoreInfo)後,還須要將該分數與基準線 HealthCheckScoreLine 對比:若是高於(>=)分數線,則認爲該節點本次檢查正常;不然異常

func (ehd *EdgeHealthDaemon) ExecuteCheck() {
    util.ParallelizeUntil(context.TODO(), 16, len(ehd.checkPlugin.Plugins), func(index int) {
        ehd.checkPlugin.Plugins[index].CheckExecute(ehd.metadata.CheckMetadata)
    })
    klog.V(4).Infof("CheckPluginScoreInfo is %+v after health check", ehd.metadata.CheckPluginScoreInfo)
    for checkedIp, pluginScores := range ehd.metadata.CopyCheckPluginScore() {
        totalScore := 0.0
        for _, score := range pluginScores {
            totalScore += score
        }
        if totalScore >= ehd.cfg.Check.HealthCheckScoreLine {
            ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: true})
        } else {
            ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: false})
        }
    }
    klog.V(4).Infof("CheckInfo is %+v after health check", ehd.metadata.CheckInfo)
}

三、Commun

在對同區域各邊緣節點執行健康檢查後,須要將檢查的結果傳遞給其它各節點,這也就是 commun 模塊負責的事情:

func (ehd *EdgeHealthDaemon) Run(stopCh <-chan struct{}) {
    // Execute edge health prepare and check
    ehd.PrepareAndCheck(stopCh)
    // Execute vote
    vote := vote.NewVoteEdge(&ehd.cfg.Vote)
    go vote.Vote(ehd.metadata, ehd.cfg.Kubeclient, ehd.cfg.Node.LocalIp, stopCh)
    // Execute communication
    communEdge := commun.NewCommunEdge(&ehd.cfg.Commun)
    communEdge.Commun(ehd.metadata.CheckMetadata, ehd.cmLister, ehd.cfg.Node.LocalIp, stopCh)
    <-stopCh
}

既然是互相傳遞結果給其它節點,則必然會有接受和發送模塊:

func (c *CommunEdge) Commun(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, localIp string, stopCh <-chan struct{}) {
    go c.communReceive(checkMetadata, cmLister, stopCh)
    wait.Until(func() {
        c.communSend(checkMetadata, cmLister, localIp)
    }, time.Duration(c.CommunPeriod)*time.Second, stopCh)
}

其中 communSend 負責向其它節點發送本節點對它們的檢查結果;而 communReceive 負責接受其它邊緣節點的檢查結果。下面依次分析:

func (c *CommunEdge) communSend(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, localIp string) {
    copyLocalCheckDetail := checkMetadata.CopyLocal(localIp)
    var checkedIps []string
    for checkedIp := range copyLocalCheckDetail {
        checkedIps = append(checkedIps, checkedIp)
    }
    util.ParallelizeUntil(context.TODO(), 16, len(checkedIps), func(index int) {
        // Only send commun information to other edge nodes(excluding itself)
        dstIp := checkedIps[index]
        if dstIp == localIp {
            return
        }
        // Send commun information
        communInfo := metadata.CommunInfo{SourceIP: localIp, CheckDetail: copyLocalCheckDetail}
        if hmac, err := util.GenerateHmac(communInfo, cmLister); err != nil {
            log.Errorf("communSend: generateHmac err %+v", err)
            return
        } else {
            communInfo.Hmac = hmac
        }
        commonInfoBytes, err := json.Marshal(communInfo)
        if err != nil {
            log.Errorf("communSend: json.Marshal commun info err %+v", err)
            return
        }
        commonInfoReader := bytes.NewReader(commonInfoBytes)
        for i := 0; i < c.CommunRetries; i++ {
            req, err := http.NewRequest("PUT", "http://"+dstIp+":"+strconv.Itoa(c.CommunServerPort)+"/result", commonInfoReader)
            if err != nil {
                log.Errorf("communSend: NewRequest for remote edge node %s err %+v", dstIp, err)
                continue
            }
            if err = util.DoRequestAndDiscard(c.client, req); err != nil {
                log.Errorf("communSend: DoRequestAndDiscard for remote edge node %s err %+v", dstIp, err)
            } else {
                log.V(4).Infof("communSend: put commun info %+v to remote edge node %s successfully", communInfo, dstIp)
                break
            }
        }
    })
}

發送邏輯以下:

  • 構建 CommunInfo 結構體,包括:
    • SourceIP:表示執行檢查的ip
    • CheckDetail:爲 Checked ip:Check detail 組織形式,包含被檢查的ip以及檢查結果
  • 調用 GenerateHmac 構建 Hmac:其實是以 kube-system 下的 hmac-config configmap hmackey 字段爲 key,對 SourceIP 以及 CheckDetail進行 hmac 獲得,用於判斷傳輸數據的有效性(是否被篡改)
func GenerateHmac(communInfo metadata.CommunInfo, cmLister corelisters.ConfigMapLister) (string, error) {
    addrBytes, err := json.Marshal(communInfo.SourceIP)
    if err != nil {
        return "", err
    }
    detailBytes, _ := json.Marshal(communInfo.CheckDetail)
    if err != nil {
        return "", err
    }
    hmacBefore := string(addrBytes) + string(detailBytes)
    if hmacConf, err := cmLister.ConfigMaps(metav1.NamespaceSystem).Get(common.HmacConfig); err != nil {
        return "", err
    } else {
        return GetHmacCode(hmacBefore, hmacConf.Data[common.HmacKey])
    }
}
func GetHmacCode(s, key string) (string, error) {
    h := hmac.New(sha256.New, []byte(key))
    if _, err := io.WriteString(h, s); err != nil {
        return "", err
    }
    return fmt.Sprintf("%x", h.Sum(nil)), nil
}
  • 發送上述構建的 CommunInfo 給其它邊緣節點(DoRequestAndDiscard)

communReceive邏輯也很清晰:

// TODO: support changeable server listen port
func (c *CommunEdge) communReceive(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, stopCh <-chan struct{}) {
    svr := &http.Server{Addr: ":" + strconv.Itoa(c.CommunServerPort)}
    svr.ReadTimeout = time.Duration(c.CommunTimeout) * time.Second
    svr.WriteTimeout = time.Duration(c.CommunTimeout) * time.Second
    http.HandleFunc("/debug/flags/v", pkgutil.UpdateLogLevel)
    http.HandleFunc("/result", func(w http.ResponseWriter, r *http.Request) {
        var communInfo metadata.CommunInfo
        if r.Body == nil {
            http.Error(w, "Invalid commun information", http.StatusBadRequest)
            return
        }
        err := json.NewDecoder(r.Body).Decode(&communInfo)
        if err != nil {
            http.Error(w, fmt.Sprintf("Invalid commun information %+v", err), http.StatusBadRequest)
            return
        }
        log.V(4).Infof("Received common information from %s : %+v", communInfo.SourceIP, communInfo.CheckDetail)
        if _, err := io.WriteString(w, "Received!\n"); err != nil {
            log.Errorf("communReceive: send response err %+v", err)
            http.Error(w, fmt.Sprintf("Send response err %+v", err), http.StatusInternalServerError)
            return
        }
        if hmac, err := util.GenerateHmac(communInfo, cmLister); err != nil {
            log.Errorf("communReceive: server GenerateHmac err %+v", err)
            http.Error(w, fmt.Sprintf("GenerateHmac err %+v", err), http.StatusInternalServerError)
            return
        } else {
            if hmac != communInfo.Hmac {
                log.Errorf("communReceive: Hmac not equal, hmac is %s but received commun info hmac is %s", hmac, communInfo.Hmac)
                http.Error(w, "Hmac not match", http.StatusForbidden)
                return
            }
        }
        log.V(4).Infof("communReceive: Hmac match")
        checkMetadata.SetByCommunInfo(communInfo)
        log.V(4).Infof("After communicate, check info is %+v", checkMetadata.CheckInfo)
    })
    go func() {
        if err := svr.ListenAndServe(); err != http.ErrServerClosed {
            log.Fatalf("Server: exit with error %+v", err)
        }
    }()
    for {
        select {
        case <-stopCh:
            ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
            defer cancel()
            if err := svr.Shutdown(ctx); err != nil {
                log.Errorf("Server: program exit, server exit error %+v", err)
            }
            return
        default:
        }
    }
}

負責接受其它邊緣節點的檢查結果,並寫入自身檢查結果 CheckInfo,流程以下:

  • 經過/result路由接受請求,並將請求內容解析成 CommunInfo

  • 對 CommunInfo 執行 GenerateHmac 獲取hmac值,並與 CommunInfo.Hmac 字段進行對比,檢查接受數據的有效性

  • 最後將 CommunInfo 檢查結果寫入 CheckInfo,注意:CheckDetail.Time 設置爲寫入時的時間

    // CheckInfo relevant functions
    func (cm *CheckMetadata) SetByCommunInfo(c CommunInfo) {
      cm.Lock()
      defer cm.Unlock()
      if _, existed := cm.CheckInfo[c.SourceIP]; !existed {
          cm.CheckInfo[c.SourceIP] = make(map[string]CheckDetail)
      }
      for k, detail := range c.CheckDetail {
          // Update time to local timestamp since different machines have different ones
          detail.Time = time.Now()
          c.CheckDetail[k] = detail
      }
      cm.CheckInfo[c.SourceIP] = c.CheckDetail
    }
  • 最後在接受到 stopCh 信號時,經過 svr.Shutdown 平滑關閉服務

四、Vote

在接受到其它節點的健康檢查結果後,vote 模塊會對結果進行統計得出最終判決,並向 apiserver 報告:

func (v *VoteEdge) Vote(edgeHealthMetadata *metadata.EdgeHealthMetadata, kubeclient clientset.Interface,
    localIp string, stopCh <-chan struct{}) {
    go wait.Until(func() {
        v.vote(edgeHealthMetadata, kubeclient, localIp, stopCh)
    }, time.Duration(v.VotePeriod)*time.Second, stopCh)
}

首先根據檢查結果統計出狀態正常以及異常的節點列表:

type votePair struct {
    pros int
    cons int
}
...
var (
    prosVoteIpList, consVoteIpList []string
    // Init votePair since cannot assign to struct field voteCountMap[checkedIp].pros in map
    vp votePair
)
voteCountMap := make(map[string]votePair) // {"127.0.0.1":{"pros":1,"cons":2}}
copyCheckInfo := edgeHealthMetadata.CopyAll()
// Note that voteThreshold should be calculated by checked instead of checker
// since checked represents the total valid edge health nodes while checker may contain partly ones.
voteThreshold := (edgeHealthMetadata.GetCheckedIpLen() + 1) / 2
for _, checkedDetails := range copyCheckInfo {
    for checkedIp, checkedDetail := range checkedDetails {
        if !time.Now().After(checkedDetail.Time.Add(time.Duration(v.VoteTimeout) * time.Second)) {
            if _, existed := voteCountMap[checkedIp]; !existed {
                voteCountMap[checkedIp] = votePair{0, 0}
            }
            vp = voteCountMap[checkedIp]
            if checkedDetail.Normal {
                vp.pros++
                if vp.pros >= voteThreshold {
                    prosVoteIpList = append(prosVoteIpList, checkedIp)
                }
            } else {
                vp.cons++
                if vp.cons >= voteThreshold {
                    consVoteIpList = append(consVoteIpList, checkedIp)
                }
            }
            voteCountMap[checkedIp] = vp
        }
    }
}
log.V(4).Infof("Vote: voteCountMap is %+v", voteCountMap)
...

其中狀態判斷的邏輯以下:

  • 若是超過一半(>)的節點對該節點的檢查結果爲正常,則認爲該節點狀態正常(注意時間差在 VoteTimeout 內)
  • 若是超過一半(>)的節點對該節點的檢查結果爲異常,則認爲該節點狀態異常(注意時間差在 VoteTimeout 內)
  • 除開上述狀況,認爲節點狀態判斷無效,對這些節點不作任何處理(可能存在腦裂的狀況)

對狀態正常的節點作以下處理:

...
// Handle prosVoteIpList
util.ParallelizeUntil(context.TODO(), 16, len(prosVoteIpList), func(index int) {
    if node := edgeHealthMetadata.GetNodeByAddr(prosVoteIpList[index]); node != nil {
        log.V(4).Infof("Vote: vote pros to edge node %s begin ...", node.Name)
        nodeCopy := node.DeepCopy()
        needUpdated := false
        if nodeCopy.Annotations == nil {
            nodeCopy.Annotations = map[string]string{
                common.NodeHealthAnnotation: common.NodeHealthAnnotationPros,
            }
            needUpdated = true
        } else {
            if healthy, existed := nodeCopy.Annotations[common.NodeHealthAnnotation]; existed {
                if healthy != common.NodeHealthAnnotationPros {
                    nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationPros
                    needUpdated = true
                }
            } else {
                nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationPros
                needUpdated = true
            }
        }
        if index, existed := admissionutil.TaintExistsPosition(nodeCopy.Spec.Taints, common.UnreachableNoExecuteTaint); existed {
            nodeCopy.Spec.Taints = append(nodeCopy.Spec.Taints[:index], nodeCopy.Spec.Taints[index+1:]...)
            needUpdated = true
        }
        if needUpdated {
            if _, err := kubeclient.CoreV1().Nodes().Update(context.TODO(), nodeCopy, metav1.UpdateOptions{}); err != nil {
                log.Errorf("Vote: update pros vote to edge node %s error %+v ", nodeCopy.Name, err)
            } else {
                log.V(2).Infof("Vote: update pros vote to edge node %s successfully", nodeCopy.Name)
            }
        }
    } else {
        log.Warningf("Vote: edge node addr %s not found", prosVoteIpList[index])
    }
})
...
  • 添加或者更新"superedgehealth/node-health" annotation 值爲"true",代表分佈式健康檢查判斷該節點狀態正常。
  • 若是node存在 NoExecute(node.kubernetes.io/unreachable) taint,則將其去掉,並更新 node.

而對狀態異常的節點會添加或者更新"superedgehealth/node-health" annotation值爲"false",代表分佈式健康檢查判斷該節點狀態異常:

// Handle consVoteIpList
util.ParallelizeUntil(context.TODO(), 16, len(consVoteIpList), func(index int) {
    if node := edgeHealthMetadata.GetNodeByAddr(consVoteIpList[index]); node != nil {
        log.V(4).Infof("Vote: vote cons to edge node %s begin ...", node.Name)
        nodeCopy := node.DeepCopy()
        needUpdated := false
        if nodeCopy.Annotations == nil {
            nodeCopy.Annotations = map[string]string{
                common.NodeHealthAnnotation: common.NodeHealthAnnotationCons,
            }
            needUpdated = true
        } else {
            if healthy, existed := nodeCopy.Annotations[common.NodeHealthAnnotation]; existed {
                if healthy != common.NodeHealthAnnotationCons {
                    nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationCons
                    needUpdated = true
                }
            } else {
                nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationCons
                needUpdated = true
            }
        }
        if needUpdated {
            if _, err := kubeclient.CoreV1().Nodes().Update(context.TODO(), nodeCopy, metav1.UpdateOptions{}); err != nil {
                log.Errorf("Vote: update cons vote to edge node %s error %+v ", nodeCopy.Name, err)
            } else {
                log.V(2).Infof("Vote: update cons vote to edge node %s successfully", nodeCopy.Name)
            }
        }
    } else {
        log.Warningf("Vote: edge node addr %s not found", consVoteIpList[index])
    }
})

在邊端 edge-health-daemon 向 apiserver 發送節點健康結果後,雲端運行 edge-health-admission(Kubernetes mutating admission webhook),會不斷根據 node edge-health annotation 調整 kube-controller-manager 設置的 node taint(去掉NoExecute taint) 以及 endpoints(將失聯節點上的 pods 從 endpoint subsets notReadyAddresses 移到 addresses中),從而實現即使雲邊斷連,可是分佈式健康檢查狀態正常的狀況下:

  • 失聯的節點上的 pod 不會從 Service 的 Endpoint 列表中移除
  • 失聯的節點上的 pod 不會被驅逐

總結

  • 分佈式健康檢查對於雲邊斷連狀況的處理區別原生 Kubernetes 以下:
    • 原生 Kubernetes:
      • 失聯的節點被置爲 ConditionUnknown 狀態,並被添加 NoSchedule 和 NoExecute 的 taints
      • 失聯的節點上的pod被驅逐,並在其餘節點上進行重建
      • 失聯的節點上的pod從 Service 的 Endpoint 列表中移除
    • 分佈式健康檢查:
  • 分佈式健康檢查主要經過以下三個層面加強節點狀態判斷的準確性:
    • 每一個節點按期探測其餘節點健康狀態
    • 集羣內全部節點按期投票決定各節點的狀態
    • 雲端和邊端節點共同決定節點狀態
  • 分佈式健康檢查功能由邊端的 edge-health-daemon 以及雲端的 edge-health-admission 組成,功能分別以下:
    • edge-health-daemon:對同區域邊緣節點執行分佈式健康檢查,並向 apiserver 發送健康狀態投票結果(給 node 打 annotation),主體邏輯包括四部分功能:
      • SyncNodeList:根據邊緣節點所在的 zone 刷新 node cache,同時更新 CheckMetadata 相關數據
      • ExecuteCheck:對每一個邊緣節點執行若干種類的健康檢查插件(ping,kubelet等),並將各插件檢查分數彙總,根據用戶設置的基準線得出節點是否健康的結果
      • Commun:將本節點對其它各節點健康檢查的結果發送給其它節點
      • Vote:對全部節點健康檢查的結果分類,若是某個節點被大多數(>1/2)節點斷定爲正常,則對該節點添加 superedgehealth/node-health:true annotation,代表該節點分佈式健康檢查結果爲正常;不然,對該節點添加 superedgehealth/node-health:false annotation,代表該節點分佈式健康檢查結果爲異常
    • edge-health-admission(Kubernetes mutating admission webhook):不斷根據 node edge-health annotation 調整 kube-controller-manager 設置的 node taint(去掉 NoExecute taint)以及endpoints(將失聯節點上的 pods 從 endpoint subsets notReadyAddresses 移到 addresses中),從而實現雲端和邊端共同決定節點狀態

duyanghao kubernetes-reading-notes

【騰訊雲原生】雲說新品、雲研新術、雲遊新活、雲賞資訊,掃碼關注同名公衆號,及時獲取更多幹貨!!

相關文章
相關標籤/搜索