李騰飛,騰訊容器技術研發工程師,騰訊雲TKE後臺研發,SuperEdge核心開發成員。node
杜楊浩,騰訊雲高級工程師,熱衷於開源、容器和Kubernetes。目前主要從事鏡像倉庫,Kubernetes集羣高可用&備份還原,以及邊緣計算相關研發工做。git
SuperEdge 是 Kubernetes 原生的邊緣容器方案,它將 Kubernetes 強大的容器管理能力擴展到邊緣計算場景中,針對邊緣計算場景中常見的技術挑戰提供瞭解決方案,如:單集羣節點跨地域、雲邊網絡不可靠、邊緣節點位於 NAT 網絡等。這些能力可讓應用很容易地部署到邊緣計算節點上,而且可靠地運行,能夠幫助您很方便地把分佈在各處的計算資源放到一個 Kubernetes 集羣中管理,包括但不限於:邊緣雲計算資源、私有云資源、現場設備,打造屬於您的邊緣 PaaS 平臺。SuperEdge 支持全部 Kubernetes 資源類型、API 接口、使用方式、運維工具,無額外的學習成本,也兼容其餘雲原生項目,如:Promethues,使用者能夠結合其餘所需的雲原生項目一塊兒使用。項目由如下公司共同發起:騰訊、Intel、VMware、虎牙直播、寒武紀、首都在線和美團。github
在邊緣場景中,不少時候都是單向網絡,即只有邊緣節點能主動訪問雲端。雲邊隧道主要用於代理雲端訪問邊緣節點組件的請求,解決雲端沒法直接訪問邊緣節點的問題。api
架構圖以下所示:網絡
實現原理爲:架構
而整個請求的代理轉發流程以下:併發
在介紹完 Tunnel 的配置後,下面介紹 Tunnel 的內部數據流轉:app
上圖標記出了 HTTPS 代理的數據流轉,TCP 代理數據流轉和 HTTPS 的相似,其中的關鍵步驟:負載均衡
nodeContext 和 connContext 都是作鏈接的管理,可是 nodeContext 管理 gRPC 長鏈接的和 connContext 管理的上層轉發請求的鏈接(TCP 和 HTTPS)的生命週期是不相同的,所以須要分開管理運維
Tunnel 管理的鏈接能夠分爲底層鏈接(雲端隧道的 gRPC 鏈接)和上層應用鏈接(HTTPS 鏈接和 TCP 鏈接),鏈接異常的管理的能夠分爲如下幾種場景:
以 HTTPS 鏈接爲例,tunnel-edge 的 HTTPS Client 與邊緣節點 Server 鏈接異常斷開,會發送 StreamMsg (StreamMsg.Type=CLOSE) 消息,tunnel-cloud 在接收到 StreamMsg 消息以後會主動關閉 HTTPS Server與HTTPS Client 的鏈接。
gRPC 鏈接異常,Stream 模塊會根據與 gPRC 鏈接綁定的 node.connContext,向 HTTPS 和 TCP 模塊發送 StreamMsg(StreamMsg.Type=CLOSE),HTTPS 或 TCP 模塊接收消息以後主動斷開鏈接。
func (stream *Stream) Start(mode string) { context.GetContext().RegisterHandler(util.STREAM_HEART_BEAT, util.STREAM, streammsg.HeartbeatHandler) if mode == util.CLOUD { ... //啓動gRPC server go connect.StartServer() ... //同步coredns的hosts插件的配置文件 go connect.SynCorefile() } else { //啓動gRPC client go connect.StartSendClient() ... } ... }
tunnel-cloud 首先調用 RegisterHandler 註冊心跳消息處理函數 HeartbeatHandler
SynCorefile 執行同步 tunnel-coredns 的 hosts 插件的配置文件,每隔一分鐘(考慮到 configmap 同步 tunnel-cloud 的 pod 掛載文件的時間)執行一次 checkHosts,以下:
func SynCorefile() { for { ... err := coreDns.checkHosts() ... time.Sleep(60 * time.Second) } }
而 checkHosts 負責 configmap 具體的刷新操做:
func (dns *CoreDns) checkHosts() error { nodes, flag := parseHosts() if !flag { return nil } ... _, err = dns.ClientSet.CoreV1().ConfigMaps(dns.Namespace).Update(cctx.TODO(), cm, metav1.UpdateOptions{}) ... }
checkHosts 首先調用 parseHosts 獲取本地 hosts 文件中邊緣節點名稱以及對應 tunnel-cloud podIp 映射列表,對比 podIp 的對應節點名和內存中節點名,若是有變化則將這個內容覆蓋寫入 configmap 並更新:
另外,這裏 tunnel-cloud 引入 configmap 本地掛載文件的目的是:優化託管模式下衆多集羣同時同步 tunnel-coredns 時的性能
tunnel-edge 首先調用 StartClient 與 tunnel-edge 創建 gRPC 鏈接,返回 grpc.ClientConn
func StartClient() (*grpc.ClientConn, ctx.Context, ctx.CancelFunc, error) { ... opts := []grpc.DialOption{grpc.WithKeepaliveParams(kacp), grpc.WithStreamInterceptor(ClientStreamInterceptor), grpc.WithTransportCredentials(creds)} conn, err := grpc.Dial(conf.TunnelConf.TunnlMode.EDGE.StreamEdge.Client.ServerName, opts...) ... }
在調用 grpc.Dial 時會傳遞grpc.WithStreamInterceptor(ClientStreamInterceptor)
DialOption,將 ClientStreamInterceptor 做爲 StreamClientInterceptor 傳遞給 grpc.ClientConn,等待 gRPC 鏈接狀態變爲 Ready,而後執行 Send 函數。streamClient.TunnelStreaming 調用StreamClientInterceptor 返回 wrappedClientStream 對象
func ClientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { ... opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{AccessToken: clientToken}))) ... return newClientWrappedStream(s), nil }
ClientStreamInterceptor 會將邊緣節點名稱以及 token 構形成 oauth2.Token.AccessToken 進行認證傳遞,並構建 wrappedClientStream
stream.Send 會併發調用 wrappedClientStream.SendMsg 以及 wrappedClientStream.RecvMsg 分別用於 tunnel-edge 發送以及接受,並阻塞等待
注意:tunnel-edge 向 tunnel-cloud 註冊節點信息是在創建 gRPC Stream 時,而不是建立 grpc.connClient 的時候
整個過程以下圖所示:
相應的,在初始化 tunnel-cloud 時,會將grpc.StreamInterceptor(ServerStreamInterceptor)
構建成 gRPC ServerOption,並將 ServerStreamInterceptor 做爲 StreamServerInterceptor 傳遞給 grpc.Server:
func StartServer() { ... opts := []grpc.ServerOption{grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp), grpc.StreamInterceptor(ServerStreamInterceptor), grpc.Creds(creds)} s := grpc.NewServer(opts...) proto.RegisterStreamServer(s, &stream.Server{}) ... }
雲端 gRPC 服務在接受到 tunnel-edge 請求(創建 Stream 流)時,會調用 ServerStreamInterceptor,而 ServerStreamInterceptor 會從gRPC metadata 中解析出此 gRPC 鏈接對應的邊緣節點名和token,並對該 token 進行校驗,而後根據節點名構建 wrappedServerStream 做爲與該邊緣節點通訊的處理對象(每一個邊緣節點對應一個處理對象),handler 函數會調用 stream.TunnelStreaming,並將 wrappedServerStream 傳遞給它(wrappedServerStream 實現了proto.Stream_TunnelStreamingServer 接口)
func ServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { md, ok := metadata.FromIncomingContext(ss.Context()) ... tk := strings.TrimPrefix(md["authorization"][0], "Bearer ") auth, err := token.ParseToken(tk) ... if auth.Token != token.GetTokenFromCache(auth.NodeName) { klog.Errorf("invalid token node = %s", auth.NodeName) return ErrInvalidToken } err = handler(srv, newServerWrappedStream(ss, auth.NodeName)) if err != nil { ctx.GetContext().RemoveNode(auth.NodeName) klog.Errorf("node disconnected node = %s err = %v", auth.NodeName, err) } return err }
而當 TunnelStreaming 方法退出時,就會執 ServerStreamInterceptor 移除節點的邏輯ctx.GetContext().RemoveNode
TunnelStreaming 會併發調用 wrappedServerStream.SendMsg 以及 wrappedServerStream.RecvMsg 分別用於 tunnel-cloud 發送以及接受,並阻塞等待:
func (s *Server) TunnelStreaming(stream proto.Stream_TunnelStreamingServer) error { errChan := make(chan error, 2) go func(sendStream proto.Stream_TunnelStreamingServer, sendChan chan error) { sendErr := sendStream.SendMsg(nil) ... sendChan <- sendErr }(stream, errChan) go func(recvStream proto.Stream_TunnelStreamingServer, recvChan chan error) { recvErr := stream.RecvMsg(nil) ... recvChan <- recvErr }(stream, errChan) e := <-errChan return e }
SendMsg 會從 wrappedServerStream 對應邊緣節點 node 中接受 StreamMsg,並調用 ServerStream.SendMsg 發送該消息給 tunnel-edge
func (w *wrappedServerStream) SendMsg(m interface{}) error { if m != nil { return w.ServerStream.SendMsg(m) } node := ctx.GetContext().AddNode(w.node) ... for { msg := <-node.NodeRecv() ... err := w.ServerStream.SendMsg(msg) ... }}
而 RecvMsg 會不斷接受來自 tunnel-edge 的 StreamMsg,並調用 StreamMsg.對應的處理函數進行操做
HTTPS 模塊負責創建雲邊的 HTTPS 代理,將雲端組件(例如:kube-apiserver)的 https 請求轉發給邊端服務(例如:kubelet)
func (https *Https) Start(mode string) { context.GetContext().RegisterHandler(util.CONNECTING, util.HTTPS, httpsmsg.ConnectingHandler) context.GetContext().RegisterHandler(util.CONNECTED, util.HTTPS, httpsmsg.ConnectedAndTransmission) context.GetContext().RegisterHandler(util.CLOSED, util.HTTPS, httpsmsg.ConnectedAndTransmission) context.GetContext().RegisterHandler(util.TRANSNMISSION, util.HTTPS, httpsmsg.ConnectedAndTransmission) if mode == util.CLOUD { go httpsmng.StartServer() }}
Start 函數首先註冊了 StreamMsg 的處理函數,其中 CLOSED 處理函數主要處理關閉鏈接的消息,並啓動 HTTPS Server。
當雲端組件向 tunnel-cloud 發送 HTTPS 請求時,serverHandler 會首先從 request.Host 字段解析節點名,若先創建 TLS 鏈接,而後在鏈接中寫入 HTTP 的 request 對象,此時的 request.Host 能夠不設置,則須要從 request.TLS.ServerName 解析節點名。HTTPS Server 讀取 request.Body 以及 request.Header 構建 HttpsMsg 結構體,並序列化後封裝成 StreamMsg,經過 Send2Node 發送 StreamMsg 放入 StreamMsg.node 對應的 node 的 Channel 中,由 Stream 模塊發送到 tunnel-edge
func (serverHandler *ServerHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { var nodeName string nodeinfo := strings.Split(request.Host, ":") if context.GetContext().NodeIsExist(nodeinfo[0]) { nodeName = nodeinfo[0] } else { nodeName = request.TLS.ServerName } ... node.Send2Node(StreamMsg)}
tunnel-edge 接受到 StreamMsg,並調用 ConnectingHandler 函數進行處理:
func ConnectingHandler(msg *proto.StreamMsg) error { go httpsmng.Request(msg) return nil}func Request(msg *proto.StreamMsg) { httpConn, err := getHttpConn(msg) ... rawResponse := bytes.NewBuffer(make([]byte, 0, util.MaxResponseSize)) rawResponse.Reset() respReader := bufio.NewReader(io.TeeReader(httpConn, rawResponse)) resp, err := http.ReadResponse(respReader, nil) ... node.BindNode(msg.Topic) ... if resp.StatusCode != http.StatusSwitchingProtocols { handleClientHttp(resp, rawResponse, httpConn, msg, node, conn) } else { handleClientSwitchingProtocols(httpConn, rawResponse, msg, node, conn) }}
ConnectingHandler 會調用 Request 對該 StreamMsg 進行處理。Reqeust 首先經過 getHttpConn 與邊緣節點 Server 創建的 TLS 鏈接。解析 TLS 鏈接中返回的數據獲取 HTTP Response,Status Code 爲200,將 Response 的內容發送到 tunnel-cloud,Status Code 爲101,將從TLS 鏈接讀取 Response 的二進制數據發送到 tunnel-cloud,其中 StreamMsg.Type爲CONNECTED。
tunnel-cloud 在接受到該 StreamMsg 後,會調用 ConnectedAndTransmission 進行處理:
func ConnectedAndTransmission(msg *proto.StreamMsg) error { conn := context.GetContext().GetConn(msg.Topic) ... conn.Send2Conn(msg) return nil}
經過 msg.Topic(conn uid) 獲取 conn,並經過 Send2Conn 將消息塞到該 conn 對應的管道中
雲端 HTTPS Server 在接受到雲端的 CONNECTED 消息以後,認爲HTTPS 代理成功創建。並繼續執行 handleClientHttp or handleClientSwitchingProtocols 進行數據傳輸,這裏只分析 handleClientHttp 非協議提高下的數據傳輸過程,HTTPS Client 端的處理邏輯以下:
func handleClientHttp(resp *http.Response, rawResponse *bytes.Buffer, httpConn net.Conn, msg *proto.StreamMsg, node context.Node, conn context.Conn) { ... go func(read chan *proto.StreamMsg, response *http.Response, buf *bytes.Buffer, stopRead chan struct{}) { rrunning := true for rrunning { bbody := make([]byte, util.MaxResponseSize) n, err := response.Body.Read(bbody) respMsg := &proto.StreamMsg{ Node: msg.Node, Category: msg.Category, Type: util.CONNECTED, Topic: msg.Topic, Data: bbody[:n], } ... read <- respMsg } ... }(readCh, resp, rawResponse, stop) running := true for running { select { case cloudMsg := <-conn.ConnRecv(): ... case respMsg := <-readCh: ... node.Send2Node(respMsg) ... } } ...}
這裏 handleClientHttp 會一直嘗試讀取來自邊端組件的數據包,並構建成 TRANSNMISSION 類型的 StreamMsg 發送給 tunnel-cloud,tunnel-cloud 在接受到StreamMsg 後調用 ConnectedAndTransmission 函數,將 StreamMsg 放入 StreamMsg.Type 對應的 HTTPS 模塊的 conn.Channel 中
func handleServerHttp(rmsg *HttpsMsg, writer http.ResponseWriter, request *http.Request, node context.Node, conn context.Conn) { for k, v := range rmsg.Header { writer.Header().Add(k, v) } flusher, ok := writer.(http.Flusher) if ok { running := true for running { select { case <-request.Context().Done(): ... case msg := <-conn.ConnRecv(): ... _, err := writer.Write(msg.Data) flusher.Flush() ... } } ...}
handleServerHttp 在接受到 StreamMsg 後,會將 msg.Data,也即邊端組件的數據包,發送給雲端組件。整個數據流是單向的由邊端向雲端傳送,以下所示:
而對於相似kubectl exec
的請求,數據流是雙向的,此時邊端組件 (kubelet) 會返回 StatusCode 爲101的回包,標示協議提高,以後 tunnel-cloud 以及 tunnel-edge 會分別切到 handleServerSwitchingProtocols 以及 handleClientSwitchingProtocols 對 HTTPS 底層鏈接進行讀取和寫入,完成數據流的雙向傳輸。
架構以下所示:
總結HTTPS模塊以下:
TCP 模塊負責在多集羣管理中創建雲端管控集羣與邊緣獨立集羣的一條 TCP 代理隧道:
func (tcp *TcpProxy) Start(mode string) { context.GetContext().RegisterHandler(util.TCP_BACKEND, tcp.Name(), tcpmsg.BackendHandler) context.GetContext().RegisterHandler(util.TCP_FRONTEND, tcp.Name(), tcpmsg.FrontendHandler) context.GetContext().RegisterHandler(util.CLOSED, tcp.Name(), tcpmsg.ControlHandler) if mode == util.CLOUD { ... for front, backend := range Tcp.Addr { go func(front, backend string) { ln, err := net.Listen("tcp", front) ... for { rawConn, err := ln.Accept() .... fp := tcpmng.NewTcpConn(uuid, backend, node) fp.Conn = rawConn fp.Type = util.TCP_FRONTEND go fp.Write() go fp.Read() } }(front, backend) } }
Start 函數首先註冊了 StreamMsg 的處理函數,其中 CLOSED 處理函數主要處理關閉鏈接的消息,以後在雲端啓動 TCP Server。
在接受到雲端組件的請求後,TCP Server 會將請求封裝成 StremMsg 發送給 StreamServer,由 StreamServer 發送到 tunnel-edge,其中 StreamMsg.Type=FrontendHandler,StreamMsg.Node 從已創建的雲邊隧道的節點中隨機選擇一個。
tunnel-edge 在接受到該StreamMsg 後,會調用 FrontendHandler 函數處理
func FrontendHandler(msg *proto.StreamMsg) error { c := context.GetContext().GetConn(msg.Topic) if c != nil { c.Send2Conn(msg) return nil } tp := tcpmng.NewTcpConn(msg.Topic, msg.Addr, msg.Node) tp.Type = util.TCP_BACKEND tp.C.Send2Conn(msg) tcpAddr, err := net.ResolveTCPAddr("tcp", tp.Addr) if err != nil { ... conn, err := net.DialTCP("tcp", nil, tcpAddr) ... tp.Conn = conn go tp.Read() go tp.Write() return nil}
FrontendHandler 首先使用 StreamMsg.Addr 與 Edge Server 創建 TCP 鏈接,啓動協程異步對 TCP 鏈接 Read 和 Write,同時新建 conn 對象(conn.uid=StreamMsg.Topic),並 eamMsg.Data 寫入 TCP 鏈接。tunnel-edge 在接收到 Edge Server 的返回數據將其封裝爲 StreamMsg(StreamMsg.Topic=BackendHandler) 發送到 tunnel-cloud
整個過程如圖所示:
【騰訊雲原生】雲說新品、雲研新術、雲遊新活、雲賞資訊,掃碼關注同名公衆號,及時獲取更多幹貨!!
![]()