做者 | 何淋波(新勝)
來源 | 阿里巴巴雲原生公衆號git
阿里雲邊緣容器服務上線 1 年後,正式開源了雲原生邊緣計算解決方案 OpenYurt,跟其餘開源的容器化邊緣計算方案不一樣的地方在於:OpenYurt 秉持 Extending your native Kubernetes to edge 的理念,對 Kubernetes 系統零修改,並提供一鍵式轉換原生 Kubernetes 爲 OpenYurt,讓原生 K8s 集羣具有邊緣集羣能力。github
同時隨着 OpenYurt 的持續演進,也必定會繼續保持以下發展理念:緩存
想要實現將 Kubernetes 系統延展到邊緣計算場景,那麼邊緣節點將經過公網和雲端鏈接,網絡鏈接有很大不可控因素,可能帶來邊緣業務運行的不穩定因素,這是雲原生和邊緣計算融合的主要難點之一。網絡
解決這個問題,須要使邊緣側具備自治能力,即當雲邊網絡斷開或者鏈接不穩定時,確保邊緣業務能夠持續運行。在 OpenYurt 中,該能力由 yurt-controller-manager 和 YurtHub 組件提供。架構
在以前的文章中,咱們詳細介紹了 YurtHub 組件的能力。其架構圖以下:app
圖片連接運維
YurtHub 是一個帶有數據緩存功能的「透明網關」,和雲端網絡斷連狀態下,若是節點或者組件重啓,各個組件(kubelet/kube-proxy 等)將從 YurtHub 中獲取到業務容器相關數據,有效解決邊緣自治的問題。這也意味着咱們須要實現一個輕量的帶數據緩存能力的反向代理。ide
實現一個緩存數據的反向代理,第一想法就是從 response.Body 中讀取數據,而後分別返回給請求 client 和本地的 Cache 模塊。僞代碼以下:阿里雲
func HandleResponse(rw http.ResponseWriter, resp *http.Response) { bodyBytes, _ := ioutil.ReadAll(resp.Body) go func() { // cache response on local disk cacher.Write(bodyBytes) } // client reads data from response rw.Write(bodyBytes) }
當深刻思考後,在 Kubernetes 系統中,上述實現會引起下面的問題:代理
問題 1:流式數據須要如何處理(如: K8s 中的 watch 請求),意味 ioutil.ReadAll() 一次調用沒法返回全部數據。即如何能夠返回流數據同時又緩存流數據。
針對上面的問題,咱們將問題逐個抽象,能夠發現更優雅的實現方法。
針對流式數據的讀寫(一邊返回一邊緩存),以下圖所示,其實須要的不過是把 response.Body(io.Reader) 轉換成一個 io.Reader 和一個 io.Writer。或者說是一個 io.Reader 和 io.Writer 合成一個 io.Reader。這很容易就聯想到 Linux 裏面的 Tee 命令。
而在 Golang 中 Tee 命令是實現就是io.TeeReader,那問題 1 的僞代碼以下:
func HandleResponse(rw http.ResponseWriter, resp *http.Response) { // create TeeReader with response.Body and cacher newRespBody := io.TeeReader(resp.Body, cacher) // client reads data from response io.Copy(rw, newRespBody) }
經過 TeeReader 的對 Response.Body 和 Cacher 的整合,當請求 client 端從 response.Body 中讀取數據時,將同時向 Cache 中寫入返回數據,優雅的解決了流式數據的處理。
以下圖所示,緩存前先清洗流數據,請求端和過濾端須要同時讀取 response.Body(2 次讀取問題)。也就是須要將 response.Body(io.Reader) 轉換成兩個 io.Reader。
也意味着問題 2 轉化成:問題 1 中緩存端的 io.Writer 轉換成 Data Filter 的 io.Reader。其實在 Linux 命令中也能找到相似命令,就是管道。所以問題 2 的僞代碼以下:
func HandleResponse(rw http.ResponseWriter, resp *http.Response) { pr, pw := io.Pipe() // create TeeReader with response.Body and Pipe writer newRespBody := io.TeeReader(resp.Body, pw) go func() { // filter reads data from response io.Copy(dataFilter, pr) } // client reads data from response io.Copy(rw, newRespBody) }
經過 io.TeeReader 和 io.PiPe,當請求 client 端從 response.Body 中讀取數據時,Filter 將同時從 Response 讀取到數據,優雅的解決了流式數據的 2 次讀取問題。
最後看一下 YurtHub 中相關實現,因爲 Response.Body 爲 io.ReadCloser,因此實現了 dualReadCloser。同時 YurtHub 可能也面臨對 http.Request 的緩存,因此增長了 isRespBody 參數用於斷定是否須要負責關閉 response.Body。
// https://github.com/openyurtio/openyurt/blob/master/pkg/yurthub/util/util.go#L156 // NewDualReadCloser create an dualReadCloser object func NewDualReadCloser(rc io.ReadCloser, isRespBody bool) (io.ReadCloser, io.ReadCloser) { pr, pw := io.Pipe() dr := &dualReadCloser{ rc: rc, pw: pw, isRespBody: isRespBody, } return dr, pr } type dualReadCloser struct { rc io.ReadCloser pw *io.PipeWriter // isRespBody shows rc(is.ReadCloser) is a response.Body // or not(maybe a request.Body). if it is true(it's a response.Body), // we should close the response body in Close func, else not, // it(request body) will be closed by http request caller isRespBody bool } // Read read data into p and write into pipe func (dr *dualReadCloser) Read(p []byte) (n int, err error) { n, err = dr.rc.Read(p) if n > 0 { if n, err := dr.pw.Write(p[:n]); err != nil { klog.Errorf("dualReader: failed to write %v", err) return n, err } } return } // Close close two readers func (dr *dualReadCloser) Close() error { errs := make([]error, 0) if dr.isRespBody { if err := dr.rc.Close(); err != nil { errs = append(errs, err) } } if err := dr.pw.Close(); err != nil { errs = append(errs, err) } if len(errs) != 0 { return fmt.Errorf("failed to close dualReader, %v", errs) } return nil }
在使用 dualReadCloser 時,能夠在httputil.NewSingleHostReverseProxy的modifyResponse()方法中看到。代碼以下:
// https://github.com/openyurtio/openyurt/blob/master/pkg/yurthub/proxy/remote/remote.go#L85 func (rp *RemoteProxy) modifyResponse(resp *http.Response) error {rambohe-ch, 10 months ago: • hello openyurt // 省略部分前置檢查 rc, prc := util.NewDualReadCloser(resp.Body, true) go func(ctx context.Context, prc io.ReadCloser, stopCh <-chan struct{}) { err := rp.cacheMgr.CacheResponse(ctx, prc, stopCh) if err != nil && err != io.EOF && err != context.Canceled { klog.Errorf("%s response cache ended with error, %v", util.ReqString(req), err) } }(ctx, prc, rp.stopCh) resp.Body = rc }
OpenYurt 於 2020 年 9 月進入 CNCF 沙箱後,持續保持了快速發展和迭代,在社區同窗一塊兒努力下,目前已經開源的能力有:
同時在和社區同窗的充分討論下,OpenYurt 社區也發佈了2021 roadmap,歡迎有興趣的同窗來一塊兒貢獻。
若是你們對 OpenYurt 感興趣,歡迎掃碼加入咱們的社區交流羣,以及訪問 OpenYurt 官網和 GitHub 項目地址: