在k8s中針對service的訪問一般基於kube proxy實現負載均衡,今天咱們來探索下基於用戶態的TCP代理組件的工業級實現核心設計, 其中包括隨機端口生成器、TCP流複製等技術的核心實現前端
今天主要是聊用戶態的轉發,而基於內核態的先不聊算法
流量重定向一般是指經過內核的netfilter來對數據包進行攔截,將其定向到咱們指定的端口,實現對流量的劫持,從而針對流量裏面的一些數據包進行一些額外的處理,這個過程對應用來講是徹底透明的後端
目的地址重定向是指將針對某個IP或者某個端口的流量,進行重定向,從而實現流量發送的處理,在kube proxy中主要是經過REDIRECT來實現服務器
目標地址轉換主要是指針對REDIRECT出去返回的流量,須要作一個重定向操做,即將其地址返回給本地的代理服務,由本地的代理服務再去實現轉發給真正的應用微信
隨機端口是指咱們要爲爲對應的Service創建一個一個臨時的代理服務器,該代理服務器須要隨機選擇一個本地端口進行監聽網絡
代理服務器須要將要本地服務發送的數據複製的目標服務, 同時接收目標服務返回的數據,複製給本地服務session
分配器的核心數據結構主要是經過rand提供隨機數生成器來進行端口的隨機獲取,並經過採用位計數方式來進行端口使用狀態的記錄, 獲取的隨機端口則會放入到ports中提供給進行端口的獲取數據結構
type rangeAllocator struct { net.PortRange ports chan int // 保存當前可用的隨機端口 used big.Int // 位計數,記錄端口的使用狀態 lock sync.Mutex rand *rand.Rand // 隨機數生成器,生成隨機端口 }
隨機分配器主要是構建rand隨機數生成器經過當前的時間做爲隨機因子,並構建ports緩衝buffer,當前是16個app
ra := &rangeAllocator{ PortRange: r, ports: make(chan int, portsBufSize), rand: rand.New(rand.NewSource(time.Now().UnixNano())), }
隨機數發生器主要是經過調用nextFreePort來進行隨機端口的生成,並放入到ports chan中負載均衡
func (r *rangeAllocator) fillPorts() { for { // 獲取一個隨機端口 if !r.fillPortsOnce() { return } } } func (r *rangeAllocator) fillPortsOnce() bool { port := r.nextFreePort() // 獲取當前隨機端口 if port == -1 { return false } r.ports <- port // 將獲取的隨機端口放入到緩衝buffer中 return true }
PortRange是指端口隨機的範圍,支持單個端口、min-max區間、min+offset區間三種設置方式, 經過其構建Base和Size參數,供算法使用
switch notation { case SinglePortNotation: var port int port, err = strconv.Atoi(value) if err != nil { return err } low = port high = port case HyphenNotation: low, err = strconv.Atoi(value[:hyphenIndex]) if err != nil { return err } high, err = strconv.Atoi(value[hyphenIndex+1:]) if err != nil { return err } case PlusNotation: var offset int low, err = strconv.Atoi(value[:plusIndex]) if err != nil { return err } offset, err = strconv.Atoi(value[plusIndex+1:]) if err != nil { return err } high = low + offset default: return fmt.Errorf("unable to parse port range: %s", value) } pr.Base = low pr.Size = 1 + high - low
隨機端口的生成主要是經過先生成隨機數,若是端口已經被使用,則會按照高低兩個區間進行順序搜索,直到找到未被佔用的端口
func (r *rangeAllocator) nextFreePort() int { r.lock.Lock() defer r.lock.Unlock() // 隨機選擇port j := r.rand.Intn(r.Size) if b := r.used.Bit(j); b == 0 { r.used.SetBit(&r.used, j, 1) return j + r.Base } // search sequentially // 若是當前端口已經被佔用,則從當前的隨機數順序遞增查找,若是找到就將對應的bit位設置爲1 for i := j + 1; i < r.Size; i++ { if b := r.used.Bit(i); b == 0 { r.used.SetBit(&r.used, i, 1) return i + r.Base } } // 若是高端端口已經被佔用則會從低地址開始順序查找 for i := 0; i < j; i++ { if b := r.used.Bit(i); b == 0 { r.used.SetBit(&r.used, i, 1) return i + r.Base } } return -1 }
tcp代理實現上基於net.Listener構建一個tcp的監聽器, port則是監聽的地址
type tcpProxySocket struct { net.Listener port int }
tcp代理接收到連接請求,則會accept而後根據Service和loadbalancer算法選擇一臺後端pod來進行連接的創建,而後啓動異步流量複製,實現流向的雙向複製
func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *ServiceInfo, loadBalancer LoadBalancer) { for { if !myInfo.IsAlive() { // The service port was closed or replaced. return } // 等待接收連接 inConn, err := tcp.Accept() if err != nil { if isTooManyFDsError(err) { panic("Accept failed: " + err.Error()) } if isClosedError(err) { return } if !myInfo.IsAlive() { // Then the service port was just closed so the accept failure is to be expected. return } klog.Errorf("Accept failed: %v", err) continue } klog.V(3).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr()) // 根據目標地址的信息和負載均衡算法來選則後端一臺server進行連接的創建 outConn, err := TryConnectEndpoints(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", loadBalancer) if err != nil { klog.Errorf("Failed to connect to balancer: %v", err) inConn.Close() continue } // 啓動異步流量複製 go ProxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn)) } }
TCP流量雙向複製則是將流量進行雙向的拷貝,經過io.Copy在底層完成從輸入流和輸出流的流量複製
func ProxyTCP(in, out *net.TCPConn) { var wg sync.WaitGroup wg.Add(2) klog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v", in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) // 實現流量想的雙向複製 go copyBytes("from backend", in, out, &wg) go copyBytes("to backend", out, in, &wg) wg.Wait() } func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) { defer wg.Done() klog.V(4).Infof("Copying %s: %s -> %s", direction, src.RemoteAddr(), dest.RemoteAddr()) // 實現底層的流量的拷貝 n, err := io.Copy(dest, src) if err != nil { if !isClosedError(err) { klog.Errorf("I/O error: %v", err) } } klog.V(4).Infof("Copied %d bytes %s: %s -> %s", n, direction, src.RemoteAddr(), dest.RemoteAddr()) dest.Close() src.Close() }
創建連接時有兩個核心的設計即sessionAffinity機制與超時重試機制 親和性機制:若是以前的親和性連接存在依然有效,則會使用,若是發生斷開,則須要重置親和性 超時重試機制:這裏其實能夠採用backoff機制來進行超時重試機制的實現
func TryConnectEndpoints(service proxy.ServicePortName, srcAddr net.Addr, protocol string, loadBalancer LoadBalancer) (out net.Conn, err error) { // 默認是不重置親和性,即以前創建的親和性連接此時依然有效 sessionAffinityReset := false // EndpointDialTimeouts = []time.Duration{250 * time.Millisecond, 500 * time.Millisecond, 1 * time.Second, 2 * time.Second} // 是一種延遲重試機制,是一種等待重試的機制,減小由於網絡不穩定致使的瞬間重試所有失敗的狀況 for _, dialTimeout := range EndpointDialTimeouts { // 經過負載均衡算法和親和性選擇一臺endpoint來進行連接 endpoint, err := loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset) if err != nil { klog.Errorf("Couldn't find an endpoint for %s: %v", service, err) return nil, err } klog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint) // 創建底層的連接 outConn, err := net.DialTimeout(protocol, endpoint, dialTimeout) if err != nil { if isTooManyFDsError(err) { panic("Dial failed: " + err.Error()) } klog.Errorf("Dial failed: %v", err) // 若是發生連接失敗,則以前的親和性則可能失敗,此時就要從新進行選擇節點進行連接 sessionAffinityReset = true continue } return outConn, nil } return nil, fmt.Errorf("failed to connect to an endpoint.") }
好了今天的內容就到這裏,文章核心介紹了一種隨機端口選擇算法的實現,而後剖析了TCP代理的底層實現機制,其核心包括創建連接的親和性、超時重試,以及TCP流量的複製技術的實現,今天就到這裏,但願你們幫忙分享傳播,讓做者有繼續分享寫做的動力,謝謝你們
> 微信號:baxiaoshi2020 > 關注公告號閱讀更多源碼分析文章 > 更多文章關注 www.sreguide.com > 本文由博客一文多發平臺 OpenWrite 發佈