Apache Hudi 在 B 站构建实时数据湖的实践
簡介:?B 站選擇 Flink + Hudi 的數據湖技術方案,以及針對其做出的優化。
本文作者喻兆靖,介紹了為什么 B 站選擇 Flink + Hudi 的數據湖技術方案,以及針對其做出的優化。主要內容為:
一、傳統離線數倉痛點
1. 痛點
之前 B 站數倉的入倉流程大致如下所示:
在這種架構下產生了以下幾個核心痛點:
總結一下就是:
- 調度啟動晚;
- 合并速度慢;
- 重復讀取多。
2. 痛點思考
- 調度啟動晚
思路:既然 Flink 落 ODS 是準實時寫入的,有明確的文件增量概念,可以使用基于文件的增量同 步,將清洗、補維、分流等邏輯通過增量的方式進行處理,這樣就可以在 ODS 分區未歸檔的時 候就處理數據,理論上數據的延遲只取決于最后一批文件的處理時間。
- 合并速度慢
思路:既然讀取已經可以做到增量化了,那么合并也可以做到增量化,可以通過數據湖的能力結 合增量讀取完成合并的增量化。
- 重復讀取多
思路:重復讀取多的主要原因是分區的粒度太粗了,只能精確到小時/天級別。我們需要嘗試一 些更加細粒度的數據組織方案,將 Data Skipping 可以做到字段級別,這樣就可以進行高效的數 據查詢了。
3. 解決方案: Magneto - 基于 Hudi 的增量數據湖平臺
以下是基于 Magneto 構建的入倉流程:
-
Flow
- 使用流式 Flow 的方式,統一離線和實時的 ETL Pipline
-
Organizer
- 數據重組織,加速查詢
- 支持增量數據的 compaction
-
Engine
- 計算層使用 Flink,存儲層使用 Hudi
-
Metadata
- 提煉表計算 SQL 邏輯
- 標準化 Table Format 計算范式
二、數據湖技術方案
1. Iceberg 與 Hudi 的取舍
1.1 技術細節對比
1.2 社區活躍度對比
統計截止至 2021-08-09
1.3 總結
大致可以分為以下幾個主要緯度來進行對比:
- 對 Append 的支持
Iceberg 設計之初的主要支持方案,針對該場景做了很多優化。 Hudi 在 0.9 版本中對 Appned 模式進行了支持,目前在大部分場景下和 Iceberg 的差距不大, 目前的 0.10 版本中仍然在持續優化,與 Iceberg 的性能已經非常相近了。
- 對 Upsert 的支持
Hudi 設計之初的主要支持方案,相對于 Iceberg 的設計,性能和文件數量上有非常明顯的優 勢,并且 Compaction 流程和邏輯全部都是高度抽象的接口。 Iceberg 對于 Upsert 的支持啟動較晚,社區方案在性能、小文件等地方與 Hudi 還有比較明顯 的差距。
- 社區活躍度
Hudi 的社區相較于 Iceberg 社區明顯更加活躍,得益于社區活躍,Hudi 對于功能的豐富程度與 Iceberg 拉開了一定的差距。
綜合對比,我們選擇了 Hudi 作為我們的數據湖組件,并在其上繼續優化我們需要的功能 ( Flink 更好的集成、Clustering 支持等)
2. 選擇 Flink + Hudi 作為寫入方式
我們選擇 Flink + Hudi 的方式集成 Hudi 的主要原因有三個:
Spark + Hudi 的集成方案主要有兩種 Index 方案可供選擇,但是都有劣勢:
- Bloom Index:使用 Bloom Index 的話,Spark 會在寫入的時候,每個 task 都去 list 一遍所有的文件,讀取 footer 內寫入的 Bloom 過濾數據,這樣會對我們內部壓力已經非常大的 HDFS 造成非常恐怖的壓力。
- Hbase Index:這種方式倒是可以做到 O(1) 的找到索引,但是需要引入外部依賴,這樣會使整個方案變的比較重。
3. Flink + Hudi 集成的優化
3.1 Hudi 0.8 版本集成 Flink 方案
針對 Hudi 0.8 版本集成暴露出來的問題,B站和社區合作進行了優化與完善。
3.2 Bootstrap State 冷啟動
背景:支持在已經存在 Hudi 表啟動 Flink 任務寫入,從而可以做到由 Spark on Hudi 到 Flink on Hudi 的方案切換
原方案:
問題:每個 Task 處理全量數據,然后選擇屬于當前 Task 的 HoodieKey 存入 state 優化方案。
- 每個 Bootstrap Operator 在初始化時,加載屬于當前 Task 的 fileId 相關的 BaseFile 和 logFile;
- 將 BaseFile 和 logFile 中的 recordKey 組裝成 HoodieKey,通過 Key By 的形式發送給 BucketAssignFunction,然后將 HoodieKey 作為索引存儲在 BucketAssignFunction 的 state 中。
效果:通過將 Bootstrap 功能單獨抽出一個 Operator,做到了索引加載的可擴展性,加載速度提升 N (取決于并發度) 倍。
3.3 Checkpoint 一致性優化
背景:在 Hudi 0.8 版本的 StreamWriteFunction 中,存在極端情況下的數據一致性問題。
原方案:
問題:CheckpointComplete不在CK生命周期內,存在CK成功但是instant沒有commit的情 況,從而導致出現數據丟失。
優化方案:
3.4 Append 模式支持及優化
背景:Append 模式是用于支持不需要 update 的數據集時使用的模式,可以在流程中省略索引、 合并等不必要的處理,從而大幅提高寫入效率。
主要修改:
- 支持每次 FlushBucket 寫入一個新的文件,避免出現讀寫的放大;
- 添加參數,支持關閉 BoundedInMemeoryQueue 內部的限速機制,在 Flink Append 模式下只需要將 Queue 的大小和 Bucket buffer 設置成同樣的大小就可以了;
- 針對每個 CK 產生的小文件,制定自定義 Compaction 計劃;
- 通過以上的開發和優化之后,在純 Insert 場景下性能可達原先 COW 的 5 倍。
三、Hudi 任務穩定性保障
1. Hudi 集成 Flink Metrics
通過在關鍵節點上報 Metric,可以比較清晰的掌握整個任務的運行情況:
2. 系統內數據校驗
3. 系統外數據校驗
四、數據入湖實踐
1. CDC數據入湖
1.1 TiDB入湖方案
由于目前開源的各種方案都沒辦法直接支持 TiDB 的數據導出,直接使用 Select 的方式會影響數 據庫的穩定性,所以拆成了全量 + 增量的方式:
1.2 MySQL 入湖方案
MySQL 的入湖方案是直接使用開源的 Flink-CDC,將全量和增量數據通過一個 Flink 任務寫入 Kafka topic:
2. 日志數據增量入湖
- 實現 HDFSStreamingSource 和 ReaderOperator,增量同步 ODS 的數據文件,并且通過寫入 ODS 的分區索引信息,減少對 HDFS 的 list 請求;
- 支持 transform SQL 配置化,允許用戶進行自定義邏輯轉化,包括但不限于維表 join、自定義 udf、按字段分流等;
- 實現 Flink on Hudi 的 Append 模式,大幅提升不需要合并的數據寫入速率。
五、增量數據湖平臺收益
- 通過 Flink 增量同步大幅度提升了數據同步的時效性,分區就緒時間從 2:00~5:00 提前到 00:30 分內;
- 存儲引擎使用 Hudi,提供用戶基于 COW、MOR 的多種查詢方式,讓不同用戶可以根據自己 的應用場景選擇合適的查詢方式,而不是單純的只能等待分區歸檔后查詢;
- 相較于之前數倉的 T+1 Binlog 合并方式,基于 Hudi 的自動 Compaction 使得用戶可以將 Hive 當成 MySQL 的快照進行查詢;
- 大幅節約資源,原先需要重復查詢的分流任務只需要執行一次,節約大約 18000 core。
六、社區貢獻
上述優化都已經合并到 Hudi 社區,B站在未來會進一步加強 Hudi 的建設,與社區一起成?。
部分核心PR
Log in - ASF JIRA
Log in - ASF JIRA
Log in - ASF JIRA
Log in - ASF JIRA
Log in - ASF JIRA
Log in - ASF JIRA
Log in - ASF JIRA
七、未來的發展與思考
- 平臺支持流批一體,統一實時與離線邏輯;
- 推進數倉增量化,達成 Hudi ODS -> Flink -> Hudi DW -> Flink -> Hudi ADS 的全流程;
- 在 Flink 上支持 Hudi 的 Clustering,體現出 Hudi 在數據組織上的優勢,并探索 Z-Order 等加速多維查詢的性能表現;
- 支持 inline clustering。
原文鏈接
本文為阿里云原創內容,未經允許不得轉載。?
總結
以上是生活随笔為你收集整理的Apache Hudi 在 B 站构建实时数据湖的实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 基于链路思想的SpringBoot单元测
- 下一篇: 2021双11上云狂欢节 | 爆款产品底