快手基于 Flink 的持续优化与实践
本文由快手實時計算負責人董亭亭分享,主要介紹快手基于 Flink 的持續優化與實踐的介紹。內容包括:
一、Flink 穩定性持續優化
第一部分是 Flink 穩定性的持續優化。該部分包括兩個方面,第一個方面,主要介紹快手在 Flink Kafka Connector 方面做的一些高可用,是基于內部的雙機房讀或雙機房寫和一些容錯的策略。第二部分關于 Flink 任務的故障恢復。我們在加速故障恢復方面做了一些優化工作。
首先,介紹 Source 方面的高可用。在公司內部比較重要的數據寫 Kafka 時,Kafka 層面為保障高可用一般都會創建雙集群的 topic。雙集群的 topic 共同承擔全部流量,如果單集群發生故障,上游自動分流。Kafka 層面通過這種方式做到雙集群的高可用。但是 Flink 任務在消費雙集群 topic 時,本身是不能做到高可用的。Flink 任務通過兩個 Source union 方式消費,Source 分別感知上游 topic 故障,單集群故障需手動將故障 Source 摘除。這種方式的缺點是故障時需要人工的干預,必須手動去修改代碼的邏輯,程序內部本身是不能做到高可用的。這是做雙機房讀的背景。
為了解決上述問題,我們封裝了一個 Kafka 的 Cluster Source,它在 API 上支持讀取雙集群的 topic。同時做到,可以容忍單集群故障,集群故障恢復時也可以自動將故障集群重新加入。
接下來是關于 Sink 方面的高可用。Flink 寫雙集群 Kafka topic,會定義不同集群 Sink,邏輯內控制拆流。這種方式靈活性差,且不能容忍單機房故障。如果單集群發生故障,仍需要手動摘除對應的 Sink。
同樣,針對 sink 我們也定制了一個 Cluster Sink,它 API 上支持寫雙集群 topic。具體寫的策略,可以支持輪詢和主從寫的方式。如果單集群發生故障,邏輯內會自動將流量切到正常集群 topic。如果單集群故障恢復之后,也能感知到集群的恢復,可以自動的再把相應集群恢復回來。
另外,基于 Kafka 的 connector,我們也做了一些容錯的策略,這里提到三點。
- 第一點就是 Kafka Sink 容忍丟失。該問題的背景是,如果 Kafka 服務異常引發任務失敗,并且業務可以容忍少量數據丟失,但是不期望任務掛掉的情況。針對該問題,我們的優化是,設置 Kafka Sink 容忍 M 時間內 X% 丟失。具體實現上,Sink 單 task 統計失敗頻率,失敗頻率超過閾值任務才失敗。
- 第二點是 Kafka Source 一鍵丟 lag。該問題背景是, 一旦任務 lag 較長時間,未及時發現,或者任務 debug 環節,需要丟掉歷史驗證。之前只能靠重啟任務來丟棄 lag,任務重啟代碼比較好,耗時長。我們優化后,可以熱更新、無需重啟任務即可以丟棄 lag。實現邏輯是動態發操作命令給 source,source 收到命令后 seek 到最新位置。
- 第三點是 Kafka broker 列表動態獲取。該問題背景是, 生產環境中 Kafka broker 機器可能會故障下線,一旦請求到下線機器,會發生獲取 metadata 超時,任務頻繁失敗。我們優化后,Source task 啟動,可以獲取集群信息,動態重新獲取 Kafka brokerlist,避免頻繁重啟。
第二部分是 Flink 任務的故障恢復優化,分為兩個過程。一個是故障發現,另外一個是故障恢復。實際的生產環境中,一些不穩定的因素會導致故障恢復的時間特別的長,用戶的感知會比較差。同時,內部也有一些比較高優的任務,它對穩定性的要求比較高。我們希望做一些事情,把整個故障恢復的時間盡可能縮短。我們定了一個優化目標,20 秒內做到一個自動的恢復。
在故障發現階段的優化包括三點:
- 第一,內部自研 Hawk 系統,5s 發現宕機。
- 第二,Yarn 整合 Hawk,快速感知宕機。
- 第三,Flink 感知宕機 container release。
在故障恢復階段的優化包括:
- 第一,允許冗余部分 Container。
- 第二,適當調整 cancel task timeout 時間。
- 第三,針對適合任務開啟 Region Failover。
二、Flink 任務啟動優化
第二部分是任務啟動優化,Flink 任務啟動的時候,一般會涉及到比較多的角色,還有多個實例。如下圖所示,它的啟動在客戶端包括,初始化 Client,構建 jobGraph,上傳 Flink lib、job jar,申請 AM。在 Job Master,AM 啟動后、初始化,構建 ExectutionGraph,申請、啟動 Container,Job Task 調度。在 Task Manager 端, 容器申請到之后,啟動下載 jar 包資源,再去初始化 Task Manager 服務,然后收到 task 后才會去做部署。我們發現,線上啟動一個任務的時候,基本上在分鐘級別,耗時比較長。如果有一些任務需要升級,比如說,改了一些簡單的邏輯,需要將原來的任務停掉,然后再去重新啟動一個新的任務,這種場景可能就會更慢。因此,我在任務啟動的時候做一些優化,盡可能縮短任務啟動的時間,業務的斷流時間也進一步縮短。
在 Flink 新任務啟動優化方面,我們發現 IO 交互會比較耗時。在客戶端的 IO 包括,Flink 引擎 lib 包上傳 HDFS,用戶上傳 jar 包上傳 HDFS。在 JobMaster 包括, Container 下載啟動資源,TaskManager conf 上傳 HDFS。在 TaskManager 包括, Container 下載啟動資源,Conf 文件下載。
因此,想盡量的減少這樣的一些 lO 的操作。針對 Flink 引擎 lib 包,設置 Public 權限,App 之間共享。對于用戶 jar 包,提供工具,提前預發布到集群機器。對于 Conf 文件,通過環境變量傳遞。針對 JobMaster 啟動 TM 頻繁文件判斷,增加 cache 緩存。
以上是針對一個新任務啟動場景,下面介紹任務升級的場景。以前是同步升級,比如說,任務 A 在運行著,然后我要把任務 A 停掉,再去啟動新的任務 B。如下圖所示,不可用時間包括停任務 A 和啟動新任務 B。是否可以不用等任務 A 完全停掉之后,再啟動任務 B。針對這個想法我們做了一個異步升級的策略。新任務提前啟動,初始化到 JobMaster 階段。舊任務停掉后,完成新任務后續啟動工作,這樣新舊任務無縫切換。通過內部提交平臺將該步驟串聯起來,目標是異步升級在 20s 以內完成。
三、Flink SQL 實踐與優化
第三部分會介紹一下我們在使用 Flink SQL 的一些實踐和優化。首先介紹一下 Flink SQL 在快手的現狀。目前,我們內部 Flink SQL 的任務占比在 30% 左右。Flink SQL 的任務個數是 360 多個。然后它的峰值處理的條目數還是比較高的,大約是 4億每秒。在我們內部的一些重要活動的實時大屏的場景下,目前 Flink SQL 也作為一條鏈路,參與了相關指標的計算。
接下來介紹一下我們在使用 Flink SQL 的時候遇到的一些問題,以及我們做的一些優化。首先,關于 Flink SQL 的傾斜問題,在 UnBounded Agg 場景下的傾斜問題,已經有比較全面的思路去解決,總結為三點。
- 第一,MiniBatch Aggregation,思路是內存緩存 batch 數據再進行聚合,減少狀態訪問次數。
- 第二,Local Global Aggregation,思路是聚合操作拆分為兩階段, Local 階段預聚合減少數據條數,Global 解決全局聚合。
- 第三,Split Distinct Aggregation,思路是針對 count distinct 場景, 對分組 key 先分桶預聚合, 再對分桶結果全局聚合。
所以我們解決的第一個問題就是 Bounded Agg 的傾斜問題。如下圖所示,拿左邊的 SQL 作為例子,group by一個user,假定一天的窗口,然后去 select 每一個用戶總的交易額。右邊的圖,假定有一些用戶的交易特別多,就會造成某一些 Window Agg 的數據量特別大。
解決思路分為兩點。
- 第一,兩階段聚合,分為 Local window Agg 和 Global window Agg。Local window Agg:預聚合 window 大小與 global 階段保持一致,checkpoint 時將結果寫出,不保存狀態 。Global window Agg:全量聚合。
- 第二,增加 mini-batch,好處是 local 階段 mini-batch 避免數據量緩存過多,Global 階段 mini-batch 減少狀態訪問次數。
我們解決的第二個問題是 Flink SQL 下的 UDF 函數復用的問題。如下圖所示,以左邊的 SQL 為例,可以看到有兩個 UDF 的函數,這兩個函數在 SQL 里面都重復出現了多次。
- 優化前:相同 UDF 多次執行,性能變差。
- 優化后:同一條數據下 UDF 結果復用,避免多次調用執行,節約資源,性能也得到提升。拿示例 SQL 來說,性能提升了 2 倍。
四、未來工作
第四部分介紹我們未來的一些規劃,分為三塊。
- 第一,關于資源利用率。目標是提升集群整體資源利用均衡性,Flink 任務內調度均衡性,以及 Flink 任務資源使用合理性。
- 第二,關于 Flink SQL。我們會持續的去做推廣。我們希望提升 SQL 任務穩定性和 SQL 任務資源的利用率。
- 第三,探索流批統一,這也是業界的一個方向。我們希望可以一套代碼就解決問題,不用重復開發兩套任務。
原文鏈接:https://developer.aliyun.com/article/782330?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的快手基于 Flink 的持续优化与实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 阿里云AIoT正式发布IoT安全中心和I
- 下一篇: 小麦助教:通过阿里云原生中间件产品组合,