【OpenYurt 深度解析】边缘网关缓存能力的优雅实现
作者 |?何淋波(新勝)
來源 | 阿里巴巴云原生公眾號
OpenYurt:延伸原生 K8s 的能力到邊緣
阿里云邊緣容器服務(wù)上線 1 年后,正式開源了云原生邊緣計算解決方案 OpenYurt,跟其他開源的容器化邊緣計算方案不同的地方在于:OpenYurt 秉持 Extending your native Kubernetes to edge 的理念,對 Kubernetes 系統(tǒng)零修改,并提供一鍵式轉(zhuǎn)換原生 Kubernetes 為 OpenYurt,讓原生 K8s 集群具備邊緣集群能力。
同時隨著 OpenYurt 的持續(xù)演進,也一定會繼續(xù)保持如下發(fā)展理念:
- 非侵入式增強 K8s
- 保持和云原生社區(qū)主流技術(shù)同步演進
OpenYurt 如何解決邊緣自治問題
想要實現(xiàn)將 Kubernetes 系統(tǒng)延展到邊緣計算場景,那么邊緣節(jié)點將通過公網(wǎng)和云端連接,網(wǎng)絡(luò)連接有很大不可控因素,可能帶來邊緣業(yè)務(wù)運行的不穩(wěn)定因素,這是云原生和邊緣計算融合的主要難點之一。
解決這個問題,需要使邊緣側(cè)具有自治能力,即當云邊網(wǎng)絡(luò)斷開或者連接不穩(wěn)定時,確保邊緣業(yè)務(wù)可以持續(xù)運行。在 OpenYurt 中,該能力由 yurt-controller-manager 和 YurtHub 組件提供。
1. YurtHub 架構(gòu)
在之前的文章中,我們詳細介紹了 YurtHub 組件的能力。其架構(gòu)圖如下:
圖片鏈接
YurtHub 是一個帶有數(shù)據(jù)緩存功能的“透明網(wǎng)關(guān)”,和云端網(wǎng)絡(luò)斷連狀態(tài)下,如果節(jié)點或者組件重啟,各個組件(kubelet/kube-proxy 等)將從 YurtHub 中獲取到業(yè)務(wù)容器相關(guān)數(shù)據(jù),有效解決邊緣自治的問題。這也意味著我們需要實現(xiàn)一個輕量的帶數(shù)據(jù)緩存能力的反向代理。
2. 第一想法
實現(xiàn)一個緩存數(shù)據(jù)的反向代理,第一想法就是從 response.Body 中讀取數(shù)據(jù),然后分別返回給請求 client 和本地的 Cache 模塊。偽代碼如下:
func HandleResponse(rw http.ResponseWriter, resp *http.Response) {bodyBytes, _ := ioutil.ReadAll(resp.Body)go func() {// cache response on local diskcacher.Write(bodyBytes)}// client reads data from responserw.Write(bodyBytes) }當深入思考后,在 Kubernetes 系統(tǒng)中,上述實現(xiàn)會引發(fā)下面的問題:
-
問題 1:流式數(shù)據(jù)需要如何處理(如: K8s 中的 watch 請求),意味 ioutil.ReadAll() 一次調(diào)用無法返回所有數(shù)據(jù)。即如何可以返回流數(shù)據(jù)同時又緩存流數(shù)據(jù)。
-
問題 2:同時在本地緩存數(shù)據(jù)前,有可能需要對傳入的 byte slice 數(shù)據(jù)先進行清洗處理。這意味著需要修改 byte slice,或者先備份 byte slice 再處理。這樣會造成內(nèi)存的大量消耗,同時針對流式數(shù)據(jù),到底申請多大的 slice 也不好處理。
3. 優(yōu)雅實現(xiàn)探討
針對上面的問題,我們將問題逐個抽象,可以發(fā)現(xiàn)更優(yōu)雅的實現(xiàn)方法。
- 問題 1:如何對流數(shù)據(jù)同時進行讀寫
針對流式數(shù)據(jù)的讀寫(一邊返回一邊緩存),如下圖所示,其實需要的不過是把 response.Body(io.Reader) 轉(zhuǎn)換成一個 io.Reader 和一個 io.Writer。或者說是一個 io.Reader 和 io.Writer 合成一個 io.Reader。這很容易就聯(lián)想到 Linux 里面的 Tee 命令。
而在 Golang 中 Tee 命令是實現(xiàn)就是io.TeeReader,那問題 1 的偽代碼如下:
func HandleResponse(rw http.ResponseWriter, resp *http.Response) {// create TeeReader with response.Body and cachernewRespBody := io.TeeReader(resp.Body, cacher)// client reads data from responseio.Copy(rw, newRespBody) }通過 TeeReader 的對 Response.Body 和 Cacher 的整合,當請求 client 端從 response.Body 中讀取數(shù)據(jù)時,將同時向 Cache 中寫入返回數(shù)據(jù),優(yōu)雅的解決了流式數(shù)據(jù)的處理。
- 問題 2:如何在緩存前先清洗流數(shù)據(jù)
如下圖所示,緩存前先清洗流數(shù)據(jù),請求端和過濾端需要同時讀取 response.Body(2 次讀取問題)。也就是需要將 response.Body(io.Reader) 轉(zhuǎn)換成兩個 io.Reader。
也意味著問題 2 轉(zhuǎn)化成:問題 1 中緩存端的 io.Writer 轉(zhuǎn)換成 Data Filter 的 io.Reader。其實在 Linux 命令中也能找到類似命令,就是管道。因此問題 2 的偽代碼如下:
func HandleResponse(rw http.ResponseWriter, resp *http.Response) {pr, pw := io.Pipe()// create TeeReader with response.Body and Pipe writernewRespBody := io.TeeReader(resp.Body, pw)go func() {// filter reads data from response io.Copy(dataFilter, pr)}// client reads data from responseio.Copy(rw, newRespBody) }通過 io.TeeReader 和 io.PiPe,當請求 client 端從 response.Body 中讀取數(shù)據(jù)時,Filter 將同時從 Response 讀取到數(shù)據(jù),優(yōu)雅的解決了流式數(shù)據(jù)的 2 次讀取問題。
YurtHub 實現(xiàn)
最后看一下 YurtHub 中相關(guān)實現(xiàn),由于 Response.Body 為 io.ReadCloser,所以實現(xiàn)了 dualReadCloser。同時 YurtHub 可能也面臨對 http.Request 的緩存,所以增加了 isRespBody 參數(shù)用于判定是否需要負責關(guān)閉 response.Body。
// https://github.com/openyurtio/openyurt/blob/master/pkg/yurthub/util/util.go#L156 // NewDualReadCloser create an dualReadCloser object func NewDualReadCloser(rc io.ReadCloser, isRespBody bool) (io.ReadCloser, io.ReadCloser) {pr, pw := io.Pipe()dr := &dualReadCloser{rc: rc,pw: pw,isRespBody: isRespBody,}return dr, pr }type dualReadCloser struct {rc io.ReadCloserpw *io.PipeWriter// isRespBody shows rc(is.ReadCloser) is a response.Body// or not(maybe a request.Body). if it is true(it's a response.Body),// we should close the response body in Close func, else not,// it(request body) will be closed by http request callerisRespBody bool }// Read read data into p and write into pipe func (dr *dualReadCloser) Read(p []byte) (n int, err error) {n, err = dr.rc.Read(p)if n > 0 {if n, err := dr.pw.Write(p[:n]); err != nil {klog.Errorf("dualReader: failed to write %v", err)return n, err}}return }// Close close two readers func (dr *dualReadCloser) Close() error {errs := make([]error, 0)if dr.isRespBody {if err := dr.rc.Close(); err != nil {errs = append(errs, err)}}if err := dr.pw.Close(); err != nil {errs = append(errs, err)}if len(errs) != 0 {return fmt.Errorf("failed to close dualReader, %v", errs)}return nil }在使用 dualReadCloser 時,可以在httputil.NewSingleHostReverseProxy的modifyResponse()方法中看到。代碼如下:
// https://github.com/openyurtio/openyurt/blob/master/pkg/yurthub/proxy/remote/remote.go#L85 func (rp *RemoteProxy) modifyResponse(resp *http.Response) error {rambohe-ch, 10 months ago: ? hello openyurt// 省略部分前置檢查 rc, prc := util.NewDualReadCloser(resp.Body, true)go func(ctx context.Context, prc io.ReadCloser, stopCh <-chan struct{}) {err := rp.cacheMgr.CacheResponse(ctx, prc, stopCh)if err != nil && err != io.EOF && err != context.Canceled {klog.Errorf("%s response cache ended with error, %v", util.ReqString(req), err)}}(ctx, prc, rp.stopCh)resp.Body = rc }總結(jié)
OpenYurt 于 2020 年 9 月進入 CNCF 沙箱后,持續(xù)保持了快速發(fā)展和迭代,在社區(qū)同學(xué)一起努力下,目前已經(jīng)開源的能力有:
- 邊緣自治
- 邊緣單元化管理
- 云邊協(xié)同運維
- 一鍵式無縫轉(zhuǎn)換能力
同時在和社區(qū)同學(xué)的充分討論下,OpenYurt 社區(qū)也發(fā)布了2021 roadmap,歡迎有興趣的同學(xué)來一起貢獻。
如果大家對 OpenYurt 感興趣,歡迎掃碼加入我們的社區(qū)交流群,以及訪問 OpenYurt 官網(wǎng)和 GitHub 項目地址:
- OpenYurt 官網(wǎng):https://openyurt.io
- GitHub 項目地址:https://github.com/openyurtio/openyurt
- 歡迎釘釘搜索群號:31993519,加入社區(qū)交流群!
總結(jié)
以上是生活随笔為你收集整理的【OpenYurt 深度解析】边缘网关缓存能力的优雅实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 面对不可避免的故障,我们造了一个“上帝视
- 下一篇: 阿里巴巴研究员叔同:云原生是企业数字创新