In this article we start from the creation and running of ServiceConfig.
1. Creating ServiceConfig
ServiceConfig is the component in kube-proxy that watches for Service changes. It is essentially an informer, as the NewServiceConfig method shows.
pkg/proxy/config/config.go
// NewServiceConfig creates a new ServiceConfig.
func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
	result := &ServiceConfig{
		lister:       serviceInformer.Lister(),
		listerSynced: serviceInformer.Informer().HasSynced,
	}

	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    result.handleAddService,
			UpdateFunc: result.handleUpdateService,
			DeleteFunc: result.handleDeleteService,
		},
		resyncPeriod,
	)

	return result
}
The method registers three callback functions on serviceInformer; when a Service changes, the corresponding callback is invoked.
2. The callback functions
The add and delete callbacks ultimately go through the update callback as well; the only difference is that an add is an update from nil to the target Service, while a delete is an update from the target Service to nil, as the sketch below shows. So we take update as our example.
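For reference, the wrappers in the iptables proxier look roughly like this (paraphrased from pkg/proxy/iptables/proxier.go of this era; check your source tree for the exact code):

func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
	// an add is just an update whose "previous" state is nil
	proxier.OnServiceUpdate(nil, service)
}

func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
	// a delete is an update whose "current" state is nil
	proxier.OnServiceUpdate(service, nil)
}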
The update callback, handleUpdateService, essentially calls the OnServiceUpdate method:
pkg/proxy/iptables/proxier.go
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
	if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
		proxier.syncRunner.Run()
	}
}
The method is short: it first calls Update, and only if Update reports a pending change and the proxier is initialized does it call syncRunner.Run.
Let's look at the Update method:
pkg/proxy/service.go
// Update updates given service's change map based on the <previous, current> service pair. It returns true if items changed,
// otherwise return false. Update can be used to add/update/delete items of ServiceChangeMap. For example,
// Add item
//   - pass <nil, service> as the <previous, current> pair.
// Update item
//   - pass <oldService, service> as the <previous, current> pair.
// Delete item
//   - pass <service, nil> as the <previous, current> pair.
func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
	svc := current
	if svc == nil {
		svc = previous
	}
	// previous == nil && current == nil is unexpected, we should return false directly.
	if svc == nil {
		return false
	}
	namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}

	sct.lock.Lock()
	defer sct.lock.Unlock()

	change, exists := sct.items[namespacedName]
	if !exists {
		change = &serviceChange{}
		change.previous = sct.serviceToServiceMap(previous)
		sct.items[namespacedName] = change
	}
	change.current = sct.serviceToServiceMap(current)
	// if change.previous equal to change.current, it means no change
	if reflect.DeepEqual(change.previous, change.current) {
		delete(sct.items, namespacedName)
	}
	return len(sct.items) > 0
}
As you can see, this one method is enough to handle all three cases: add, update, and delete. It calls serviceToServiceMap to convert the Service's before and after states and stores them in the items map of the ServiceChangeTracker struct, keyed by the Service's NamespacedName with a serviceChange as the value. The serviceChange struct has only two fields:
pkg/proxy/service.go
// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object,
// changes are accumulated, i.e. previous is state from before applying the changes,
// current is state after applying all of the changes.
type serviceChange struct {
	previous ServiceMap
	current  ServiceMap
}
They hold the ServiceMap before and after the change, and a ServiceMap is simply a map keyed by ServicePortName with ServicePort values (for example, a Service default/nginx exposing a port named http would appear under a key rendered as default/nginx:http):
pkg/proxy/service.go
type ServiceMap map[ServicePortName]ServicePort
In short, all Update does is record the Service's before/after state in this map, as the simplified sketch below illustrates.
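Here is a minimal, self-contained sketch of that accumulate-and-cancel behaviour. The types, names, and the serviceState representation are made up for illustration; they are not the real kube-proxy types.

package main

import (
	"fmt"
	"reflect"
)

// serviceKey and serviceState are simplified stand-ins for types.NamespacedName and ServiceMap.
type serviceKey struct{ Namespace, Name string }
type serviceState map[string]string // port name -> "clusterIP:port"

type change struct {
	previous serviceState
	current  serviceState
}

type changeTracker struct {
	items map[serviceKey]*change
}

// update mirrors the shape of ServiceChangeTracker.Update: previous is captured only once,
// current is always overwritten, and a change that nets out to "no change" is dropped.
func (t *changeTracker) update(key serviceKey, prev, cur serviceState) bool {
	c, ok := t.items[key]
	if !ok {
		c = &change{previous: prev}
		t.items[key] = c
	}
	c.current = cur
	if reflect.DeepEqual(c.previous, c.current) {
		delete(t.items, key)
	}
	return len(t.items) > 0
}

func main() {
	t := &changeTracker{items: map[serviceKey]*change{}}
	key := serviceKey{Namespace: "default", Name: "nginx"}

	fmt.Println(t.update(key, nil, serviceState{"http": "10.96.0.10:80"})) // add    -> true, a change is pending
	fmt.Println(t.update(key, serviceState{"http": "10.96.0.10:80"}, nil)) // delete -> false, add + delete cancel out
}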
Once Update reports a change and the proxier is initialized, syncRunner.Run is called. Let's step into it:
pkg/util/async/bounded_frequency_runner.go
func (bfr *BoundedFrequencyRunner) Run() {
	// If it takes a lot of time to run the underlying function, noone is really
	// processing elements from <run> channel. So to avoid blocking here on the
	// putting element to it, we simply skip it if there is already an element
	// in it.
	select {
	case bfr.run <- struct{}{}:
	default:
	}
}
If the bfr's run channel has room, an empty struct is written into it; otherwise the call simply returns. Once a value lands in that channel, it triggers bfr to run the registered function (we will see where in the Loop method below). A minimal standalone illustration of this non-blocking send idiom follows.
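This is a sketch of the idiom only, not kube-proxy code; the channel and the requestRun helper are made up, though bfr.run is a small buffered channel used the same way.

package main

import "fmt"

func main() {
	run := make(chan struct{}, 1) // small buffered channel, used like bfr.run

	requestRun := func() {
		select {
		case run <- struct{}{}: // queue a run request if none is pending
		default: // a request is already pending; dropping this one loses nothing
		}
	}

	requestRun()
	requestRun() // the second request is coalesced with the first instead of blocking

	<-run                                           // the consumer sees exactly one pending request
	fmt.Println("pending requests left:", len(run)) // 0
}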
3. Running ServiceConfig
Recall the Run method in server.go from the previous article:
cmd/kube-proxy/app/server.go
// Run runs the specified ProxyServer. This should never exit (unless CleanupAndExit is set).
func (s *ProxyServer) Run() error {
	...
	serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
	serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
	go serviceConfig.Run(wait.NeverStop)

	endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
	endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
	go endpointsConfig.Run(wait.NeverStop)
	...
	// Just loop forever for now...
	s.Proxier.SyncLoop()
	return nil
}
We have already focused on NewServiceConfig; now let's look at the Run and SyncLoop calls that follow.
Step into the Run method:
pkg/proxy/config/config.go
// Run starts the goroutine responsible for calling
// registered handlers.
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	klog.Info("Starting service config controller")
	defer klog.Info("Shutting down service config controller")

	if !controller.WaitForCacheSync("service config", stopCh, c.listerSynced) {
		return
	}

	for i := range c.eventHandlers {
		klog.V(3).Info("Calling handler.OnServiceSynced()")
		c.eventHandlers[i].OnServiceSynced()
	}

	<-stopCh
}
The core of this method is to wait for the informer cache to sync and then call each handler's OnServiceSynced method. Step into OnServiceSynced:
pkg/proxy/iptables/proxier.go
func (proxier *Proxier) OnServiceSynced() {
	proxier.mu.Lock()
	proxier.servicesSynced = true
	proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
	proxier.mu.Unlock()

	// Sync unconditionally - this is called once per lifetime.
	proxier.syncProxyRules()
}
It first calls setInitialized to mark the proxier as initialized (once both services and endpoints have synced). Only after initialization do the update callbacks actually trigger a sync, because OnServiceUpdate checks isInitialized. Finally, it runs syncProxyRules once. A minimal sketch of the initialized flag follows.
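The flag is just an atomically accessed int32. This is a simplified, self-contained sketch of the pattern; the proxier type here is a stand-in, not the real struct.

package main

import (
	"fmt"
	"sync/atomic"
)

type proxier struct {
	initialized int32 // accessed atomically, as in the real Proxier
}

func (p *proxier) setInitialized(value bool) {
	var initialized int32
	if value {
		initialized = 1
	}
	atomic.StoreInt32(&p.initialized, initialized)
}

func (p *proxier) isInitialized() bool {
	return atomic.LoadInt32(&p.initialized) > 0
}

func main() {
	p := &proxier{}
	fmt.Println(p.isInitialized()) // false: until the caches sync, update callbacks do not trigger a sync
	p.setInitialized(true)
	fmt.Println(p.isInitialized()) // true: from now on OnServiceUpdate may call syncRunner.Run
}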
In short, Run marks the freshly created ServiceConfig (and with it the proxier) as initialized and then performs one initial syncProxyRules. That method is where kube-proxy actually maintains iptables; we analyze it in detail later.
4. SyncLoop
Now let's look at SyncLoop. SyncLoop essentially calls the Loop method in bounded_frequency_runner.go:
pkg/util/async/bounded_frequency_runner.go
// Loop handles the periodic timer and run requests. This is expected to be
// called as a goroutine.
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
	klog.V(3).Infof("%s Loop running", bfr.name)
	bfr.timer.Reset(bfr.maxInterval)
	for {
		select {
		case <-stop:
			bfr.stop()
			klog.V(3).Infof("%s Loop stopping", bfr.name)
			return
		case <-bfr.timer.C():
			bfr.tryRun()
		case <-bfr.run:
			bfr.tryRun()
		}
	}
}
As you can see, this method runs an infinite loop and invokes tryRun periodically on a timer. It also invokes tryRun whenever a message arrives on bfr's run channel. When does that channel receive a message? As described above, whenever a ServiceConfig callback is invoked, which ends in syncRunner.Run. So every time a Service changes and a callback fires, tryRun eventually gets executed once.
Step into tryRun:
pkg/util/async/bounded_frequency_runner.go
// assumes the lock is not held
func (bfr *BoundedFrequencyRunner) tryRun() {
	bfr.mu.Lock()
	defer bfr.mu.Unlock()

	if bfr.limiter.TryAccept() {
		// We're allowed to run the function right now.
		bfr.fn()
		bfr.lastRun = bfr.timer.Now()
		bfr.timer.Stop()
		bfr.timer.Reset(bfr.maxInterval)
		klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
		return
	}

	// It can't run right now, figure out when it can run next.
	elapsed := bfr.timer.Since(bfr.lastRun)    // how long since last run
	nextPossible := bfr.minInterval - elapsed  // time to next possible run
	nextScheduled := bfr.maxInterval - elapsed // time to next periodic run
	klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)

	if nextPossible < nextScheduled {
		// Set the timer for ASAP, but don't drain here. Assuming Loop is running,
		// it might get a delivery in the mean time, but that is OK.
		bfr.timer.Stop()
		bfr.timer.Reset(nextPossible)
		klog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible)
	}
}
All the other code exists to serve bfr.fn: the core of this method is to run bfr.fn at the right moment, subject to rate limiting. That function is registered when the proxier is created. Recall from the previous article that NewProxier contains this line:
pkg/proxy/iptables/proxier.go
func NewProxier(...) (*Proxier, error) {
	...
	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
	return proxier, nil
}
This is where the registration happens: proxier.syncProxyRules, discussed above, is registered as bfr.fn. A usage sketch of the runner follows.
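To make the wiring concrete, here is a minimal usage sketch of BoundedFrequencyRunner, assuming the constructor signature shown above; the sync function, the intervals, and the sleep are made up for the example, and the import assumes you are building inside the kubernetes repo.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/kubernetes/pkg/util/async"
)

func main() {
	syncFn := func() { fmt.Println("syncing proxy rules...") } // stand-in for proxier.syncProxyRules

	// At most one run per second, at least one run every 30 seconds, burst of 2.
	runner := async.NewBoundedFrequencyRunner("sync-runner", syncFn, time.Second, 30*time.Second, 2)

	go runner.Loop(wait.NeverStop) // what proxier.SyncLoop does
	runner.Run()                   // what the service/endpoints callbacks end up doing

	time.Sleep(2 * time.Second) // give the loop a chance to fire in this toy example
}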
So, to summarize, syncProxyRules can be triggered in three ways:
(1) when the service and endpoints Config objects have just been created and initialized;
(2) when a Service or Endpoints object changes;
(3) periodically, on the BoundedFrequencyRunner's timer.
5. syncProxyRules
syncProxyRules is the most important method in kube-proxy. It is more than 700 lines long and involves a good deal of networking knowledge:
pkg/proxy/iptables/proxier.go
// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// This assumes proxier.mu is NOT held
func (proxier *Proxier) syncProxyRules() {
	proxier.mu.Lock()
	defer proxier.mu.Unlock()

	...

	// We assume that if this was called, we really want to sync them,
	// even if nothing changed in the meantime. In other words, callers are
	// responsible for detecting no-op changes and not calling this function.
	serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
	endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges)

	...

	klog.V(3).Info("Syncing iptables rules")

	// Create and link the kube chains.
	for _, chain := range iptablesJumpChains {
		if _, err := proxier.iptables.EnsureChain(chain.table, chain.chain); err != nil {
			klog.Errorf("Failed to ensure that %s chain %s exists: %v", chain.table, kubeServicesChain, err)
			return
		}
		args := append(chain.extraArgs,
			"-m", "comment", "--comment", chain.comment,
			"-j", string(chain.chain),
		)
		if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, chain.table, chain.sourceChain, args...); err != nil {
			klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", chain.table, chain.sourceChain, chain.chain, err)
			return
		}
	}

	...

	// Build rules for each service.
	for svcName, svc := range proxier.serviceMap {
		svcInfo, ok := svc.(*serviceInfo)
		if !ok {
			klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
			continue
		}
		isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP)
		protocol := strings.ToLower(string(svcInfo.Protocol))
		svcNameString := svcInfo.serviceNameString
		hasEndpoints := len(proxier.endpointsMap[svcName]) > 0

		svcChain := svcInfo.servicePortChainName
		if hasEndpoints {
			// Create the per-service chain, retaining counters if possible.
			if chain, ok := existingNATChains[svcChain]; ok {
				writeBytesLine(proxier.natChains, chain)
			} else {
				writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
			}
			activeNATChains[svcChain] = true
		}

		...

		// Capture the clusterIP.
		if hasEndpoints {
			args = append(args[:0],
				"-A", string(kubeServicesChain),
				"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
				"-m", protocol, "-p", protocol,
				"-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
				"--dport", strconv.Itoa(svcInfo.Port),
			)
			...
		} else {
			writeLine(proxier.filterRules,
				"-A", string(kubeServicesChain),
				"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
				"-m", protocol, "-p", protocol,
				"-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
				"--dport", strconv.Itoa(svcInfo.Port),
				"-j", "REJECT",
			)
		}

		// Capture externalIPs.
		for _, externalIP := range svcInfo.ExternalIPs {
			...
		}

		// Capture load-balancer ingress.
		if hasEndpoints {
			...
		}

		// Capture nodeports. If we had more than 2 rules it might be
		// worthwhile to make a new per-service chain for nodeport rules, but
		// with just 2 rules it ends up being a waste and a cognitive burden.
		if svcInfo.NodePort != 0 {
			...
		}

		if !hasEndpoints {
			continue
		}

		...

		// Now write loadbalancing & DNAT rules.
		...

		// Now write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic.
		...
	}

	// Delete chains no longer in use.
	...

	// Finally, tail-call to the nodeports chain. This needs to be after all
	// other service portal rules.
	...

	// Sync rules.
	// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table
	proxier.iptablesData.Reset()
	proxier.iptablesData.Write(proxier.filterChains.Bytes())
	proxier.iptablesData.Write(proxier.filterRules.Bytes())
	proxier.iptablesData.Write(proxier.natChains.Bytes())
	proxier.iptablesData.Write(proxier.natRules.Bytes())

	klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
	err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
	if err != nil {
		klog.Errorf("Failed to execute iptables-restore: %v", err)
		// Revert new local ports.
		klog.V(2).Infof("Closing local ports after iptables-restore failure")
		utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
		return
	}
	...
}
Only a small portion is shown here. Overall, the method does the following:
(1) calls UpdateServiceMap and UpdateEndpointsMap to apply the accumulated Service and Endpoints changes;
(2) creates the kube chains (KUBE-SERVICES and friends) and links them into the standard tables;
(3) iterates over all services, checks each service's situation (cluster IP, external IPs, load-balancer ingress, node ports, whether it has endpoints), and writes the corresponding rules;
(4) deletes chains that are no longer in use and applies everything in a single iptables-restore call; a simplified sketch of the restore payload follows this list.
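As a rough illustration only: the chain suffixes after KUBE-SVC-/KUBE-SEP-, the IPs, and the single-endpoint layout below are all made up, and the real proxier also writes masquerade and load-balancing rules. This sketch shows, in Go, the kind of iptables-restore payload the method assembles for one ClusterIP service with one endpoint.

package main

import (
	"bytes"
	"fmt"
)

func main() {
	var natChains, natRules bytes.Buffer

	// Declare the chains (":<chain> - [0:0]" creates them with zeroed counters).
	fmt.Fprintln(&natChains, "*nat")
	fmt.Fprintln(&natChains, ":KUBE-SERVICES - [0:0]")
	fmt.Fprintln(&natChains, ":KUBE-SVC-EXAMPLE - [0:0]")
	fmt.Fprintln(&natChains, ":KUBE-SEP-EXAMPLE - [0:0]")

	// Cluster IP -> per-service chain -> per-endpoint chain -> DNAT to the pod.
	fmt.Fprintln(&natRules, `-A KUBE-SERVICES -m comment --comment "default/nginx cluster IP" -m tcp -p tcp -d 10.96.0.10/32 --dport 80 -j KUBE-SVC-EXAMPLE`)
	fmt.Fprintln(&natRules, `-A KUBE-SVC-EXAMPLE -j KUBE-SEP-EXAMPLE`)
	fmt.Fprintln(&natRules, `-A KUBE-SEP-EXAMPLE -p tcp -m tcp -j DNAT --to-destination 10.244.1.5:80`)
	fmt.Fprintln(&natRules, "COMMIT")

	// The real proxier concatenates its filter and nat buffers like this and
	// feeds the result to iptables-restore with --noflush.
	var data bytes.Buffer
	data.Write(natChains.Bytes())
	data.Write(natRules.Bytes())
	fmt.Print(data.String())
}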
For more details, see https://blog.csdn.net/zhangxiangui40542/article/details/79486995, https://www.jianshu.com/p/a978af8e5dd8, and https://blog.csdn.net/zhonglinzhang/article/details/80185053.
6. Summary
The logic of kube-proxy (in iptables mode) is relatively simple: it uses informers to watch Service and Endpoints resources in real time and updates iptables accordingly.