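Kubernetes Source Code Reading Notes: kube-proxy (Part 2)

This article starts with how ServiceConfig is created and run.

1. Creating the ServiceConfig

ServiceConfig is the kube-proxy component that watches for Service changes. In essence it is a thin wrapper around an informer, as the NewServiceConfig method shows.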

pkg/proxy/config/config.go

// NewServiceConfig creates a new ServiceConfig.
func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
    result := &ServiceConfig{
        lister:       serviceInformer.Lister(),
        listerSynced: serviceInformer.Informer().HasSynced,
    }

    serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    result.handleAddService,
            UpdateFunc: result.handleUpdateService,
            DeleteFunc: result.handleDeleteService,
        },
        resyncPeriod,
    )

    return result
}

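The method registers three callbacks on serviceInformer; when a service changes, the matching callback is invoked.

2. Callback functions

The add and delete callbacks end up running the same logic as the update callback. The only difference is that add is an update from nil to the target service, and delete is an update from the target service to nil. So we take update as the example.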
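The relationship between the three callbacks can be illustrated with a small sketch. The Service and recorder types below are hypothetical stand-ins (they are not the kube-proxy types); the sketch only shows add and delete being funneled into update with a nil on one side.

```go
package main

import "fmt"

// Service is a hypothetical stand-in for v1.Service.
type Service struct {
	Namespace, Name string
}

// recorder is a hypothetical handler that records every <previous, current> pair it sees.
type recorder struct {
	calls []string
}

// describe renders a service as "namespace/name", or "nil" when absent.
func describe(s *Service) string {
	if s == nil {
		return "nil"
	}
	return s.Namespace + "/" + s.Name
}

// OnServiceUpdate is the single place where a change is recorded.
func (r *recorder) OnServiceUpdate(previous, current *Service) {
	r.calls = append(r.calls, describe(previous)+" -> "+describe(current))
}

// OnServiceAdd is an update from nil to the new service.
func (r *recorder) OnServiceAdd(s *Service) { r.OnServiceUpdate(nil, s) }

// OnServiceDelete is an update from the old service to nil.
func (r *recorder) OnServiceDelete(s *Service) { r.OnServiceUpdate(s, nil) }

func main() {
	r := &recorder{}
	svc := &Service{Namespace: "default", Name: "web"}
	r.OnServiceAdd(svc)
	r.OnServiceUpdate(svc, svc)
	r.OnServiceDelete(svc)
	for _, c := range r.calls {
		fmt.Println(c)
	}
}
```

Keeping a single update path means the change-tracking logic only has to be written once.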

The update callback, handleUpdateService, essentially calls the OnServiceUpdate method:

pkg/proxy/iptables/proxier.go

func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
    if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
        proxier.syncRunner.Run()
    }
}

The method is short: it first calls Update, and if Update returns true and the proxier is initialized, it calls Run.

Let's look at Update:

pkg/proxy/service.go

// Update updates given service's change map based on the <previous, current> service pair.  It returns true if items changed,
// otherwise return false.  Update can be used to add/update/delete items of ServiceChangeMap.  For example,
// Add item
//   - pass <nil, service> as the <previous, current> pair.
// Update item
//   - pass <oldService, service> as the <previous, current> pair.
// Delete item
//   - pass <service, nil> as the <previous, current> pair.
func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
    svc := current
    if svc == nil {
        svc = previous
    }
    // previous == nil && current == nil is unexpected, we should return false directly.
    if svc == nil {
        return false
    }
    namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}

    sct.lock.Lock()
    defer sct.lock.Unlock()

    change, exists := sct.items[namespacedName]
    if !exists {
        change = &serviceChange{}
        change.previous = sct.serviceToServiceMap(previous)
        sct.items[namespacedName] = change
    }
    change.current = sct.serviceToServiceMap(current)
    // if change.previous equal to change.current, it means no change
    if reflect.DeepEqual(change.previous, change.current) {
        delete(sct.items, namespacedName)
    }
    return len(sct.items) > 0
}

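As shown, this one method handles add, update, and delete. Update calls serviceToServiceMap to convert the service's state before and after the change, and stores both in the items map of the ServiceChangeTracker struct. The map is keyed by the service's NamespacedName, and each value is a serviceChange struct, which has only two fields: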

pkg/proxy/service.go

// serviceChange contains all changes to services that happened since proxy rules were synced.  For a single object,
// changes are accumulated, i.e. previous is state from before applying the changes,
// current is state after applying all of the changes.
type serviceChange struct {
    previous ServiceMap
    current  ServiceMap
}

They hold the ServiceMap before and after the change. ServiceMap itself is a map from ServicePortName to ServicePort:

pkg/proxy/service.go

type ServiceMap map[ServicePortName]ServicePort

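In short, Update stores the service's state before and after the change in the map, and drops the entry again if the two turn out to be equal.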
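The accumulate-then-cancel behaviour is easier to see in a stripped-down sketch. The tracker, change, and portMap types below are hypothetical simplifications (a plain string map stands in for ServiceMap, and there is no serviceToServiceMap conversion), so this illustrates the idea and is not the kube-proxy code.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// portMap is a hypothetical stand-in for ServiceMap: port name -> backend description.
type portMap map[string]string

// change keeps the state before the first unsynced change and the latest state.
type change struct {
	previous portMap
	current  portMap
}

// tracker is a simplified stand-in for ServiceChangeTracker.
type tracker struct {
	mu    sync.Mutex
	items map[string]*change
}

func newTracker() *tracker { return &tracker{items: map[string]*change{}} }

// update records the <previous, current> pair under key and reports
// whether any pending changes remain afterwards.
func (t *tracker) update(key string, previous, current portMap) bool {
	t.mu.Lock()
	defer t.mu.Unlock()

	c, exists := t.items[key]
	if !exists {
		// Only the first change since the last sync sets previous.
		c = &change{previous: previous}
		t.items[key] = c
	}
	c.current = current
	// If the net effect is "nothing changed", forget the entry.
	if reflect.DeepEqual(c.previous, c.current) {
		delete(t.items, key)
	}
	return len(t.items) > 0
}

func main() {
	t := newTracker()
	a := portMap{"http": "10.0.0.1:80"}
	b := portMap{"http": "10.0.0.1:8080"}

	fmt.Println(t.update("default/web", a, b)) // true: a pending change is recorded
	fmt.Println(t.update("default/web", b, a)) // false: reverted before a sync, entry removed
}
```

Because previous is only set on the first change, several quick edits to the same service collapse into a single before/after pair by the time the rules are synced.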

If Update returns true and the proxier has been initialized, syncRunner.Run is called. Stepping in:

pkg/util/async/bounded_frequency_runner.go 
func (bfr *BoundedFrequencyRunner) Run() {
    // If it takes a lot of time to run the underlying function, noone is really
    // processing elements from <run> channel. So to avoid blocking here on the
    // putting element to it, we simply skip it if there is already an element
    // in it.
    select {
    case bfr.run <- struct{}{}:
    default:
    }
}

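If bfr's run channel has room, an empty struct is written into it; otherwise the method simply returns. Once that struct is in the channel, it triggers bfr to call the corresponding method.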
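The select-with-default idiom used by Run can be tried in isolation. In the sketch below, the signal helper is a hypothetical name and the channel capacity of 1 is an assumption made for the example; it shows that repeated requests coalesce into one pending element and that the caller never blocks.

```go
package main

import "fmt"

// signal tries to queue a run request without blocking.
// It reports whether the request was actually queued.
func signal(run chan struct{}) bool {
	select {
	case run <- struct{}{}:
		return true
	default:
		// A request is already pending; dropping this one loses nothing.
		return false
	}
}

func main() {
	run := make(chan struct{}, 1) // assumed capacity: at most one pending request

	fmt.Println(signal(run)) // true: queued
	fmt.Println(signal(run)) // false: one is already pending
	<-run                    // the consumer picks up the request
	fmt.Println(signal(run)) // true: room again
}
```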

3. Running the ServiceConfig

Recall the Run method in server.go from the previous article:

cmd/kube-proxy/app/server.go

// Run runs the specified ProxyServer.  This should never exit (unless CleanupAndExit is set).
func (s *ProxyServer) Run() error {
    ...
    serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
    serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
    go serviceConfig.Run(wait.NeverStop)

    endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
    endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
    go endpointsConfig.Run(wait.NeverStop)

    ...

    // Just loop forever for now...
    s.Proxier.SyncLoop()
    return nil
}

Earlier we focused on NewServiceConfig; next we look at the Run and SyncLoop calls that follow it.

Stepping into Run:

pkg/proxy/config/config.go

// Run starts the goroutine responsible for calling
// registered handlers.
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    klog.Info("Starting service config controller")
    defer klog.Info("Shutting down service config controller")

    if !controller.WaitForCacheSync("service config", stopCh, c.listerSynced) {
        return
    }

    for i := range c.eventHandlers {
        klog.V(3).Info("Calling handler.OnServiceSynced()")
        c.eventHandlers[i].OnServiceSynced()
    }

    <-stopCh
}

The core of this method is calling each handler's OnServiceSynced method. Stepping into OnServiceSynced:

pkg/proxy/iptables/proxier.go

func (proxier *Proxier) OnServiceSynced() {
    proxier.mu.Lock()
    proxier.servicesSynced = true
    proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
    proxier.mu.Unlock()

    // Sync unconditionally - this is called once per lifetime.
    proxier.syncProxyRules()
}

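It first calls setInitialized, which marks the proxier as initialized once both services and endpoints have synced. Only after that do the proxier's callbacks start triggering syncs. Finally, it runs syncProxyRules once.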
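The gating can be sketched in a few lines. The proxier type and its lower-case methods below are hypothetical simplifications (no locking, and a counter instead of a real sync runner); the sketch only shows that change callbacks request a sync once both caches have reported synced.

```go
package main

import "fmt"

// proxier is a hypothetical, heavily simplified stand-in for the iptables Proxier.
type proxier struct {
	servicesSynced  bool
	endpointsSynced bool
	initialized     bool
	syncRequests    int // counts how often a sync would have been requested
}

// onServiceSynced marks services as synced and recomputes the initialized flag.
func (p *proxier) onServiceSynced() {
	p.servicesSynced = true
	p.initialized = p.servicesSynced && p.endpointsSynced
}

// onEndpointsSynced does the same for endpoints.
func (p *proxier) onEndpointsSynced() {
	p.endpointsSynced = true
	p.initialized = p.servicesSynced && p.endpointsSynced
}

// onServiceUpdate requests a sync only if something changed and the proxier is initialized.
func (p *proxier) onServiceUpdate(changed bool) {
	if changed && p.initialized {
		p.syncRequests++
	}
}

func main() {
	p := &proxier{}

	p.onServiceSynced()
	p.onServiceUpdate(true)
	fmt.Println(p.syncRequests) // 0: endpoints have not synced yet

	p.onEndpointsSynced()
	p.onServiceUpdate(true)
	fmt.Println(p.syncRequests) // 1: both synced, the change triggers a sync request
}
```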

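In short, Run initializes the freshly created ServiceConfig and then calls syncProxyRules once. That method is where kube-proxy actually maintains iptables; we analyze it in detail later.

4. SyncLoop

Now let's look at SyncLoop. SyncLoop essentially calls the Loop method in bounded_frequency_runner.go: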

pkg/util/async/bounded_frequency_runner.go

// Loop handles the periodic timer and run requests.  This is expected to be
// called as a goroutine.
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
    klog.V(3).Infof("%s Loop running", bfr.name)
    bfr.timer.Reset(bfr.maxInterval)
    for {
        select {
        case <-stop:
            bfr.stop()
            klog.V(3).Infof("%s Loop stopping", bfr.name)
            return
        case <-bfr.timer.C():
            bfr.tryRun()
        case <-bfr.run:
            bfr.tryRun()
        }
    }
}

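As shown, this method runs an infinite loop and calls tryRun periodically. In addition, whenever a message arrives on bfr's run channel, tryRun is executed once as well. So when does a message arrive on that channel? The answer was given in section 2 above: when a ServiceConfig callback is invoked. In other words, every time a service changes and a callback fires, tryRun is eventually executed.

Stepping into tryRun: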

pkg/util/async/bounded_frequency_runner.go
// assumes the lock is not held
func (bfr *BoundedFrequencyRunner) tryRun() {
    bfr.mu.Lock()
    defer bfr.mu.Unlock()

    if bfr.limiter.TryAccept() {
        // We're allowed to run the function right now.
        bfr.fn()
        bfr.lastRun = bfr.timer.Now()
        bfr.timer.Stop()
        bfr.timer.Reset(bfr.maxInterval)
        klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
        return
    }

    // It can't run right now, figure out when it can run next.

    elapsed := bfr.timer.Since(bfr.lastRun)    // how long since last run
    nextPossible := bfr.minInterval - elapsed  // time to next possible run
    nextScheduled := bfr.maxInterval - elapsed // time to next periodic run
    klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)

    if nextPossible < nextScheduled {
        // Set the timer for ASAP, but don't drain here.  Assuming Loop is running,
        // it might get a delivery in the mean time, but that is OK.
        bfr.timer.Stop()
        bfr.timer.Reset(nextPossible)
        klog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible)
    }
}

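All the other code serves bfr.fn: the core of this method is to run bfr.fn at an appropriate time. That function is registered when the proxier is created. Recall from the previous article that NewProxier contains this line: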

pkg/proxy/iptables/proxier.go 
func NewProxier(...) (*Proxier, error) {
    
    ......

    proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
    return proxier, nil
}

This is where the registration happens: the proxier.syncProxyRules method mentioned earlier is registered as bfr.fn.
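With bfr.fn identified as syncProxyRules, the throttled branch of tryRun decides how soon the next sync may happen. The arithmetic can be pulled out into a pure function; nextReset is a hypothetical helper name, and the sketch mirrors only the interval calculation, not the limiter or the timer.

```go
package main

import (
	"fmt"
	"time"
)

// nextReset mirrors the arithmetic in tryRun's throttled branch.
// It returns the duration the timer would be reset to, and false when
// the existing periodic timer would be left untouched.
func nextReset(elapsed, minInterval, maxInterval time.Duration) (time.Duration, bool) {
	nextPossible := minInterval - elapsed  // time to next possible run
	nextScheduled := maxInterval - elapsed // time to next periodic run
	if nextPossible < nextScheduled {
		return nextPossible, true
	}
	return 0, false
}

func main() {
	// 300ms after the last run, with a 1s minimum and a 30s maximum interval.
	d, ok := nextReset(300*time.Millisecond, time.Second, 30*time.Second)
	fmt.Println(d, ok) // 700ms true
}
```

A request that arrives too early is therefore not lost: the timer is pulled forward so that the sync runs as soon as the minimum interval has elapsed.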

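To sum up, syncProxyRules can be executed through three paths:

(1) when the service and endpoint Configs have just been created and initialized;

(2) when a service or endpoint changes;

(3) automatically at regular intervals.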
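The three paths can be put side by side in one runnable sketch. The loop function below is a hypothetical, simplified stand-in for Loop without rate limiting, and the tick channel is fed by hand in place of a real timer so that the run is deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// loop is a simplified stand-in for the runner's Loop: it calls fn on every
// tick and on every run request until stop is closed.
func loop(fn func(), tick <-chan time.Time, run <-chan struct{}, stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		case <-tick:
			fn() // path (3): periodic sync
		case <-run:
			fn() // path (2): sync requested by a change callback
		}
	}
}

// runDemo exercises each path once and returns how often the sync function ran.
func runDemo() int {
	count := 0
	syncRules := func() { count++ }

	syncRules() // path (1): initial sync once the caches are populated

	tick := make(chan time.Time)
	run := make(chan struct{})
	stop := make(chan struct{})
	done := make(chan struct{})
	go func() {
		loop(syncRules, tick, run, stop)
		close(done)
	}()

	run <- struct{}{}  // a service or endpoint changed
	tick <- time.Now() // the periodic timer fired
	close(stop)
	<-done // wait for the loop to exit before reading count
	return count
}

func main() {
	fmt.Println(runDemo()) // 3
}
```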

5. syncProxyRules

syncProxyRules is the core method of the kube-proxy component. It is more than 700 lines long and involves a good deal of networking knowledge:

pkg/proxy/iptables/proxier.go

// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// This assumes proxier.mu is NOT held
func (proxier *Proxier) syncProxyRules() {
    proxier.mu.Lock()
    defer proxier.mu.Unlock()

    ...

    // We assume that if this was called, we really want to sync them,
    // even if nothing changed in the meantime. In other words, callers are
    // responsible for detecting no-op changes and not calling this function.
    serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
    endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges)

    ...

    klog.V(3).Info("Syncing iptables rules")

    // Create and link the kube chains.
    for _, chain := range iptablesJumpChains {
        if _, err := proxier.iptables.EnsureChain(chain.table, chain.chain); err != nil {
            klog.Errorf("Failed to ensure that %s chain %s exists: %v", chain.table, kubeServicesChain, err)
            return
        }
        args := append(chain.extraArgs,
            "-m", "comment", "--comment", chain.comment,
            "-j", string(chain.chain),
        )
        if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, chain.table, chain.sourceChain, args...); err != nil {
            klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", chain.table, chain.sourceChain, chain.chain, err)
            return
        }
    }

    ...

    // Build rules for each service.
    for svcName, svc := range proxier.serviceMap {
        svcInfo, ok := svc.(*serviceInfo)
        if !ok {
            klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
            continue
        }
        isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP)
        protocol := strings.ToLower(string(svcInfo.Protocol))
        svcNameString := svcInfo.serviceNameString
        hasEndpoints := len(proxier.endpointsMap[svcName]) > 0

        svcChain := svcInfo.servicePortChainName
        if hasEndpoints {
            // Create the per-service chain, retaining counters if possible.
            if chain, ok := existingNATChains[svcChain]; ok {
                writeBytesLine(proxier.natChains, chain)
            } else {
                writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
            }
            activeNATChains[svcChain] = true
        }

        ...

        // Capture the clusterIP.
        if hasEndpoints {
            args = append(args[:0],
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
                "-m", protocol, "-p", protocol,
                "-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
                "--dport", strconv.Itoa(svcInfo.Port),
            )
            ...
        } else {
            writeLine(proxier.filterRules,
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                "-m", protocol, "-p", protocol,
                "-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
                "--dport", strconv.Itoa(svcInfo.Port),
                "-j", "REJECT",
            )
        }

        // Capture externalIPs.
        for _, externalIP := range svcInfo.ExternalIPs {
            ...
        }

        // Capture load-balancer ingress.
        if hasEndpoints {
            ...
        }

        // Capture nodeports.  If we had more than 2 rules it might be
        // worthwhile to make a new per-service chain for nodeport rules, but
        // with just 2 rules it ends up being a waste and a cognitive burden.
        if svcInfo.NodePort != 0 {
            ...
        }

        if !hasEndpoints {
            continue
        }

        ...

        // Now write loadbalancing & DNAT rules.
        ...

        // Now write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic.
        ...
    }

    // Delete chains no longer in use.
    ...

    // Finally, tail-call to the nodeports chain.  This needs to be after all
    // other service portal rules.
    ...

    // Sync rules.
    // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table
    proxier.iptablesData.Reset()
    proxier.iptablesData.Write(proxier.filterChains.Bytes())
    proxier.iptablesData.Write(proxier.filterRules.Bytes())
    proxier.iptablesData.Write(proxier.natChains.Bytes())
    proxier.iptablesData.Write(proxier.natRules.Bytes())

    klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
    err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
    if err != nil {
        klog.Errorf("Failed to execute iptables-restore: %v", err)
        // Revert new local ports.
        klog.V(2).Infof("Closing local ports after iptables-restore failure")
        utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
        return
    }

    ...
}

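Only a small part is reproduced here. Overall, the method does the following:

(1) It calls UpdateServiceMap and UpdateEndpointsMap to apply the pending Service and Endpoints changes.

(2) It ensures the kube chains exist and links them into the built-in chains with jump rules.

(3) It iterates over all services, checks which kinds of exposure each one has (cluster IP, external IPs, load-balancer ingress, node port), and writes the corresponding rules.

(4) It deletes chains that are no longer in use and rewrites the rules in one pass through iptables-restore.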
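Step (3) boils down to assembling text lines that are later handed to iptables-restore. The sketch below rebuilds the "has no endpoints" REJECT rule from the excerpt above. Both functions are simplified stand-ins written for this example: writeLine here just joins its arguments with spaces, rejectRule is a hypothetical helper, and the "/32" suffix assumes an IPv4 cluster IP in place of utilproxy.ToCIDR.

```go
package main

import (
	"bytes"
	"fmt"
	"strconv"
	"strings"
)

// writeLine is a simplified stand-in: it joins the words with spaces
// and appends a newline to the buffer.
func writeLine(buf *bytes.Buffer, words ...string) {
	buf.WriteString(strings.Join(words, " ") + "\n")
}

// rejectRule builds the filter rule for a service port that has no endpoints.
// It assumes an IPv4 cluster IP, hence the hard-coded /32 mask.
func rejectRule(svcName, protocol, clusterIP string, port int) string {
	var buf bytes.Buffer
	writeLine(&buf,
		"-A", "KUBE-SERVICES",
		"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcName),
		"-m", protocol, "-p", protocol,
		"-d", clusterIP+"/32",
		"--dport", strconv.Itoa(port),
		"-j", "REJECT",
	)
	return buf.String()
}

func main() {
	// The service name, IP, and port are made-up example values.
	fmt.Print(rejectRule("default/web:http", "tcp", "10.96.0.10", 80))
}
```

Collecting every rule in a buffer and applying it in one call is what lets the whole rule set be replaced in a single step instead of rule by rule.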

For the details, see:

https://blog.csdn.net/zhangxiangui40542/article/details/79486995

https://www.jianshu.com/p/a978af8e5dd8

https://blog.csdn.net/zhonglinzhang/article/details/80185053

6. Summary

The logic of the kube-proxy component (in iptables mode) is relatively simple: it uses informers to watch Service and Endpoints resources in real time and updates iptables promptly.
