In-Depth Analysis of Kubernetes kube-proxy iptables Mode (Part 2) - Kubernetes in Production, Hands-On

This blog series focuses on demystifying the core technologies of big data and container clouds, and can provide full-stack big data + cloud-native platform consulting. Please keep following this series. Feel free to reach out for any academic exchange. For more content, follow the WeChat official account 數據雲技術社區 (Data Cloud Technology Community), or email 1120746959@qq.com.

1 How kube-proxy Works

1.1 iptables Proxy Mode

  • kube-proxy watches the Kubernetes master for additions and removals of Service and Endpoints objects. For each Service it installs iptables rules that capture traffic destined for the Service's clusterIP (a virtual IP) and port, and redirect that traffic to one of the Service's backends.
  • The default policy picks a backend at random. For client-IP-based session affinity, set service.spec.sessionAffinity to "ClientIP" (the default is "None"). The commands below inspect the generated chains, and a sketch of how the random selection is expressed in iptables follows after them.
iptables -t nat -nvL KUBE-SERVICES
iptables -t nat -nvL KUBE-SVC-YREYKMMDZGMSMDZU
iptables -t nat -nvL KUBE-SEP-*
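The random backend selection above is implemented with iptables' statistic module: for n endpoints, the proxier emits n jump rules where rule i matches with probability 1/(n-i), which works out to an even 1/n share per endpoint overall. Below is a minimal, hypothetical Go sketch of that computation (the chain names are made up; this mirrors the idea, not kube-proxy's actual rule writer):

package main

import "fmt"

// buildRandomJumpRules mimics how an iptables-based proxier could spread
// traffic over n endpoint chains: rule i matches with probability 1/(n-i),
// and the last rule is an unconditional catch-all.
func buildRandomJumpRules(svcChain string, endpointChains []string) [][]string {
	n := len(endpointChains)
	rules := make([][]string, 0, n)
	for i, epChain := range endpointChains {
		args := []string{"-A", svcChain}
		if i < n-1 {
			// All but the last rule use a probabilistic match.
			args = append(args,
				"-m", "statistic",
				"--mode", "random",
				"--probability", fmt.Sprintf("%0.5f", 1.0/float64(n-i)),
			)
		}
		args = append(args, "-j", epChain)
		rules = append(rules, args)
	}
	return rules
}

func main() {
	for _, r := range buildRandomJumpRules("KUBE-SVC-EXAMPLE",
		[]string{"KUBE-SEP-AAA", "KUBE-SEP-BBB", "KUBE-SEP-CCC"}) {
		fmt.Println(r)
	}
}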

1.2 IPVS Proxy Mode

  • Like iptables mode, IPVS builds on netfilter hooks, but it uses a hash table as its underlying data structure and works in kernel space. This means IPVS can redirect traffic faster and performs better when synchronizing proxy rules.
  • IPVS also provides more load-balancing algorithm options, for example: rr (round robin), lc (least connections), dh (destination hashing), sh (source hashing), sed (shortest expected delay), nq (never queue). A purely conceptual sketch of the first two follows below.
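To make the first two schedulers concrete, here is a purely conceptual Go sketch of round robin and least connections. It illustrates the algorithms only; it is not IPVS or kube-proxy code, and the endpoint type and addresses are made up:

package main

import "fmt"

type endpoint struct {
	addr        string
	activeConns int
}

// roundRobin returns endpoints in strict rotation (rr).
func roundRobin(eps []endpoint, next *int) endpoint {
	ep := eps[*next%len(eps)]
	*next = *next + 1
	return ep
}

// leastConnections picks the endpoint with the fewest active connections (lc).
func leastConnections(eps []endpoint) endpoint {
	best := eps[0]
	for _, ep := range eps[1:] {
		if ep.activeConns < best.activeConns {
			best = ep
		}
	}
	return best
}

func main() {
	eps := []endpoint{{"10.0.0.1:80", 3}, {"10.0.0.2:80", 1}, {"10.0.0.3:80", 2}}
	next := 0
	fmt.Println("rr pick:", roundRobin(eps, &next).addr)
	fmt.Println("lc pick:", leastConnections(eps).addr)
}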

2 kube-proxy Source Code Analysis

2.1 Overall Framework

  • kube-proxy is the Kubernetes component that forwards traffic between Services and Pods. When we send a packet to a Service, the actual receiver is one of the backend Pods behind that Service; kube-proxy is what implements this.
  • Like the other components, kube-proxy's entry point lives under cmd, specifically cmd/kube-proxy/proxy.go, and it likewise uses cobra:
func main() {
	command := app.NewProxyCommand()
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}
  • Stepping into NewProxyCommand, the core is still the Run method. Run builds a ProxyServer object (in cmd/kube-proxy/app/server.go) and runs it.
// Run runs the specified ProxyServer.
func (o *Options) Run() error {
	defer close(o.errCh)
	if len(o.WriteConfigTo) > 0 {
		return o.writeConfigFile()
	}

	proxyServer, err := NewProxyServer(o)
	if err != nil {
		return err
	}

	if o.CleanupAndExit {
		return proxyServer.CleanupAndExit()
	}

	o.proxyServer = proxyServer
	return o.runLoop()
}
  • NewProxyServer calls the private newProxyServer function. It lives in the app package; depending on the operating system, the version in server_windows.go or server_others.go is used. Here we only analyze the Linux one:
  • The key step is determining kube-proxy's proxy mode, handling it accordingly, and building the corresponding ProxyServer struct. Since iptables is the mode most widely used in Kubernetes today, we take iptables as the example and skip the other modes.
  • As you can see, in iptables mode the NewProxier function of the iptables package is called to build an iptables-specific proxier. The event handlers are also wired to this proxier. Finally, the processed fields are filled into the ProxyServer struct, which is returned.
  • NewProxier itself is fairly direct: it builds a proxier and returns it. As its comment says, the proxier keeps the kernel rules up to date in the background, so they always reflect the latest state:
const (
	ProxyModeUserspace   ProxyMode = "userspace"
	ProxyModeIPTables    ProxyMode = "iptables"
	ProxyModeIPVS        ProxyMode = "ipvs"
	ProxyModeKernelspace ProxyMode = "kernelspace"
)

func newProxyServer(
	config *proxyconfigapi.KubeProxyConfiguration,
	cleanupAndExit bool,
	master string) (*ProxyServer, error) {

	if config == nil {
		return nil, errors.New("config is required")
	}

	if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
		c.Set(config)
	} else {
		return nil, fmt.Errorf("unable to register configz: %s", err)
	}

	protocol := utiliptables.ProtocolIpv4
	if net.ParseIP(config.BindAddress).To4() == nil {
		klog.V(0).Infof("IPv6 bind address (%s), assume IPv6 operation", config.BindAddress)
		protocol = utiliptables.ProtocolIpv6
	}

	var iptInterface utiliptables.Interface
	var ipvsInterface utilipvs.Interface
	var kernelHandler ipvs.KernelHandler
	var ipsetInterface utilipset.Interface
	var dbus utildbus.Interface

	// Create a iptables utils.
	execer := exec.New()

	dbus = utildbus.New()
	iptInterface = utiliptables.New(execer, dbus, protocol)
	kernelHandler = ipvs.NewLinuxKernelHandler()
	ipsetInterface = utilipset.New(execer)
	canUseIPVS, _ := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface)
	if canUseIPVS {
		ipvsInterface = utilipvs.New(execer)
	}

	// We omit creation of pretty much everything if we run in cleanup mode
	if cleanupAndExit {
		return &ProxyServer{
			execer:         execer,
			IptInterface:   iptInterface,
			IpvsInterface:  ipvsInterface,
			IpsetInterface: ipsetInterface,
		}, nil
	}

	client, eventClient, err := createClients(config.ClientConnection, master)
	if err != nil {
		return nil, err
	}

	// Create event recorder
	hostname, err := utilnode.GetHostname(config.HostnameOverride)
	if err != nil {
		return nil, err
	}
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(proxyconfigscheme.Scheme, v1.EventSource{Component: "kube-proxy", Host: hostname})

	nodeRef := &v1.ObjectReference{
		Kind:      "Node",
		Name:      hostname,
		UID:       types.UID(hostname),
		Namespace: "",
	}

	var healthzServer *healthcheck.HealthzServer
	var healthzUpdater healthcheck.HealthzUpdater
	if len(config.HealthzBindAddress) > 0 {
		healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
		healthzUpdater = healthzServer
	}

	var proxier proxy.Provider

	proxyMode := getProxyMode(string(config.Mode), kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{})
	nodeIP := net.ParseIP(config.BindAddress)
	if nodeIP.IsUnspecified() {
		nodeIP = utilnode.GetNodeIP(client, hostname)
		if nodeIP == nil {
			return nil, fmt.Errorf("unable to get node IP for hostname %s", hostname)
		}
	}
	if proxyMode == proxyModeIPTables {
		klog.V(0).Info("Using iptables Proxier.")
		if config.IPTables.MasqueradeBit == nil {
			// MasqueradeBit must be specified or defaulted.
			return nil, fmt.Errorf("unable to read IPTables MasqueradeBit from config")
		}

		// TODO this has side effects that should only happen when Run() is invoked.
		proxier, err = iptables.NewProxier(
			iptInterface,
			utilsysctl.New(),
			execer,
			config.IPTables.SyncPeriod.Duration,
			config.IPTables.MinSyncPeriod.Duration,
			config.IPTables.MasqueradeAll,
			int(*config.IPTables.MasqueradeBit),
			config.ClusterCIDR,
			hostname,
			nodeIP,
			recorder,
			healthzUpdater,
			config.NodePortAddresses,
		)
		if err != nil {
			return nil, fmt.Errorf("unable to create proxier: %v", err)
		}
		metrics.RegisterMetrics()
	} else if proxyMode == proxyModeIPVS {
		klog.V(0).Info("Using ipvs Proxier.")
		proxier, err = ipvs.NewProxier(
			iptInterface,
			ipvsInterface,
			ipsetInterface,
			utilsysctl.New(),
			execer,
			config.IPVS.SyncPeriod.Duration,
			config.IPVS.MinSyncPeriod.Duration,
			config.IPVS.ExcludeCIDRs,
			config.IPVS.StrictARP,
			config.IPTables.MasqueradeAll,
			int(*config.IPTables.MasqueradeBit),
			config.ClusterCIDR,
			hostname,
			nodeIP,
			recorder,
			healthzServer,
			config.IPVS.Scheduler,
			config.NodePortAddresses,
		)
		if err != nil {
			return nil, fmt.Errorf("unable to create proxier: %v", err)
		}
		metrics.RegisterMetrics()
	} else {
		klog.V(0).Info("Using userspace Proxier.")

		// TODO this has side effects that should only happen when Run() is invoked.
		proxier, err = userspace.NewProxier(
			userspace.NewLoadBalancerRR(),
			net.ParseIP(config.BindAddress),
			iptInterface,
			execer,
			*utilnet.ParsePortRangeOrDie(config.PortRange),
			config.IPTables.SyncPeriod.Duration,
			config.IPTables.MinSyncPeriod.Duration,
			config.UDPIdleTimeout.Duration,
			config.NodePortAddresses,
		)
		if err != nil {
			return nil, fmt.Errorf("unable to create proxier: %v", err)
		}
	}

	iptInterface.AddReloadFunc(proxier.Sync)

	return &ProxyServer{
		Client:                 client,
		EventClient:            eventClient,
		IptInterface:           iptInterface,
		IpvsInterface:          ipvsInterface,
		IpsetInterface:         ipsetInterface,
		execer:                 execer,
		Proxier:                proxier,     // proxier for the selected proxy mode
		Broadcaster:            eventBroadcaster,
		Recorder:               recorder,
		ConntrackConfiguration: config.Conntrack,
		Conntracker:            &realConntracker{},
		ProxyMode:              proxyMode,
		NodeRef:                nodeRef,
		MetricsBindAddress:     config.MetricsBindAddress,
		EnableProfiling:        config.EnableProfiling,
		OOMScoreAdj:            config.OOMScoreAdj,
		ConfigSyncPeriod:       config.ConfigSyncPeriod.Duration,
		HealthzServer:          healthzServer,
	}, nil
}
  • Defining the proxier's syncRunner field, i.e. the proxier's concrete sync logic. (The constructor quoted below is the IPVS NewProxier; the iptables NewProxier wires up syncRunner in the same way.)
// NewProxier returns a new Proxier given an iptables and ipvs Interface instance.
// Because of the iptables and ipvs logic, it is assumed that there is only a single Proxier active on a machine.
// An error will be returned if it fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and
// will not terminate if a particular iptables or ipvs call fails.

func NewProxier(ipt utiliptables.Interface,
	ipvs utilipvs.Interface,
	ipset utilipset.Interface,
	sysctl utilsysctl.Interface,
	exec utilexec.Interface,
	syncPeriod time.Duration,
	minSyncPeriod time.Duration,
	excludeCIDRs []string,
	strictARP bool,
	masqueradeAll bool,
	masqueradeBit int,
	clusterCIDR string,
	hostname string,
	nodeIP net.IP,
	recorder record.EventRecorder,
	healthzServer healthcheck.HealthzUpdater,
	scheduler string,
	nodePortAddresses []string,
) (*Proxier, error) {
	// Set the route_localnet sysctl we need for
	if val, _ := sysctl.GetSysctl(sysctlRouteLocalnet); val != 1 {
		if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
			return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
		}
	}

	// Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
	// are connected to a Linux bridge (but not SDN bridges).  Until most
	// plugins handle this, log when config is missing
	if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
		klog.Infof("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
	}

	// Set the conntrack sysctl we need for
	if val, _ := sysctl.GetSysctl(sysctlVSConnTrack); val != 1 {
		if err := sysctl.SetSysctl(sysctlVSConnTrack, 1); err != nil {
			return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlVSConnTrack, err)
		}
	}

	// Set the connection reuse mode
	if val, _ := sysctl.GetSysctl(sysctlConnReuse); val != 0 {
		if err := sysctl.SetSysctl(sysctlConnReuse, 0); err != nil {
			return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlConnReuse, err)
		}
	}

	// Set the expire_nodest_conn sysctl we need for
	if val, _ := sysctl.GetSysctl(sysctlExpireNoDestConn); val != 1 {
		if err := sysctl.SetSysctl(sysctlExpireNoDestConn, 1); err != nil {
			return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlExpireNoDestConn, err)
		}
	}

	// Set the expire_quiescent_template sysctl we need for
	if val, _ := sysctl.GetSysctl(sysctlExpireQuiescentTemplate); val != 1 {
		if err := sysctl.SetSysctl(sysctlExpireQuiescentTemplate, 1); err != nil {
			return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlExpireQuiescentTemplate, err)
		}
	}

	// Set the ip_forward sysctl we need for
	if val, _ := sysctl.GetSysctl(sysctlForward); val != 1 {
		if err := sysctl.SetSysctl(sysctlForward, 1); err != nil {
			return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlForward, err)
		}
	}

	if strictARP {
		// Set the arp_ignore sysctl we need for
		if val, _ := sysctl.GetSysctl(sysctlArpIgnore); val != 1 {
			if err := sysctl.SetSysctl(sysctlArpIgnore, 1); err != nil {
				return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlArpIgnore, err)
			}
		}

		// Set the arp_announce sysctl we need for
		if val, _ := sysctl.GetSysctl(sysctlArpAnnounce); val != 2 {
			if err := sysctl.SetSysctl(sysctlArpAnnounce, 2); err != nil {
				return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlArpAnnounce, err)
			}
		}
	}

	// Generate the masquerade mark to use for SNAT rules.
	masqueradeValue := 1 << uint(masqueradeBit)
	masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)

	if nodeIP == nil {
		klog.Warningf("invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP")
		nodeIP = net.ParseIP("127.0.0.1")
	}

	isIPv6 := utilnet.IsIPv6(nodeIP)

	klog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6)

	if len(clusterCIDR) == 0 {
		klog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
	} else if utilnet.IsIPv6CIDRString(clusterCIDR) != isIPv6 {
		return nil, fmt.Errorf("clusterCIDR %s has incorrect IP version: expect isIPv6=%t", clusterCIDR, isIPv6)
	}

	if len(scheduler) == 0 {
		klog.Warningf("IPVS scheduler not specified, use %s by default", DefaultScheduler)
		scheduler = DefaultScheduler
	}

	healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps

	proxier := &Proxier{
		portsMap:              make(map[utilproxy.LocalPort]utilproxy.Closeable),
		serviceMap:            make(proxy.ServiceMap),
		serviceChanges:        proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
		endpointsMap:          make(proxy.EndpointsMap),
		endpointsChanges:      proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder),
		syncPeriod:            syncPeriod,
		minSyncPeriod:         minSyncPeriod,
		excludeCIDRs:          parseExcludedCIDRs(excludeCIDRs),
		iptables:              ipt,
		masqueradeAll:         masqueradeAll,
		masqueradeMark:        masqueradeMark,
		exec:                  exec,
		clusterCIDR:           clusterCIDR,
		hostname:              hostname,
		nodeIP:                nodeIP,
		portMapper:            &listenPortOpener{},
		recorder:              recorder,
		healthChecker:         healthChecker,
		healthzServer:         healthzServer,
		ipvs:                  ipvs,
		ipvsScheduler:         scheduler,
		ipGetter:              &realIPGetter{nl: NewNetLinkHandle(isIPv6)},
		iptablesData:          bytes.NewBuffer(nil),
		filterChainsData:      bytes.NewBuffer(nil),
		natChains:             bytes.NewBuffer(nil),
		natRules:              bytes.NewBuffer(nil),
		filterChains:          bytes.NewBuffer(nil),
		filterRules:           bytes.NewBuffer(nil),
		netlinkHandle:         NewNetLinkHandle(isIPv6),
		ipset:                 ipset,
		nodePortAddresses:     nodePortAddresses,
		networkInterfacer:     utilproxy.RealNetwork{},
		gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
	}
	// initialize ipsetList with all sets we needed
	proxier.ipsetList = make(map[string]*IPSet)
	for _, is := range ipsetInfo {
		proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, isIPv6, is.comment)
	}
	burstSyncs := 2
	klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
	
	// Here the proxier's syncRunner field is defined, i.e. the proxier's concrete sync logic
	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
	proxier.gracefuldeleteManager.Run()
	return proxier, nil
}

2.2 Running the ProxyServer

2.2.1 Main Flow

  • Located in cmd/kube-proxy/app/server.go
// runLoop will watch on the update change of the proxy server's configuration file.
// Return an error when updated
func (o *Options) runLoop() error {
	if o.watcher != nil {
		o.watcher.Run()
	}

	// run the proxy in goroutine
	go func() {
		err := o.proxyServer.Run()
		o.errCh <- err
	}()

	for {
		err := <-o.errCh
		if err != nil {
			return err
		}
	}
}
  • Health checks, metrics and other preliminary setup are skipped here; the focus is on the second half:
  • Create and run the Service and Endpoints informers; these two informers are how kube-proxy promptly learns about changes to Service and Endpoints resources in the cluster.
  • go serviceConfig.Run(wait.NeverStop)
  • go endpointsConfig.Run(wait.NeverStop)
  • Call the birthCry method. Nothing special here: it just records an event noting that kube-proxy has started.
  • Call the SyncLoop method to keep the Proxier running.
  • s.Proxier.SyncLoop()
  • Located in cmd/kube-proxy/app/server.go
// Run runs the specified ProxyServer.  This should never exit (unless CleanupAndExit is set).
// TODO: At the moment, Run() cannot return a nil error, otherwise it's caller will never exit. Update callers of Run to handle nil errors.
func (s *ProxyServer) Run() error {
	// To help debugging, immediately log version
	klog.Infof("Version: %+v", version.Get())

	// TODO(vmarmol): Use container config for this.
	var oomAdjuster *oom.OOMAdjuster
	if s.OOMScoreAdj != nil {
		oomAdjuster = oom.NewOOMAdjuster()
		if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
			klog.V(2).Info(err)
		}
	}

	if s.Broadcaster != nil && s.EventClient != nil {
		s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
	}

	// Start up a healthz server if requested
	if s.HealthzServer != nil {
		s.HealthzServer.Run()
	}

	// Start up a metrics server if requested
	if len(s.MetricsBindAddress) > 0 {
		proxyMux := mux.NewPathRecorderMux("kube-proxy")
		healthz.InstallHandler(proxyMux)
		proxyMux.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprintf(w, "%s", s.ProxyMode)
		})
		proxyMux.Handle("/metrics", prometheus.Handler())
		if s.EnableProfiling {
			routes.Profiling{}.Install(proxyMux)
		}
		configz.InstallHandler(proxyMux)
		go wait.Until(func() {
			err := http.ListenAndServe(s.MetricsBindAddress, proxyMux)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err))
			}
		}, 5*time.Second, wait.NeverStop)
	}

	// Tune conntrack, if requested
	// Conntracker is always nil for windows
	if s.Conntracker != nil {
		max, err := getConntrackMax(s.ConntrackConfiguration)
		if err != nil {
			return err
		}
		if max > 0 {
			err := s.Conntracker.SetMax(max)
			if err != nil {
				if err != errReadOnlySysFS {
					return err
				}
				// errReadOnlySysFS is caused by a known docker issue (https://github.com/docker/docker/issues/24000),
				// the only remediation we know is to restart the docker daemon.
				// Here we'll send an node event with specific reason and message, the
				// administrator should decide whether and how to handle this issue,
				// whether to drain the node and restart docker.  Occurs in other container runtimes
				// as well.
				// TODO(random-liu): Remove this when the docker bug is fixed.
				const message = "CRI error: /sys is read-only: " +
					"cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)"
				s.Recorder.Eventf(s.NodeRef, api.EventTypeWarning, err.Error(), message)
			}
		}

		if s.ConntrackConfiguration.TCPEstablishedTimeout != nil && s.ConntrackConfiguration.TCPEstablishedTimeout.Duration > 0 {
			timeout := int(s.ConntrackConfiguration.TCPEstablishedTimeout.Duration / time.Second)
			if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
				return err
			}
		}

		if s.ConntrackConfiguration.TCPCloseWaitTimeout != nil && s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration > 0 {
			timeout := int(s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration / time.Second)
			if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
				return err
			}
		}
	}

	informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
		informers.WithTweakListOptions(func(options *v1meta.ListOptions) {
			options.LabelSelector = "!" + apis.LabelServiceProxyName
		}))

	// Create configs (i.e. Watches for Services and Endpoints)
	// Note: RegisterHandler() calls need to happen before creation of Sources because sources
	// only notify on changes, and the initial update (on process start) may be lost if no handlers
	// are registered yet.
	
	// ServiceConfig is the kube-proxy component that watches for Service changes; it is essentially an informer, as NewServiceConfig shows.
	
	serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
	// The event handler logic is provided by the Proxier
	serviceConfig.RegisterEventHandler(s.Proxier)
	go serviceConfig.Run(wait.NeverStop)


    // endpointsConfig is the kube-proxy component that watches for Endpoints changes; it is essentially an informer as well
	endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
    // The event handler logic is again provided by the Proxier
	endpointsConfig.RegisterEventHandler(s.Proxier)
	go endpointsConfig.Run(wait.NeverStop)

	// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
	// functions must configure their shared informer event handlers first.
	informerFactory.Start(wait.NeverStop)

	// Birth Cry after the birth is successful
	s.birthCry()

	// Just loop forever for now...
	s.Proxier.SyncLoop()
	return nil
}

2.2.2 NewServiceConfig: Add/Update/Delete Handlers Only Record Events into a Map

  • When syncProxyRules runs: when the Service and Endpoints Configs have just been created and initialized, and whenever a Service or Endpoints object changes.
  • As shown in 2.2.1, the event handler logic is provided by the Proxier, passed in via serviceConfig.RegisterEventHandler(s.Proxier).
// NewServiceConfig creates a new ServiceConfig.
func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
	result := &ServiceConfig{
		listerSynced: serviceInformer.Informer().HasSynced,
	}

	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    result.handleAddService,
			UpdateFunc: result.handleUpdateService,
			DeleteFunc: result.handleDeleteService,
		},
		resyncPeriod,
	)

	return result
}

// result.handleUpdateService
func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
	oldService, ok := oldObj.(*v1.Service)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
		return
	}
	service, ok := newObj.(*v1.Service)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
		return
	}
	for i := range c.eventHandlers {
		klog.V(4).Info("Calling handler.OnServiceUpdate")
		c.eventHandlers[i].OnServiceUpdate(oldService, service)
	}
}
  • The add and delete callbacks essentially go through the same path as update; the only difference is that add is an update from nil to the target Service and delete is an update from the target Service to nil. So we take update as the example.
  • Its callback handleUpdateService essentially calls Proxier.OnServiceUpdate:
  • The method is short: it first calls serviceChanges.Update and, when Update reports pending changes and the proxier is initialized, calls syncRunner.Run (a simplified sketch of what Update does follows after the code block below).
// OnServiceUpdate is called whenever modification of an existing
// service object is observed.
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
	if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
		proxier.syncRunner.Run()
	}
}

// At this point the change has only been recorded into a map; Run merely signals the sync runner
// Run the function as soon as possible.  If this is called while Loop is not
// running, the call may be deferred indefinitely.
// If there is already a queued request to call the underlying function, it
// may be dropped - it is just guaranteed that we will try calling the
// underlying function as soon as possible starting from now.
func (bfr *BoundedFrequencyRunner) Run() {
	// If it takes a lot of time to run the underlying function, noone is really
	// processing elements from <run> channel. So to avoid blocking here on the
	// putting element to it, we simply skip it if there is already an element
	// in it.
	select {
	case bfr.run <- struct{}{}:
	default:
	}
}
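Here serviceChanges is a ServiceChangeTracker. Its Update method essentially records the <previous, current> pair into a map keyed by namespace/name, drops the entry again if the object ended up unchanged, and reports whether any changes are pending. The standalone sketch below captures that behavior under those assumptions, using a stand-in service type instead of *v1.Service; it is a paraphrase, not the real source:

package main

import (
	"fmt"
	"reflect"
	"sync"
)

// service is a stand-in for *v1.Service in this sketch.
type service struct {
	Namespace, Name string
	ClusterIP       string
	Port            int
}

type change struct{ previous, current *service }

// changeTracker is a simplified stand-in for proxy.ServiceChangeTracker:
// Update records <previous, current> pairs and reports whether changes are pending.
type changeTracker struct {
	mu    sync.Mutex
	items map[string]*change
}

func (t *changeTracker) Update(previous, current *service) bool {
	svc := current
	if svc == nil {
		svc = previous
	}
	if svc == nil {
		return false // nothing to record
	}
	key := svc.Namespace + "/" + svc.Name

	t.mu.Lock()
	defer t.mu.Unlock()
	c, ok := t.items[key]
	if !ok {
		c = &change{previous: previous}
		t.items[key] = c
	}
	c.current = current
	// If the object ended up identical to where it started, the change cancels out.
	if reflect.DeepEqual(c.previous, c.current) {
		delete(t.items, key)
	}
	return len(t.items) > 0
}

func main() {
	t := &changeTracker{items: map[string]*change{}}
	old := &service{Namespace: "default", Name: "web", ClusterIP: "10.96.0.10", Port: 80}
	updated := &service{Namespace: "default", Name: "web", ClusterIP: "10.96.0.10", Port: 8080}
	fmt.Println("pending after update:", t.Update(old, updated)) // true: the port changed
}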

2.2.3 ServiceConfig.Run

  • When syncProxyRules runs: when the Service and Endpoints Configs have just been created and initialized, and whenever a Service or Endpoints object changes.
  • Moving on to proxier.OnServiceSynced: it first calls setInitialized to mark the proxier as initialized. Only after initialization does the proxier start acting on its callbacks. Finally, it runs syncProxyRules once.
  • In short, Run initializes the freshly created ServiceConfig and triggers syncProxyRules once after initialization. That method is where kube-proxy actually maintains the iptables rules; it is analyzed in detail later. A rough sketch of Run and OnServiceSynced follows below.
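In the release this article follows, ServiceConfig.Run and the iptables proxier's OnServiceSynced look roughly like this (lightly trimmed and simplified; treat it as a sketch rather than a verbatim excerpt):

// Run waits for the informer cache to sync, then notifies every registered
// handler once that the initial Service state is available.
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
	klog.Info("Starting service config controller")

	// Block until the shared informer's cache has synced at least once.
	if !cache.WaitForCacheSync(stopCh, c.listerSynced) {
		return
	}

	for i := range c.eventHandlers {
		klog.V(3).Info("Calling handler.OnServiceSynced()")
		c.eventHandlers[i].OnServiceSynced()
	}
}

// OnServiceSynced marks the proxier as initialized (once both Services and
// Endpoints have synced) and performs the first full rule sync.
func (proxier *Proxier) OnServiceSynced() {
	proxier.mu.Lock()
	proxier.servicesSynced = true
	proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
	proxier.mu.Unlock()

	// Sync unconditionally - this is called once per lifetime.
	proxier.syncProxyRules()
}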

2.2.4 Proxier.SyncLoop

  • When syncProxyRules runs: it also fires automatically at a fixed interval.
  • Alongside the two config goroutines this is the third processing loop. Now let's look at SyncLoop: it essentially calls the Loop method in bounded_frequency_runner.go (both are sketched after the code block below).
  • As you can see, Loop runs an endless loop and periodically invokes tryRun. In addition, whenever a message arrives on bfr's run channel, tryRun is executed as well. When does this channel receive a message? As shown in 2.2.2 above: whenever a ServiceConfig callback is invoked. So every time a Service changes and the callback fires, tryRun eventually runs once.
  • All the rest of the code serves bfr.fn; the heart of the runner is to invoke bfr.fn at the right moments. That function is registered when the proxier is created. Recall from the NewProxier code above that it contains this line:
pkg/proxy/iptables/proxier.go
    
    func NewProxier(...) (*Proxier, error) {
        
        ......
    
        proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
        return proxier, nil
}
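For reference, SyncLoop and the runner's Loop look roughly like this in the release this article follows (lightly trimmed; treat it as a sketch):

// SyncLoop runs periodic work. This is expected to run as a goroutine or as
// the main loop of the app. It does not return.
func (proxier *Proxier) SyncLoop() {
	// Update the healthz timestamp up front in case Sync() never succeeds.
	if proxier.healthzServer != nil {
		proxier.healthzServer.UpdateTimestamp()
	}
	proxier.syncRunner.Loop(wait.NeverStop)
}

// Loop handles both the periodic timer and explicit run requests (bfr.run);
// either path funnels into tryRun, which decides whether bfr.fn may run now.
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
	klog.V(3).Infof("%s Loop running", bfr.name)
	bfr.timer.Reset(bfr.maxInterval)
	for {
		select {
		case <-stop:
			bfr.stop()
			klog.V(3).Infof("%s Loop stopping", bfr.name)
			return
		case <-bfr.timer.C():
			bfr.tryRun()
		case <-bfr.run:
			bfr.tryRun()
		}
	}
}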

2.2.5 The Three Triggers of syncProxyRules

  • As you can see, the proxier.syncProxyRules method mentioned earlier is registered as bfr.fn. Putting it all together, syncProxyRules is executed through three paths:
  • when the Service and Endpoints Configs have just been created and initialized;
  • when a Service or Endpoints object changes;
  • automatically at a fixed interval.

2.2.6 syncProxyRules: The Main Event

  • (1) Call UpdateServiceMap and UpdateEndpointsMap to apply the pending Service and Endpoints updates (a rough sketch follows this list).
  • (2) Create and link the iptables chains.
  • (3) Iterate over every Service, determine its type, and add the corresponding rules.
  • (4) Delete stale chains and rebuild the iptables rule set via iptables-restore.
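Step (1) corresponds to a couple of calls near the top of syncProxyRules; roughly (paraphrased, details vary between releases):

	// Fold the pending changes recorded by the change trackers into the
	// proxier's serviceMap and endpointsMap. The returned results flag UDP
	// services and endpoints whose stale conntrack entries must be cleared
	// once the new rules are in place.
	serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
	endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges)

	staleServices := serviceUpdateResult.UDPStaleClusterIP
	staleEndpoints := endpointUpdateResult.StaleEndpoints
	// ... staleServices and staleEndpoints are used near the end of
	// syncProxyRules to flush outdated conntrack entries.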

  • Create and link the kube chains
for _, chain := range iptablesJumpChains {
        if _, err := proxier.iptables.EnsureChain(chain.table, chain.chain); err != nil {
            klog.Errorf("Failed to ensure that %s chain %s exists: %v", chain.table, kubeServicesChain, err)
            return
        }
        args := append(chain.extraArgs,
            "-m", "comment", "--comment", chain.comment,
            "-j", string(chain.chain),
        )
        if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, chain.table, chain.sourceChain, args...); err != nil {
            klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", chain.table, chain.sourceChain, chain.chain, err)
            return
        }
    }
  • Build rules for each service
svcInfo, ok := svc.(*serviceInfo)
        if !ok {
            klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
            continue
        }
        isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP)
        protocol := strings.ToLower(string(svcInfo.Protocol))
        svcNameString := svcInfo.serviceNameString
        hasEndpoints := len(proxier.endpointsMap[svcName]) > 0

        svcChain := svcInfo.servicePortChainName
        if hasEndpoints {
            // Create the per-service chain, retaining counters if possible.
            if chain, ok := existingNATChains[svcChain]; ok {
                writeBytesLine(proxier.natChains, chain)
            } else {
                writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
            }
            activeNATChains[svcChain] = true
        }

        ...// Capture the clusterIP.
        if hasEndpoints {
            args = append(args[:0],
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
                "-m", protocol, "-p", protocol,
                "-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
                "--dport", strconv.Itoa(svcInfo.Port),
            )
            ...
        } else {
            writeLine(proxier.filterRules,
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                "-m", protocol, "-p", protocol,
                "-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
                "--dport", strconv.Itoa(svcInfo.Port),
                "-j", "REJECT",
            )
        }

        // Capture externalIPs.
        for _, externalIP := range svcInfo.ExternalIPs {

            ...

        }
  • Sync rules
proxier.iptablesData.Reset()
    proxier.iptablesData.Write(proxier.filterChains.Bytes())
    proxier.iptablesData.Write(proxier.filterRules.Bytes())
    proxier.iptablesData.Write(proxier.natChains.Bytes())
    proxier.iptablesData.Write(proxier.natRules.Bytes())

    klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
    err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
    if err != nil {
        klog.Errorf("Failed to execute iptables-restore: %v", err)
        // Revert new local ports.
        klog.V(2).Infof("Closing local ports after iptables-restore failure")
        utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
        return
    }


3 Summary

In the end it is the proxier and syncProxyRules that hold the real power; this is where the essence of kube-proxy lies.

This blog series focuses on demystifying the core technologies of big data and container clouds, and can provide full-stack big data + cloud-native platform consulting. Please keep following this series. Feel free to reach out for any academic exchange. For more content, follow the WeChat official account 數據雲技術社區 (Data Cloud Technology Community), or email 1120746959@qq.com.
