kubernetes用戶態TCP代理實現

在k8s中針對service的訪問一般基於kube proxy實現負載均衡,今天咱們來探索下基於用戶態的TCP代理組件的工業級實現核心設計, 其中包括隨機端口生成器、TCP流複製等技術的核心實現前端

1. 基礎築基

今天主要是聊用戶態的轉發,而基於內核態的先不聊算法

1.1 流量重定向

image.png 流量重定向一般是指經過內核的netfilter來對數據包進行攔截,將其定向到咱們指定的端口,實現對流量的劫持,從而針對流量裏面的一些數據包進行一些額外的處理,這個過程對應用來講是徹底透明的後端

1.1.1 目的地址重定向

目的地址重定向是指將針對某個IP或者某個端口的流量,進行重定向,從而實現流量發送的處理,在kube proxy中主要是經過REDIRECT來實現服務器

1.1.2 目標地址轉換

目標地址轉換主要是指針對REDIRECT出去返回的流量,須要作一個重定向操做,即將其地址返回給本地的代理服務,由本地的代理服務再去實現轉發給真正的應用微信

1.2 TCP代理實現

image.png

1.2.1 隨機端口

隨機端口是指咱們要爲爲對應的Service創建一個一個臨時的代理服務器,該代理服務器須要隨機選擇一個本地端口進行監聽網絡

1.2.2 流複製

代理服務器須要將要本地服務發送的數據複製的目標服務, 同時接收目標服務返回的數據,複製給本地服務session

2. 核心設計實現

2.1 隨機端口分配器

image.png

2.1.1 核心數據結構

分配器的核心數據結構主要是經過rand提供隨機數生成器來進行端口的隨機獲取,並經過採用位計數方式來進行端口使用狀態的記錄, 獲取的隨機端口則會放入到ports中提供給進行端口的獲取數據結構

type rangeAllocator struct {
	net.PortRange
	ports chan int	// 保存當前可用的隨機端口
	used  big.Int	// 位計數,記錄端口的使用狀態
	lock  sync.Mutex
	rand  *rand.Rand // 隨機數生成器,生成隨機端口 
}

2.1.2 構建隨機分配器

隨機分配器主要是構建rand隨機數生成器經過當前的時間做爲隨機因子,並構建ports緩衝buffer,當前是16個app

ra := &rangeAllocator{
		PortRange: r,
		ports:     make(chan int, portsBufSize),
		rand:      rand.New(rand.NewSource(time.Now().UnixNano())),
	}

2.1.3 隨機數發生器

隨機數發生器主要是經過調用nextFreePort來進行隨機端口的生成,並放入到ports chan中負載均衡

func (r *rangeAllocator) fillPorts() {
	for {
		// 獲取一個隨機端口
		if !r.fillPortsOnce() {
			return
		}
	}
}

func (r *rangeAllocator) fillPortsOnce() bool {
	port := r.nextFreePort() // 獲取當前隨機端口
	if port == -1 {
		return false
	}
	r.ports <- port	// 將獲取的隨機端口放入到緩衝buffer中
	return true
}

2.1.4 portrange

PortRange是指端口隨機的範圍,支持單個端口、min-max區間、min+offset區間三種設置方式, 經過其構建Base和Size參數,供算法使用

switch notation {
	case SinglePortNotation:
		var port int
		port, err = strconv.Atoi(value)
		if err != nil {
			return err
		}
		low = port
		high = port
	case HyphenNotation:
		low, err = strconv.Atoi(value[:hyphenIndex])
		if err != nil {
			return err
		}
		high, err = strconv.Atoi(value[hyphenIndex+1:])
		if err != nil {
			return err
		}
	case PlusNotation:
		var offset int
		low, err = strconv.Atoi(value[:plusIndex])
		if err != nil {
			return err
		}
		offset, err = strconv.Atoi(value[plusIndex+1:])
		if err != nil {
			return err
		}
		high = low + offset
	default:
		return fmt.Errorf("unable to parse port range: %s", value)
	}

	pr.Base = low
	pr.Size = 1 + high - low

2.1.5 隨機端口生成算法

隨機端口的生成主要是經過先生成隨機數,若是端口已經被使用,則會按照高低兩個區間進行順序搜索,直到找到未被佔用的端口

func (r *rangeAllocator) nextFreePort() int {
	r.lock.Lock()
	defer r.lock.Unlock()

	// 隨機選擇port
	j := r.rand.Intn(r.Size)
	if b := r.used.Bit(j); b == 0 {
		r.used.SetBit(&r.used, j, 1)
		return j + r.Base
	}

	// search sequentially
	// 若是當前端口已經被佔用,則從當前的隨機數順序遞增查找,若是找到就將對應的bit位設置爲1
	for i := j + 1; i < r.Size; i++ {
		if b := r.used.Bit(i); b == 0 {
			r.used.SetBit(&r.used, i, 1)
			return i + r.Base
		}
	}
	// 若是高端端口已經被佔用則會從低地址開始順序查找
	for i := 0; i < j; i++ {
		if b := r.used.Bit(i); b == 0 {
			r.used.SetBit(&r.used, i, 1)
			return i + r.Base
		}
	}
	return -1
}

2.2 TCP代理實現

image.png

2.2.1 核心數據結構

tcp代理實現上基於net.Listener構建一個tcp的監聽器, port則是監聽的地址

type tcpProxySocket struct {
	net.Listener
	port int
}

2.2.2 接收本地重定向流量構建轉發器

tcp代理接收到連接請求,則會accept而後根據Service和loadbalancer算法選擇一臺後端pod來進行連接的創建,而後啓動異步流量複製,實現流向的雙向複製

func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *ServiceInfo, loadBalancer LoadBalancer) {
	for {
		if !myInfo.IsAlive() {
			// The service port was closed or replaced.
			return
		}
		// 等待接收連接
		inConn, err := tcp.Accept()
		if err != nil {
			if isTooManyFDsError(err) {
				panic("Accept failed: " + err.Error())
			}

			if isClosedError(err) {
				return
			}
			if !myInfo.IsAlive() {
				// Then the service port was just closed so the accept failure is to be expected.
				return
			}
			klog.Errorf("Accept failed: %v", err)
			continue
		}
		klog.V(3).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr())
		// 根據目標地址的信息和負載均衡算法來選則後端一臺server進行連接的創建
		outConn, err := TryConnectEndpoints(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", loadBalancer)
		if err != nil {
			klog.Errorf("Failed to connect to balancer: %v", err)
			inConn.Close()
			continue
		}
		// 啓動異步流量複製
		go ProxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
	}
}

2.2.3 TCP流量雙向複製

TCP流量雙向複製則是將流量進行雙向的拷貝,經過io.Copy在底層完成從輸入流和輸出流的流量複製

func ProxyTCP(in, out *net.TCPConn) {
	var wg sync.WaitGroup
	wg.Add(2)
	klog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v",
		in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
	// 實現流量想的雙向複製
	go copyBytes("from backend", in, out, &wg)
	go copyBytes("to backend", out, in, &wg)
	wg.Wait()
}

func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) {
	defer wg.Done()
	klog.V(4).Infof("Copying %s: %s -> %s", direction, src.RemoteAddr(), dest.RemoteAddr())
	// 實現底層的流量的拷貝
	n, err := io.Copy(dest, src)
	if err != nil {
		if !isClosedError(err) {
			klog.Errorf("I/O error: %v", err)
		}
	}
	klog.V(4).Infof("Copied %d bytes %s: %s -> %s", n, direction, src.RemoteAddr(), dest.RemoteAddr())
	dest.Close()
	src.Close()
}

2.2.4 經過負載均衡與重試機制實現連接創建

創建連接時有兩個核心的設計即sessionAffinity機制與超時重試機制 親和性機制:若是以前的親和性連接存在依然有效,則會使用,若是發生斷開,則須要重置親和性 超時重試機制:這裏其實能夠採用backoff機制來進行超時重試機制的實現

func TryConnectEndpoints(service proxy.ServicePortName, srcAddr net.Addr, protocol string, loadBalancer LoadBalancer) (out net.Conn, err error) {
	// 默認是不重置親和性,即以前創建的親和性連接此時依然有效
	sessionAffinityReset := false
	// EndpointDialTimeouts = []time.Duration{250 * time.Millisecond, 500 * time.Millisecond, 1 * time.Second, 2 * time.Second}
	// 是一種延遲重試機制,是一種等待重試的機制,減小由於網絡不穩定致使的瞬間重試所有失敗的狀況
	for _, dialTimeout := range EndpointDialTimeouts {
		// 經過負載均衡算法和親和性選擇一臺endpoint來進行連接
		endpoint, err := loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset)
		if err != nil {
			klog.Errorf("Couldn't find an endpoint for %s: %v", service, err)
			return nil, err
		}
		klog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint)
		// 創建底層的連接
		outConn, err := net.DialTimeout(protocol, endpoint, dialTimeout)
		if err != nil {
			if isTooManyFDsError(err) {
				panic("Dial failed: " + err.Error())
			}
			klog.Errorf("Dial failed: %v", err)
			// 若是發生連接失敗,則以前的親和性則可能失敗,此時就要從新進行選擇節點進行連接
			sessionAffinityReset = true
			continue
		}
		return outConn, nil
	}
	return nil, fmt.Errorf("failed to connect to an endpoint.")
}

好了今天的內容就到這裏,文章核心介紹了一種隨機端口選擇算法的實現,而後剖析了TCP代理的底層實現機制,其核心包括創建連接的親和性、超時重試,以及TCP流量的複製技術的實現,今天就到這裏,但願你們幫忙分享傳播,讓做者有繼續分享寫做的動力,謝謝你們

> 微信號:baxiaoshi2020 > 關注公告號閱讀更多源碼分析文章 21天大棚 > 更多文章關注 www.sreguide.com > 本文由博客一文多發平臺 OpenWrite 發佈

相關文章
相關標籤/搜索