腾讯基于 Flink 的实时流计算平台演进之路
原文地址:https://www.infoq.cn/article/TjDeQDJQpKZ*NpG71pRW
大家好,我是來自騰訊大數(shù)據團隊的楊華(vinoyang),很高興能夠參加這次北京的 QCon,有機會跟大家分享一下騰訊實時流計算平臺的演進與這個過程中我們的一些實踐經驗。
這次分享主要包含四個議題,我會首先闡述一下騰訊在實時計算中使用 Flink 的歷程,然后會簡單介紹一下騰訊圍繞 Flink 的產品化實踐:我們打造了一個 Oceanus 平臺,同時騰訊云也早已提供基于 Flink 的實時流計算服務,接著我們會重點跟大家聊一聊我們對社區(qū)版 Flink 的一些擴展與改進、優(yōu)化。
Flink 在騰訊實時計算概況簡介
首先,我們進入第一個議題。Flink 在騰訊正式被考慮替代 Storm 是在 2017 年。
17 年上半年,我們主要在調研 Flink 替換 Storm 的可行性、特性、性能是否能夠滿足我們的上線要求。在此之前,我們內部以 Storm 作為實時計算的基礎框架也已經有幾年的時間了,在使用的過程中也發(fā)現(xiàn)了 Storm 的一些痛點,比如,沒有內置狀態(tài)的支持,沒有提供完備的容錯能力,沒有內置的窗口 API,core API 無法提供 Exactly-once 的語義保證等等。
17 年下半年,我們從社區(qū)拉出當時最新的發(fā)布分支(1.3.2)作為我們內部的定制開發(fā)分支進行開發(fā)。作為一個試點,我們選擇了內部一個流量較大的業(yè)務來進行替換,這個業(yè)務在我們內部是以 standalone 的模式部署的,所以我們最初也使用的是 Flink 的 standalone 部署模式。
18 年上半年,我們開始圍繞 Flink 進行產品化,打造了一個全流程、一體化的實時流計算平臺——Oceanus,來簡化業(yè)務方構建實時應用的復雜度并降低運維成本,這也基本明確了后續(xù)我們主要的運行模式是 Flink on YARN。
18 年下半年,我們的 Oceanus 平臺已經有足夠的能力來構建常見的流計算應用,我們部門內部的一些實時流計算業(yè)務也已經在平臺上穩(wěn)定運行,于是我們開始為騰訊云、騰訊其他事業(yè)群以及業(yè)務線提供流計算服務。同時,我們也將平臺整合進我們的大數(shù)據套件,為外部私有云客戶提供流計算服務。
19 年上半年,我們的主要目標是在 Oceanus 上沉淀并完善上層的場景化服務建設,比如提供在線機器學習、風控等場景化服務。另外,我們也在 Flink 批處理方向發(fā)力,利用 Flink 的計算能力來滿足跨數(shù)據中心,跨數(shù)據源的聯(lián)合分析需求。它可以做到:數(shù)據源 SQL 下推,避免集群帶寬資源浪費;單 DC 內 CBO(基于代價優(yōu)化),生成最優(yōu)的執(zhí)行計劃;跨 DC CBO,根據 DC 負載和資源選擇最佳 DC 執(zhí)行計算,從而獲得更好的資源利用和更快的查詢性能。以上就是騰訊使用 Flink 的整個歷程。
這幅圖展示了,Flink 目前在騰訊內部已經為一些我們耳熟能詳?shù)漠a品提供實時計算的服務。這些產品,包括微信、支付、財付通、騰訊云、QQ、空間、音樂、游戲、K 歌等等。我們列舉其中幾個業(yè)務的使用案例,微信使用我們的平臺來統(tǒng)計朋友圈的實時瀏覽信息、小游戲種子用戶的 UV 計算、實時惡意流量分析判斷、看一看的紅點信息;支付用來計算商戶交易相關的統(tǒng)計;音樂用于實時點唱、熱門排行榜等等。
接下來我們來了解一下,目前 Flink 在騰訊使用的現(xiàn)狀。目前我們 Oceanus 平臺 YARN 集群的 vcore 總數(shù)目達到了 34 萬,累計的峰值計算能力接近 2.1 億 / 秒,日均處理消息規(guī)模近 20 萬億。到目前為止,騰訊內部除了廣告的在線訓練業(yè)務外,原先運行在 Storm 上的實時流計算業(yè)務都已逐步遷移到 Flink 引擎上,而廣告這塊的業(yè)務預計也會在今年下半年遷移完成。
Oceanus 平臺簡介
接下來,我們進入第二個議題:簡要介紹一下我們的 Oceanus 平臺。
首先,我們來看一下 Oceanus 平臺的整體技術架構。我們內部定制版的 Flink 引擎,稱之為 TDFLINK,它跟其他的一些大數(shù)據基礎設施框架交互并協(xié)同支撐了我們上層的 Oceanus 平臺,Oceanus 支持畫布、SQL 以及 Jar 三種形式構建應用,為了方便業(yè)務方降低整體成本,我們還提供了配置、測試、部署等完整配套的功能,在平臺之上我們提供了一些領域特定的場景化服務比如 ETL、監(jiān)控、推薦廣告等。
下面我們來介紹 Oceanus 的幾個典型功能。首先這是某個用戶的應用列表頁。從列表中,我們可以看到應用的當前狀態(tài)、類型、迭代的版本,它歸屬于哪個場景等信息。在操作欄,我們可以一鍵對應用進行啟停、調試,查看它的指標信息等,除此之外我們還提供了很多便捷的操作,比如快速復制一個應用,他們都收納在“更多”菜單按鈕中。
這是我們的一個指標分鐘級統(tǒng)計的畫布應用詳情頁,我們?yōu)?ETL 類型的應用提供了一個通用的 transform 算子。它提供了很多功能細分的可插拔的便捷函數(shù)來簡化常見的事件解析與提取的復雜度。在圖中,多種不同類型的指標經過 split 算子分流后將相同的指標進行歸類,然后再對它們應用各自的統(tǒng)計邏輯,就像這里的窗口一樣,基本上每個算子都是配置化的。像這種類型的應用我們通過拖拽、配置就可以輕松完成它的構建。
這幅圖展示了我們的指標詳情頁檢查點的指標明細,為了讓業(yè)務人員更直觀地了解它們最關心的指標信息,我們將一些必要的指標進行了重新梳理并展示到我們的平臺上,這里有些指標直接使用了 Flink 提供的 REST API 接口,而有些指標則是我們內部擴展定制的。
最后,我們來介紹一下最近上線的在線機器學習模塊。這是我們一個模型訓練應用的詳情頁,同樣它也是畫布類型的,我們對常規(guī)的機器學習類型的應用進行了步驟拆分,包括了數(shù)據預處理、算法等相關步驟,每個步驟都可以進行拖拽,再加上配置的方式就可以完成一個 ML 類型的應用創(chuàng)建。
對于訓練得到的模型,我們也提供了模型服務功能,我們用模型服務組來管理每個模型的不同時間的版本,點擊右側的“評估報告”可以查看這個模型的 AUC 趨勢。
以上是對 Oceanus 平臺的介紹,如果大家有興趣可以掃描 PPT 最后的二維碼來進一步了解我們的平臺以及騰訊云上的流計算服務。
針對 Flink 的擴展與優(yōu)化
接下來,我們進入下一個議題,介紹我們內部 Flink 版本在通過騰訊云對外提供服務時基于內部以及業(yè)務的相關需求對社區(qū)版的擴展與優(yōu)化。
第一個改進是我們重構了 Flink Web UI,我們重構的原因是因為社區(qū)版的 Flink Web UI 在定位問題的時候不能提供足夠的信息,導致問題定位的效率不夠高。尤其是 Job 并行度非常大,YARN 的 container 數(shù)目非常多的時候,當 Job 發(fā)生失敗,很難快速去找到 container 和節(jié)點以查看進程的堆棧或者機器指標。所以,為了更高效地定位問題,我們對 Flink web UI 進行了重構并暴露了一些關鍵指標。
這是我們一個 TaskManager 的詳情頁,我們?yōu)樗略隽艘粋€“Threads” Tab,我們可以通過它看到 Task 相關的線程信息:線程名稱、CPU 消耗、狀態(tài)以及堆棧等。這樣一旦哪個算子的線程可能成為瓶頸時,我們可以快速定位到它阻塞在什么方法調用上。
接下來的這個改進是對 JobManager failover 的優(yōu)化。大家應該都知道社區(qū)版的 Flink JobManager HA 在 Standalone 模式下有個很大的問題是:它的 standby 節(jié)點是冷備的,JobManager 的切換會導致它管理的所有 Job 都會被重啟恢復,這一行為在我們現(xiàn)網環(huán)境中是不可接受的。所以,我們首先定制的第一個大特性就是 JobManager 的 failover 優(yōu)化,讓 standby 節(jié)點變成熱備,這使得 JobManager 的切換對 TaskManager 上已經正在運行的 Job 不產生影響。我們已經對 Standalone 以及 Flink on YARN 這兩種部署模式支持了這個特性,Flink on YARN 的支持還處于內部驗證階段。我們以對 Standalone 模式的優(yōu)化為例來進行分析,它主要包含這么幾個步驟:
-
取消 JobManager 跟 TaskManager 因為心跳超時或 Leadership 變動就 cancel task 的行為;
-
對 ExecutionGraph 核心數(shù)據的快照;
-
通過 ExecutionGraphBuilder 重構空的 ExecutionGraph 加上快照重置來恢復出一個跟原先等價的 ExecutionGraph 對象;
-
TaskManager 跟新的 JobManager leader 建立連接后以心跳上報自己的狀態(tài)和必要的信息;
-
新的 JobManager 確認在 reconcile 階段 Job 的所有 task 是否正常運行;
接下來的這個改進已經在反饋社區(qū)的過程中,它就是對檢查點失敗處理的改進。在探討改進之前,我們先來了解一下社區(qū)版當前的處理機制。JobMaster 中,每個 Job 會對應一個 Checkpoint Coordinator,它用來管理并協(xié)調 Job 檢查點的執(zhí)行。當?shù)竭_一個檢查點的觸發(fā)周期,Coordinator 會對所有的 Source Task 下發(fā) TriggerCheckpoint 消息,source task 會在自身完成快照后向下游廣播 CheckpointBarrier,作為下游 task 觸發(fā)的通知。其中,如果一個 task 在執(zhí)行檢查點時失敗了,這取決于用戶是否容忍這個失敗(通過一個配置項),如果選擇不容忍那么這個失敗將變成一個異常導致 task 的失敗,與此同時 task 的失敗將會通知到 JobMaster,JobMaster 將會通知這個 Job 的其他 task 取消它們的執(zhí)行。現(xiàn)有的機制存在一些問題:
-
Coordinator 并不能控制 Job 是否容忍檢查點失敗,因為控制權在 task 端;
-
Coordinator 當前的失敗處理代碼邏輯混亂,區(qū)分出了觸發(fā)階段,卻忽略了執(zhí)行階段;
-
無法實現(xiàn)容忍多少個連續(xù)的檢查點失敗則讓 Job 失敗的邏輯。
了解了現(xiàn)有的實現(xiàn)機制,我們再來看接下來的改進方案。首先,我們對源碼中 checkpoint package 下的相關類進行了重構,使得它不再區(qū)分觸發(fā)階段,引進了更多的檢查點失敗原因的枚舉并重構了相關的代碼。然后我們引入了 CheckpointFailureManager 組件,用來統(tǒng)一失敗管理,同時為了適配更靈活的容忍失敗的能力,我們引入了檢查點失敗計數(shù)器機制。現(xiàn)在,當我們遇到檢查點失敗后,這個失敗信息會直接上報到 Coordinator,而是否要讓 Job 失敗具體的決策則由 CheckpointFailureManager 作出,這就使得 Coordinator 具有了完整的檢查點控制權,而決策權轉讓給 CheckpointFailureManager,則充分實現(xiàn)了邏輯解耦。
下面我們要看的這個特性是對 Flink 原生窗口的增強,所以我們稱之為 Enhanced window。大家都知道 Flink 的 EventTime 語義的窗口無法保證任意延遲到達的數(shù)據都能參與窗口計算,它只允許你設置一個容忍延遲的時間。但我們的應用場景里,數(shù)據的延遲可能非常高,甚至有時跨天的也會發(fā)生,但我們無法為常規(guī)的窗口設置這么長的延遲時間,并且我們的業(yè)務無法容忍延遲數(shù)據被丟棄的行為。因此針對這種場景,Flink 自帶的窗口無法滿足我們的需求。所以,我們對它做了一些改進,它能夠容忍任意延遲到來的事件,所有的事件都不會被丟棄,而是會加入一個新的窗口重新計算,新窗口跟老窗口毫無關系,所以最終可能針對一個窗口在用戶的目標表中會存在多條記錄,用戶只需自行聚合即可。
為了方便在上層使用這種窗口,我們?yōu)樗ㄖ屏?SQL 關鍵字,這幅圖展示了我們在指標統(tǒng)計場景中使用它的一個示例。
這是我們根據業(yè)務需求所定制的另一個窗口——增量窗口。在業(yè)務中經常遇到這樣的需求:希望看到一個窗口周期內的增量變化,這個窗口周期可能會很長,比如一個天級別的窗口。比如我們希望看到一天內每個小時的 PV 增長趨勢,或者游戲中的一些虛擬物品的消耗趨勢。Flink 默認的翻滾窗口以及觸發(fā)器是沒有內置這種窗口內小批次觸發(fā)的功能。當然我們也可以通過一個個的小窗口來計算階段性的結果,然后再對數(shù)據進行二次處理,但這樣會比較麻煩。所以我們實現(xiàn)了大窗口內多次增量觸發(fā)的功能,擴展實現(xiàn)了一個窗口內多次觸發(fā)的 Trigger,并定制了相應的 SQL 語法來供業(yè)務使用。這里我們可以看到雖然是大窗口,但由于數(shù)據都在不斷地進行增量聚合,所以并不會 hold 住非常大的狀態(tài)集。
這幅圖展示了增量窗口的使用方式,通過新的關鍵字,底層會映射到我們自實現(xiàn)的觸發(fā)器。
接下來我們要看的這個特性是我們對 Flink keyBy 的優(yōu)化,我們稱之為:LocalKeyBy。我們在使用 KeyBy 的時候都遇到過數(shù)據熱點的問題,也就是數(shù)據傾斜。數(shù)據傾斜主要是業(yè)務數(shù)據的 key 取值不夠離散,而 keyBy 背后是 hash 的 partition 方式,它根據一個 key 的 hash 值來決定數(shù)據要落到哪個節(jié)點分區(qū)。如果發(fā)生數(shù)據傾斜很容易造成計算資源利用不均以及反壓(back pressure)等問題產生。針對這一點,我們在保證計算語義的情況下對 keyBy 進行了優(yōu)化,開發(fā)了 LocalKeyBy 功能。它的原理是通過本地預聚合來減少發(fā)送的數(shù)據量,但這里需要注意的一點是:使用這個算子的時候,需要對原有的實現(xiàn)代碼進行調整,因為它將原來的 keyBy 拆分為了兩步:預聚合以及合并。
我們在本地對 keyBy 與 LocalKeyBy 做了一個簡單的性能對比測試,發(fā)現(xiàn)在流量傾斜嚴重的情況下,使用 LocalKeyBy 整體性能并沒有受到太大的影響,但 Flink 原生的 keyBy 則隨著流量的傾斜而產生顯著的性能下降。
我們繼續(xù)來看一個特性:水位線算子定時檢測流分區(qū)空閑的功能。Flink 社區(qū)目前針對 Source 實現(xiàn)了定時的流 idle 檢測功能(雖然沒有開放),它主要針對的場景是 Kafka 某個分區(qū)空閑無數(shù)據從而造成對應的 subtask 無法正常提取 watermark,導致對下游的計算產生影響。
但我們的場景和社區(qū)略有差別,我們沒有將所有的邏輯都壓到 source 里,為了進行邏輯拆分我們引入了一個 transform 算子,它專門針對 ETL 的場景,所以我們的 watermark 很多情況下不在 source 算子上提取,而是在下游的某個算子上,在某些情況下,如果 watermark 的分配算子在 filter 之類的算子后面,則可能造成某個 pipeline 在中間斷流,也造成了無法正常提取 watermark 的情形。針對這種場景,我們在提取 watermark 的算子上也實現(xiàn)了定時檢測流 idle 的功能。這樣就算因為某個分區(qū)的數(shù)據都被過濾掉造成空閑,也不至于對下游的計算產生影響。
我們介紹的下一個特性是 Framework 與用戶業(yè)務日志的分離。這個特性其實最受益的是 Standalone 部署模式,因為這種模式下多 job 的 task 是混合部署在同一個 TaskManager 中的,而 TaskManager 本身只使用一個日志文件來記錄日志。所以,這導致排查業(yè)務問題非常麻煩。另外,我們對 Flink web UI 展示日志文件也做了一些改進,我們會列出 JobManager 以及 TaskManager 的日志文件夾中所有的文件列表。這是因為,隨著流應用長時間運行,累積的日志量會越來越大,我們通常都會對應用的日志配置滾動策略,除此之外我們還會輸出 GC 日志等,而 Flink 的 web UI 默認只能展示最新的那個日志文件,這對于我們定位問題很不方便。所以,我們引入了一個新的 tab,它能夠列出日志文件夾下的文件列表,點擊后再請求特定的日志文件。
在分析這個特性的實現(xiàn)之前,我們需要先了解 Flink 目前加載日志框架類的方式,它為了避免跟業(yè)務 Job 中可能包含的日志框架的依賴、配置文件產生沖突,日志相關類的加載都代理給平臺的類加載器,也就是 TaskManager 的類加載器,而 TaskManager 本身加載的這些類都是從 Flink 安裝包的 lib 底下加載的。而關于日志配置文件,Flink 通過 JVM 啟動參數(shù)來指定配置文件路徑以及日志文件路徑。這些機制共同保證了 Flink 不會受到用戶 job jar 的干擾。
所以,如果我們要實現(xiàn)日志分離,我們就需要打破 Flink 原先的實現(xiàn)機制,關鍵點在于:為不同 Job 的 Task 加載不同的日志類;為不同 Job 的 Task 指定不同的配置文件以及用戶日志文件的路徑。這意味著我們需要定制 Flink 自帶的 user classloader。針對第一點,我們不再將這些日志類的加載代理給平臺的加載器,而是將平臺類加載器中日志相關的 jar 的 classpath 加入到各個 task 自己的 classloader 中。關于配置文件,我們顯然也不能用 Flink 平臺的配置文件。我們會拿平臺使用的配置文件作為模板,對其內部的日志路徑進行動態(tài)修改,然后將內存中的這個配置文件傳遞給特定的日志框架。那么這里就有一個問題,內存中的配置文件二進制數(shù)據怎么被日志框架讀取。log4j 以及 logback 都可以接收配置文件的 URL 表示,而 URL 也可以接收一個 URLStreamHandler 的實現(xiàn)(它是所有流協(xié)議的處理器用于連接二進制的數(shù)據流與 URL),通過效仿 bytebuddy(一個動態(tài)修改 Java 二進制字節(jié)碼的類庫),我們實現(xiàn)了 ByteArrayUrlStreamHandler 來進行二進制的配置文件跟 URL 之間的銜接,這兩點完成后不同 Job 的 Task 的類加載器就保證了日志類加載和配置的完全獨立性。
目前,我們內部所定制優(yōu)化的一些特性有些已逐步反饋給社區(qū),還有一些比較大的改動也在跟社區(qū)商討合并計劃。我們歡迎有志于迎接萬億級數(shù)據規(guī)模挑戰(zhàn)以及參與 Flink 引擎研發(fā)的同學加入我們。
總結
以上就是所有的分享內容,由于表達與時間關系,可能很多點不能很好地闡述清楚,如果大家有想法我們可以再線下交流。也歡迎大家掃碼了解騰訊的大數(shù)據產品、騰訊云上的流計算服務以及 Oceanus 的功能。
轉載于:https://www.cnblogs.com/davidwang456/articles/10953252.html
總結
以上是生活随笔為你收集整理的腾讯基于 Flink 的实时流计算平台演进之路的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: “分库分表 ?选型和流程要慎重,否则会失
- 下一篇: MongoDB如何一次插入多条json数