14.深刻k8s:kube-proxy ipvs及其源碼分析

82062770_p0

轉載請聲明出處哦~,本篇文章發佈於luozhiyun的博客:https://www.luozhiyun.comhtml

源碼版本是1.19node

這一篇是講service,可是基礎使用以及基本概念因爲官方實在是寫的比較完整了,我沒有必要複述一遍,因此還不太清楚的小夥伴們能夠去看官方的文檔:https://kubernetes.io/docs/concepts/services-networking/service/。nginx

IPVS 概述

在 Kubernetes 集羣中,每一個 Node 運行一個 kube-proxy 進程。kube-proxy 負責爲 Service 實現了一種 VIP(虛擬 IP)的形式。git

從官方文檔介紹來看:github

從 Kubernetes v1.0 開始,您已經可使用 userspace 代理模式。 Kubernetes v1.1 添加了 iptables 模式代理,在 Kubernetes v1.2 中,kube-proxy 的 iptables 模式成爲默認設置。 Kubernetes v1.8 添加了 ipvs 代理模式。算法

如今咱們看的源碼是基於1.19,因此如今默認是ipvs代理模式。segmentfault

LVS是國內章文嵩博士開發並貢獻給社區的,主要由ipvs和ipvsadm組成。api

ipvs是工做在內核態的4層負載均衡,基於內核底層netfilter實現,netfilter主要經過各個鏈的鉤子實現包處理和轉發。ipvs由ipvsadm提供簡單的CLI接口進行ipvs配置。因爲ipvs工做在內核態,只處理四層協議,所以只能基於路由或者NAT進行數據轉發,能夠把ipvs看成一個特殊的路由器網關,這個網關能夠根據必定的算法自動選擇下一跳。服務器

IPVS vs IPTABLES

  • iptables 使用鏈表,ipvs 使用哈希表;
  • iptables 只支持隨機、輪詢兩種負載均衡算法而 ipvs 支持的多達 8 種;
  • ipvs 還支持 realserver 運行情況檢查、鏈接重試、端口映射、會話保持等功能。

IPVS用法

IPVS能夠經過ipvsadm 命令進行配置,如-L列舉,-A添加,-D刪除。網絡

以下命令建立一個service實例172.17.0.1:32016-t指定監聽的爲TCP端口,-s指定算法爲輪詢算法rr(Round Robin),ipvs支持簡單輪詢(rr)、加權輪詢(wrr)、最少鏈接(lc)、源地址或者目標地址散列(sh、dh)等10種調度算法。

ipvsadm -A -t 172.17.0.1:32016 -s rr

在添加調度算法的時候還須要用-r指定server地址,-w指定權值,-m指定轉發模式,-m設置masquerading表示NAT模式(-g爲gatewaying,即直連路由模式),以下所示:

ipvsadm -a -t 172.17.0.1:32016 -r 10.244.1.2:8080 -m -w 1
ipvsadm -a -t 172.17.0.1:32016 -r 10.244.1.3:8080 -m -w 1
ipvsadm -a -t 172.17.0.1:32016 -r 10.244.3.2:8080 -m -w 1

Service ClusterIP原理

不清楚iptables調用鏈的同窗能夠先看看:https://www.zsythink.net/archives/1199瞭解一下。

咱們這裏使用上一篇HPA的一個例子:

apiVersion: apps/v1 
kind: Deployment
metadata:
  name: hpatest
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hpatest     
  template: 
    metadata:
      labels:
        app: hpatest
    spec:
      containers:
      - name: hpatest
        image: nginx
        imagePullPolicy: IfNotPresent
        command: ["/bin/sh"]
        args: ["-c","/usr/sbin/nginx; while true;do echo `hostname -I` > /usr/share/nginx/html/index.html; sleep 120;done"]
        ports: 
        - containerPort: 80
        resources:
          requests:
            cpu: 1m
            memory: 100Mi
          limits:
            cpu: 3m
            memory: 400Mi  
---
apiVersion: v1
kind: Service
metadata:
  name: hpatest-svc
spec:
  selector:
    app: hpatest
  ports:
  - port: 80
    targetPort: 80
    protocol: TCP

建立好以後,看看svc:

# kubectl get svc
NAME          TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)    AGE
hpatest-svc   ClusterIP   10.68.196.212   <none>        80/TCP     2m44s

而後咱們查看ipvs配置狀況:

# ipvsadm -S -n | grep 10.68.196.212

-A -t 10.68.196.212:80 -s rr
-a -t 10.68.196.212:80 -r 172.20.0.251:80 -m -w 1

-S表示輸出所保存的規則,-n表示以數字的形式輸出ip和端口。能夠看到ipvs的LB IP爲ClusterIP,算法爲rr,RS爲Pod的IP。使用的模式爲NAT模式。

當咱們建立Service以後,kube-proxy 首先會在宿主機上建立一個虛擬網卡(叫做:kube-ipvs0),併爲它分配 Service VIP 做爲 IP 地址,以下所示:

# ip addr show kube-ipvs0
7: kube-ipvs0: <BROADCAST,NOARP> mtu 1500 qdisc noop state DOWN group default
    link/ether 12:bb:85:91:96:4d brd ff:ff:ff:ff:ff:ff
    ...
    inet 10.68.196.212/32 brd 10.68.196.212 scope global kube-ipvs0
       valid_lft forever preferred_lft forever

下面看看ClusterIP如何傳遞的。使用命令:iptables -t nat -nvL能夠看到由不少Chain,ClusterIP訪問方式爲:

PREROUTING --> KUBE-SERVICES --> KUBE-CLUSTER-IP --> INPUT --> KUBE-FIREWALL --> POSTROUTING

當使用命令連接服務時:

curl  10.68.196.212:80

因爲10.96.54.11就在本地,因此會以這個IP做爲出口地址,即源IP和目標IP都是10.96.54.11,此時至關於:

10.68.196.212:xxxx ->  10.68.196.212:80

而後通過ipvs,ipvs會從RS ip列中選擇其中一個Pod ip做爲目標IP:

10.68.196.212:xxxx ->  10.68.196.212:80
                   |
                   | IPVS
                   v
172.20.0.251:xxxx   ->  172.20.0.251:80

查看OUTPUT規則:

# iptables-save 
-A OUTPUT -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
-A KUBE-SERVICES ! -s 172.20.0.0/16 -m comment --comment "Kubernetes service cluster ip + port for masquerade purpose" -m set --match-set KUBE-CLUSTER-IP dst,dst -j KUBE-MARK-MASQ
-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000

如上規則的意思就是除了Pod之外訪問ClusterIP的包都打上0x4000/0x4000

到了POSTROUTING鏈:

-A POSTROUTING -m comment --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4000/0x4000 -j MASQUERADE

如上規則的意思就是隻要匹配mark0x4000/0x4000的包都作SNAT,因爲172.20.0.251是從flannel.1出去的,所以源ip會改爲flannel.1的ip 172.20.0.0

10.68.196.212:xxxx -> 10.68.196.212:80
                    |
                    | IPVS
                    v
10.68.196.212:xxxx  -> 172.20.0.251:80
                     |
                     | MASQUERADE
                     v
172.20.0.0:xxxx     -> 172.20.0.251:80

最後經過VXLAN隧道發到Pod的Node上,轉發給Pod的veth,回包經過路由到達源Node節點,源Node節點經過以前的MASQUERADE再把目標IP還原爲172.20.0.251。

kube-proxy ipvs 源碼分析

初始化ipvs

文件位置:cmd/kube-proxy/app/server_others.go

kube-proxy啓動的時候會調用NewProxyServer初始化ipvs 代理:

func newProxyServer(
	config *proxyconfigapi.KubeProxyConfiguration,
	cleanupAndExit bool,
	master string) (*ProxyServer, error) {

	...
	//獲取代理模式userspace iptables ipvs
	proxyMode := getProxyMode(string(config.Mode), canUseIPVS, iptables.LinuxKernelCompatTester{})
	...
	//代理模式是iptables
	if proxyMode == proxyModeIPTables {
		...
	//	代理模式是ipvs
	} else if proxyMode == proxyModeIPVS {
		klog.V(0).Info("Using ipvs Proxier.")
		//判斷是夠啓用了 ipv6 雙棧
		if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
			...
		} else {
			...
			//初始化 ipvs 模式的 proxier
			proxier, err = ipvs.NewProxier(
				...
			)
		}
		...
	} else {
		...
	}

	return &ProxyServer{
		...
	}, nil
}

NewProxyServer方法會根據proxyMode來選擇是IPVS仍是IPTables,ipvs會調用ipvs.NewProxier方法來初始化一個proxier。

NewProxier

func NewProxier(... ) (*Proxier, error) {
	...  
	//對於 SNAT iptables 規則生成 masquerade 標記
	masqueradeValue := 1 << uint(masqueradeBit)
	...
	//設置默認調度算法 rr
	if len(scheduler) == 0 {
		klog.Warningf("IPVS scheduler not specified, use %s by default", DefaultScheduler)
		scheduler = DefaultScheduler
	}
	// healthcheck服務器對象建立
	serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)

	...
	//初始化 proxier
	proxier := &Proxier{
		...
	} 
	//初始化 ipset 規則
	proxier.ipsetList = make(map[string]*IPSet)
	for _, is := range ipsetInfo {
		proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, isIPv6, is.comment)
	}
	burstSyncs := 2
	klog.V(2).Infof("ipvs(%s) sync params: minSyncPeriod=%v, syncPeriod=%v, burstSyncs=%d",
		ipt.Protocol(), minSyncPeriod, syncPeriod, burstSyncs)
	//初始化 syncRunner
	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
	//啓動 gracefuldeleteManager
	proxier.gracefuldeleteManager.Run()
	return proxier, nil
}

這個方法主要作了以下幾件事:

  1. 對於 SNAT iptables 規則生成 masquerade 標記;
  2. 設置默認調度算法 rr;
  3. healthcheck服務器對象建立;
  4. 初始化 proxier;
  5. 初始化 ipset 規則;
  6. 初始化 syncRunner;
  7. 啓動 gracefuldeleteManager;

這個方法在初始化syncRunner的時候設置了proxier.syncProxyRules方法做爲一個參數構建了同步運行器syncRunner。

調用同步運行器

文件位置:cmd/kube-proxy/app/server.go

func (s *ProxyServer) Run() error {
	...
    //調用ipvs的SyncLoop方法
	go s.Proxier.SyncLoop()

	return <-errCh
}

kube-proxy在啓動的時候會初始化完ProxyServer 對象後,會調用runLoop方法,而後調用到ProxyServer的Run方法中,最後調用ipvs的SyncLoop方法。

func (proxier *Proxier) SyncLoop() { 
    ...
    proxier.syncRunner.Loop(wait.NeverStop)     //執行NewBoundedFrequencyRunner對象Loop
}

func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
    bfr.timer.Reset(bfr.maxInterval)
    for {
        select {
        case <-stop:
            bfr.stop()
            return
        case <-bfr.timer.C():           //定時器方式執行
            bfr.tryRun()
        case <-bfr.run:                 //按需方式執行(發送運行指令信號)
            bfr.tryRun()
        }
    }
}


func (bfr *BoundedFrequencyRunner) tryRun() {
    bfr.mu.Lock()
    defer bfr.mu.Unlock()

  //限制條件容許運行func
    if bfr.limiter.TryAccept() {
         bfr.fn()                                 // 重點執行部分,調用func,上下文來看此處就是
                                                  // 對syncProxyRules()的調用
        bfr.lastRun = bfr.timer.Now()             // 記錄運行時間
        bfr.timer.Stop()                          
        bfr.timer.Reset(bfr.maxInterval)          // 重設下次運行時間
        klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
        return
    }

  //限制條件不容許運行,計算下次運行時間
  elapsed := bfr.timer.Since(bfr.lastRun)    // elapsed:上次運行時間到如今已過多久
  nextPossible := bfr.minInterval - elapsed  // nextPossible:下次運行至少差多久(最小週期)
  nextScheduled := bfr.maxInterval - elapsed // nextScheduled:下次運行最遲差多久(最大週期)
    klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)

    if nextPossible < nextScheduled {
        bfr.timer.Stop()
        bfr.timer.Reset(nextPossible)
        klog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible)
    }
}

SyncLoop方法會調用到proxier的syncRunner實例設置的syncProxyRules方法。

同步運行器

syncProxyRules方法比較長,因此這裏就分開來一步步的講,跟好代碼節奏來就行了。

代碼位置:pkg/proxy/ipvs/proxier.go

//更新 service 與 endpoint變化信息
	serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
	endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)

	staleServices := serviceUpdateResult.UDPStaleClusterIP 
	// 合併 service 列表
	for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
		if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
			klog.V(2).Infof("Stale %s service %v -> %s", strings.ToLower(string(svcInfo.Protocol())), svcPortName, svcInfo.ClusterIP().String())
			staleServices.Insert(svcInfo.ClusterIP().String())
			for _, extIP := range svcInfo.ExternalIPStrings() {
				staleServices.Insert(extIP)
			}
		}
	}

這裏是同步與新更service和endpoints,而後 合併 service 列表。

//nat鏈
	proxier.natChains.Reset()
	//nat規則
	proxier.natRules.Reset()
	//filter鏈
	proxier.filterChains.Reset()
	//filter規則
	proxier.filterRules.Reset()

	// Write table headers.
	writeLine(proxier.filterChains, "*filter")
	writeLine(proxier.natChains, "*nat")
	//建立kubernetes的錶鏈接鏈數據
	proxier.createAndLinkeKubeChain()

這裏會重置鏈表規則,而後調用createAndLinkeKubeChain方法建立kubernetes的錶鏈接鏈數據,下面咱們看看createAndLinkeKubeChain方法:

func (proxier *Proxier) createAndLinkeKubeChain() {
	//經過iptables-save獲取現有的filter和NAT表存在的鏈數據
	existingFilterChains := proxier.getExistingChains(proxier.filterChainsData, utiliptables.TableFilter)
	existingNATChains := proxier.getExistingChains(proxier.iptablesData, utiliptables.TableNAT)

	// Make sure we keep stats for the top-level chains
	//裏面保存了NAT錶鏈和Filter錶鏈
	// NAT錶鏈: KUBE-SERVICES / KUBE-POSTROUTING / KUBE-FIREWALL
	//          KUBE-NODE-PORT / KUBE-LOAD-BALANCER / KUBE-MARK-MASQ
	// Filter錶鏈: KUBE-FORWARD
	for _, ch := range iptablesChains {
		//不存在則建立鏈,建立頂層鏈
		if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
			klog.Errorf("Failed to ensure that %s chain %s exists: %v", ch.table, ch.chain, err)
			return

		}
		//nat表寫鏈
		if ch.table == utiliptables.TableNAT {
			if chain, ok := existingNATChains[ch.chain]; ok {
				writeBytesLine(proxier.natChains, chain)
			} else {
				writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain))
			}
		// filter表寫鏈
		} else {
			if chain, ok := existingFilterChains[KubeForwardChain]; ok {
				writeBytesLine(proxier.filterChains, chain)
			} else {
				writeLine(proxier.filterChains, utiliptables.MakeChainLine(KubeForwardChain))
			}
		}
	}
	// 默認鏈下建立kubernete服務專用跳轉規則
	// iptables -I OUTPUT -t nat --comment "kubernetes service portals" -j KUBE-SERVICES
	// iptables -I PREROUTING -t nat --comment "kubernetes service portals" -j KUBE-SERVICES
	// iptables -I POSTROUTING -t nat --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING
	// iptables -I FORWARD -t filter --comment "kubernetes forwarding rules" -j KUBE-FORWARD
	for _, jc := range iptablesJumpChain {
		args := []string{"-m", "comment", "--comment", jc.comment, "-j", string(jc.to)}
		if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jc.table, jc.from, args...); err != nil {
			klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", jc.table, jc.from, jc.to, err)
		}
	}

}

createAndLinkeKubeChain方法首先會獲取現存的filter和NAT表,而後再遍歷iptablesChains。

iptablesChains裏面保存了NAT錶鏈和Filter錶鏈:NAT錶鏈 KUBE-SERVICES / KUBE-POSTROUTING / KUBE-FIREWALL KUBE-NODE-PORT / KUBE-LOAD-BALANCER / KUBE-MARK-MASQ;Filter錶鏈 KUBE-FORWARD;

而後再根據iptablesJumpChain建立跳轉規則。

下面回到syncProxyRules往下走。

// 建立 dummy interface kube-ipvs0
	_, err = proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
	if err != nil {
		klog.Errorf("Failed to create dummy interface: %s, error: %v", DefaultDummyDevice, err)
		return
	}
 
	// 建立默認的 ipset 規則,http://ipset.netfilter.org/
	for _, set := range proxier.ipsetList {
		if err := ensureIPSet(set); err != nil {
			return
		}
		set.resetEntries()
	}

設置默認Dummy接口,並肯定ipsets規則已存在的集合,ipset相關能夠看:http://ipset.netfilter.org/。

下面會遍歷proxier.serviceMap,對每個服務建立 ipvs 規則,比較長,也分開說。

for svcName, svc := range proxier.serviceMap {
	...
	//基於此服務的有效endpoint列表,更新KUBE-LOOP-BACK的ipset集,以備後面生成相應iptables規則(SNAT假裝地址)
	for _, e := range proxier.endpointsMap[svcName] {
		ep, ok := e.(*proxy.BaseEndpointInfo)
		if !ok {
			klog.Errorf("Failed to cast BaseEndpointInfo %q", e.String())
			continue
		}
		if !ep.IsLocal {
			continue
		}
		epIP := ep.IP()
		epPort, err := ep.Port()
		// Error parsing this endpoint has been logged. Skip to next endpoint.
		if epIP == "" || err != nil {
			continue
		}
		entry := &utilipset.Entry{
			IP:       epIP,
			Port:     epPort,
			Protocol: protocol,
			IP2:      epIP,
			SetType:  utilipset.HashIPPortIP,
		}
		// 校驗KUBE-LOOP-BACK集合entry記錄項
		if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
			klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoopBackIPSet].Name))
			continue
		}
		// 插入此entry記錄至active記錄隊列
		proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
	}
	...
}

這一段是根據有效endpoint列表,更新KUBE-LOOP-BACK的ipset集,以備後面生成相應iptables規則(SNAT假裝地址);

for svcName, svc := range proxier.serviceMap {
	...
	//構建ipset entry
	entry := &utilipset.Entry{
		IP:       svcInfo.ClusterIP().String(),
		Port:     svcInfo.Port(),
		Protocol: protocol,
		SetType:  utilipset.HashIPPort,
	}
	// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
	// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
	// 類型校驗ipset entry
	if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
		klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name))
		continue
	}
	// 名爲KUBE-CLUSTER-IP的ipset集插入entry,以備後面統一輩子成IPtables規則
	proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
	// ipvs call
	// 構建ipvs虛擬服務器VS服務對象
	serv := &utilipvs.VirtualServer{
		Address:   svcInfo.ClusterIP(),
		Port:      uint16(svcInfo.Port()),
		Protocol:  string(svcInfo.Protocol()),
		Scheduler: proxier.ipvsScheduler,
	}
	// Set session affinity flag and timeout for IPVS service
	// 設置IPVS服務的會話保持標誌和超時時間
	if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
		serv.Flags |= utilipvs.FlagPersistent
		serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
	}
	// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
	// 將clusterIP綁定至dummy虛擬接口上,syncService()處理中需置bindAddr地址爲True
	if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
		activeIPVSServices[serv.String()] = true
		activeBindAddrs[serv.Address.String()] = true
		// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
		// So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
		//同步endpoints信息,IPVS爲VS更新realServer
		if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
			klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
		}
	} else {
		klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
	}
	...
}

ipset集KUBE-CLUSTER-IP更新,以備後面生成相應iptables規則。

for svcName, svc := range proxier.serviceMap {
	...
	//爲 load-balancer類型建立 ipvs 規則
	for _, ingress := range svcInfo.LoadBalancerIPStrings() {
		if ingress != "" { 
			// 構建ipset entry
			entry = &utilipset.Entry{
				IP:       ingress,
				Port:     svcInfo.Port(),
				Protocol: protocol,
				SetType:  utilipset.HashIPPort,
			} 
			if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
				klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSet].Name))
				continue
			}
			//KUBE-LOAD-BALANCER ipset集更新
			proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String()) 
			//服務指定externalTrafficPolicy=local時,KUBE-LOAD-BALANCER-LOCAL ipset集更新
			if svcInfo.OnlyNodeLocalEndpoints() {
				if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
					klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerLocalSet].Name))
					continue
				}
				proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
			}
			// 服務的LoadBalancerSourceRanges被指定時,基於源IP保護的防火牆策略開啓,KUBE-LOAD-BALANCER-FW ipset集更新
			if len(svcInfo.LoadBalancerSourceRanges()) != 0 { 
				if valid := proxier.ipsetList[kubeLoadbalancerFWSet].validateEntry(entry); !valid {
					klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadbalancerFWSet].Name))
					continue
				}
				proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String())
				allowFromNode := false
				for _, src := range svcInfo.LoadBalancerSourceRanges() {
					// ipset call
					entry = &utilipset.Entry{
						IP:       ingress,
						Port:     svcInfo.Port(),
						Protocol: protocol,
						Net:      src,
						SetType:  utilipset.HashIPPortNet,
					} 
					// 枚舉全部源CIDR白名單列表,KUBE-LOAD-BALANCER-SOURCE-CIDR ipset集更新
					//cidr:https://cloud.google.com/kubernetes-engine/docs/how-to/flexible-pod-cidr
					if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
						klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name))
						continue
					}
					proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())

					// ignore error because it has been validated
					_, cidr, _ := net.ParseCIDR(src)
					if cidr.Contains(proxier.nodeIP) {
						allowFromNode = true
					}
				}
				...
			} 
			// ipvs call
			// 構建ipvs 虛擬主機對象
			serv := &utilipvs.VirtualServer{
				Address:   net.ParseIP(ingress),
				Port:      uint16(svcInfo.Port()),
				Protocol:  string(svcInfo.Protocol()),
				Scheduler: proxier.ipvsScheduler,
			}
			...
		}
	}
	...
}

這裏爲 load-balancer類型建立 ipvs 規則,LoadBalancerSourceRanges和externalTrafficPolicy=local被指定時將對KUBE-LOAD-BALANCER-LOCAL、KUBE-LOAD-BALANCER-FW、KUBE-LOAD-BALANCER-SOURCE-CIDR、KUBE-LOAD-BALANCER-SOURCE-IP ipset集更新,以備後面生成相應iptables規則。

...
	//同步 ipset 記錄,清理 conntrack
	for _, set := range proxier.ipsetList {
		set.syncIPSetEntries()
	}
 
	//建立 iptables 規則數據
	proxier.writeIptablesRules()
	// 合併iptables規則
	proxier.iptablesData.Reset()
	proxier.iptablesData.Write(proxier.natChains.Bytes())
	proxier.iptablesData.Write(proxier.natRules.Bytes())
	proxier.iptablesData.Write(proxier.filterChains.Bytes())
	proxier.iptablesData.Write(proxier.filterRules.Bytes())

	klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
	//基於iptables格式化規則數據,使用iptables-restore刷新iptables規則
	err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
	...

最後這裏會刷新iptables 規則,而後建立 iptables 規則數據,將合併iptables規則,iptables-restore刷新iptables規則。

總結

這一篇沒有怎麼講service是怎麼運行的,怎麼使用的,而是選擇講了kube-proxy的ipvs代理是怎麼作的,以及在開頭講了ipvs與iptables區別與關係,看不懂的同窗須要本身去補充一下iptables相關的知識,文中的 ipvs 的知識我也是現學的,若是有講解很差的地方歡迎指出。

Reference

https://kubernetes.io/docs/tasks/administer-cluster/dns-debugging-resolution/

https://kubernetes.io/docs/tasks/debug-application-cluster/debug-service/

https://kubernetes.io/docs/concepts/services-networking/service/

https://cloud.google.com/kubernetes-engine/docs/how-to/flexible-pod-cidr

https://github.com/kubernetes/kubernetes/tree/master/pkg/proxy/ipvs

https://zh.wikipedia.org/zh-cn/網絡地址轉換

http://www.javashuo.com/article/p-cqcdrbga-kc.html

http://ipset.netfilter.org/

相關文章
相關標籤/搜索