Flink + Iceberg 在去哪儿的实时数仓实践
作者:余東
摘要: 本文介紹去哪兒數據平臺在使用 Flink + Iceberg 0.11 的一些實踐。內容包括:
- 背景及痛點
- Iceberg 架構
- 痛點一:Kafka 數據丟失
- 痛點二:近實時 Hive 壓力大
- Iceberg 優化實踐
- 總結
GitHub 地址
https://github.com/apache/flink
歡迎大家給 Flink 點贊送 star~
一、背景及痛點
1. 背景
我們在使用 Flink 做實時數倉以及數據傳輸過程中,遇到了一些問題:比如 Kafka 數據丟失,Flink 結合 Hive 的近實時數倉性能等。Iceberg 0.11 的新特性解決了這些業務場景碰到的問題。對比 Kafka 來說,Iceberg 在某些特定場景有自己的優勢,在此我們做了一些基于 Iceberg 的實踐分享。
2. 原架構方案
原先的架構采用 Kafka 存儲實時數據,其中包括日志、訂單、車票等數據。然后用 Flink SQL 或者 Flink datastream 消費數據進行流轉。內部自研了提交 SQL 和 Datastream 的平臺,通過該平臺提交實時作業。
3. 痛點
- Kafka 存儲成本高且數據量大。Kafka 由于壓力大將數據過期時間設置的比較短,當數據產生反壓,積壓等情況時,如果在一定的時間內沒消費數據導致數據過期,會造成數據丟失。
- Flink 在 Hive 上做了近實時的讀寫支持。為了分擔 Kafka 壓力,將一些實時性不太高的數據放入 Hive,讓 Hive 做分鐘級的分區。但是隨著元數據不斷增加,Hive metadata 的壓力日益顯著,查詢也變得更慢,且存儲 Hive 元數據的數據庫壓力也變大。
二、Iceberg 架構
1. Iceberg 架構解析
術語解析
- 數據文件(data files)
Iceberg 表真實存儲數據的文件,一般存儲在 data 目錄下,以 “.parquet” 結尾。
- 清單文件(Manifest file)
每行都是每個數據文件的詳細描述,包括數據文件的狀態、文件路徑、分區信息、列級別的統計信息(比如每列的最大最小值、空值數等)。通過該文件,可過濾掉無關數據,提高檢索速度。
- 快照(Snapshot)
快照代表一張表在某個時刻的狀態。每個快照版本包含某個時刻的所有數據文件列表。Data files 存儲在不同的 manifest files 里面, manifest files 存儲在一個 Manifest list 文件里面,而一個 Manifest list 文件代表一個快照。
2. Iceberg 查詢計劃
查詢計劃是在表中查找 “查詢所需文件” 的過程。
- 元數據過濾
清單文件包括分區數據元組和每個數據文件的列級統計信息。在計劃期間,查詢謂詞會自動轉換為分區數據上的謂詞,并首先應用于過濾數據文件。接下來,使用列級值計數,空計數,下限和上限來消除與查詢謂詞不匹配的文件。
- Snapshot ID
每個 Snapshot ID 會關聯到一組 manifest files,而每一組 manifest files 包含很多 manifest file。
- manifest files 文件列表
每個 manifest files 又記錄了當前 data 數據塊的元數據信息,其中就包含了文件列的最大值和最小值,然后根據這個元數據信息,索引到具體的文件塊,從而更快的查詢到數據。
三、痛點一:Kafka 數據丟失
1. 痛點介紹
通常我們會選擇 Kafka 做實時數倉,以及日志傳輸。Kafka 本身存儲成本很高,且數據保留時間有時效性,一旦消費積壓,數據達到過期時間后,就會造成數據丟失且沒有消費到。
2. 解決方案
將實時要求不高的業務數據入湖、比如說能接受 1-10 分鐘的延遲。因為 Iceberg 0.11 也支持 SQL 實時讀取,而且還能保存歷史數據。這樣既可以減輕線上 Kafka 的壓力,還能確保數據不丟失的同時也能實時讀取。
3 .為什么 Iceberg 只能做近實時入湖?
4. Flink 入湖分析
組件介紹
- IcebergStreamWriter
主要用來寫入記錄到對應的 avro、parquet、orc 文件,生成一個對應的 Iceberg DataFile,并發送給下游算子。
另外一個叫做 IcebergFilesCommitter,主要用來在 checkpoint 到來時把所有的 DataFile 文件收集起來,并提交 Transaction 到 Apache Iceberg,完成本次 checkpoint 的數據寫入,生成 DataFile。
- IcebergFilesCommitter
為每個 checkpointId 維護了一個 DataFile 文件列表,即 map<Long, List>,這樣即使中間有某個 checkpoint 的 transaction 提交失敗了,它的 DataFile 文件仍然維護在 State 中,依然可以通過后續的 checkpoint 來提交數據到 Iceberg 表中。
5. Flink SQL Demo
Flink Iceberg 實時入湖流程,消費 Kafka 數據寫入 Iceberg,并從 Iceberg 近實時讀取數據。
5.1 前期工作
- 開啟實時讀寫功能
set execution.type = streaming
- 開啟 table sql hint 功能來使用 OPTIONS 屬性
set table.dynamic-table-options.enabled=true
注冊 Iceberg catalog 用于操作 Iceberg 表
CREATE CATALOG Iceberg_catalog WITH (\n" +" 'type'='Iceberg',\n" +" 'catalog-type'='Hive'," +" 'uri'='thrift://localhost:9083'" +");Kafka 實時數據入湖
insert into Iceberg_catalog.Iceberg_db.tbl1 \n select * from Kafka_tbl;數據湖之間實時流轉 tbl1 -> tbl2
insert into Iceberg_catalog.Iceberg_db.tbl2 select * from Iceberg_catalog.Iceberg_db.tbl1 /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s',snapshot-id'='3821550127947089987')*/
5.2 參數解釋
- monitor-interval
連續監視新提交的數據文件的時間間隔(默認值:1s)。
- start-snapshot-id
從指定的快照 ID 開始讀取數據、每個快照 ID 關聯的是一組 manifest file 元數據文件,每個元數據文件映射著自己的真實數據文件,通過快照 ID,從而讀取到某個版本的數據。
6. 踩坑記錄
我之前在 SQL Client 寫數據到 Iceberg,data 目錄數據一直在更新,但是 metadata 沒有數據,導致查詢的時候沒有數,因為 Iceberg 的查詢是需要元數據來索引真實數據的。SQL Client 默認沒有開啟 checkpoint,需要通過配置文件來開啟狀態。所以會導致 data 目錄寫入數據而 metadata 目錄不寫入元數據。
PS:無論通過 SQL 還是 Datastream 入湖,都必須開啟 Checkpoint。
7. 數據樣例
下面兩張圖展示的是實時查詢 Iceberg 的效果,一秒前和一秒后的數據變化情況。
- 一秒前的數據
- 一秒后刷新的數據
四、痛點二:Flink 結合 Hive 的近實時越來越慢
1. 痛點介紹
選用 Flink + Hive 的近實時架構雖然支持了實時讀寫,但是這種架構帶來的問題是隨著表和分區增多,將會面臨以下問題:
- 元數據過多
Hive 將分區改為小時 / 分鐘級,雖然提高了數據的準實時性,但是 metestore 的壓力也是顯而易見的,元數據過多導致生成查詢計劃變慢,而且還會影響線上其他業務穩定。
- 數據庫壓力變大
隨著元數據增加,存儲 Hive 元數據的數據庫壓力也會增加,一段時間后,還需要對該庫進行擴容,比如存儲空間。
2. 解決方案
將原先的 Hive 近實時遷移到 Iceberg。為什么 Iceberg 可以處理元數據量大的問題,而 Hive 在元數據大的時候卻容易形成瓶頸?
- Iceberg 是把 metadata 維護在可拓展的分布式文件系統上,不存在中心化的元數據系統;
- Hive 則是把 partition 之上的元數據維護在 metastore 里面(partition 過多則給 mysql 造成巨大壓力),而 partition 內的元數據其實是維護在文件內的(啟動作業需要列舉大量文件才能確定文件是否需要被掃描,整個過程非常耗時)。
五、優化實踐
1. 小文件處理
Iceberg 0.11 以前,通過定時觸發 batch api 進行小文件合并,這樣雖然能合并,但是需要維護一套 Actions 代碼,而且也不是實時合并的。
Table table = findTable(options, conf); Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(10 * 1024) // 10KB.execute();Iceberg 0.11 新特性,支持了流式小文件合并。
通過分區/存儲桶鍵使用哈希混洗方式寫數據、從源頭直接合并文件,這樣的好處在于,一個 task 會處理某個分區的數據,提交自己的 Datafile 文件,比如一個 task 只處理對應分區的數據。這樣避免了多個 task 處理提交很多小文件的問題,且不需要額外的維護代碼,只需在建表的時候指定屬性 write.distribution-mode,該參數與其它引擎是通用的,比如 Spark 等。
CREATE TABLE city_table ( province BIGINT,city STRING) PARTITIONED BY (province, city) WITH ('write.distribution-mode'='hash' );
2. Iceberg 0.11 排序
2.1 排序介紹
在 Iceberg 0.11 之前,Flink 是不支持 Iceberg 排序功能的,所以之前只能結合 Spark 以批模式來支持排序功能,0.11 新增了排序特性的支持,也意味著,我們在實時也可以體會到這個好處。
排序的本質是為了掃描更快,因為按照 sort key 做了聚合之后,所有的數據都按照從小到大排列,max-min 可以過濾掉大量的無效數據。
2.2 排序 demo
insert into Iceberg_table select days from Kafka_tbl order by days, province_id;3. Iceberg 排序后 manifest 詳解
參數解釋
- file_path:物理文件位置。
- partition:文件所對應的分區。
- lower_bounds:該文件中,多個排序字段的最小值,下圖是我的 days 和 province_id 最小值。
- upper_bounds:該文件中,多個排序字段的最大值,下圖是我的 days 和 province_id 最大值。
通過分區、列的上下限信息來確定是否讀取 file_path 的文件,數據排序后,文件列的信息也會記錄在元數據中,查詢計劃從 manifest 去定位文件,不需要把信息記錄在 Hive metadata,從而減輕 Hive metadata 壓力,提升查詢效率。
利用 Iceberg 0.11 的排序特性,將天作為分區。按天、小時、分鐘進行排序,那么 manifest 文件就會記錄這個排序規則,從而在檢索數據的時候,提高查詢效率,既能實現 Hive 分區的檢索優點,還能避免 Hive metadata 元數據過多帶來的壓力。
六、總結
相較于之前的版本來說,Iceberg 0.11 新增了許多實用的功能,對比了之前使用的舊版本,做以下總結:
- Flink + Iceberg 排序功能
在 Iceberg 0.11 以前,排序功能集成了 Spark,但沒有集成 Flink,當時用 Spark + Iceberg 0.10 批量遷移了一批 Hive 表。在 BI 上的收益是: 原先 BI 為了提升 Hive 查詢速度建了多級分區,導致小文件和元數據過多,入湖過程中,利用 Spark 排序 BI 經常查詢的條件,結合隱式分區,最終提升 BI 檢索速度的同時,也沒有小文件的問題,Iceberg 有自身的元數據,也減少了 Hive metadata 的壓力。
Icebeg 0.11 支持了 Flink 的排序,是一個很實用的功能點。我們可以把原先 Flink + Hive 的分區轉移到 Iceberg 排序中,既能達到 Hive 分區的效果,也能減少小文件和提升查詢效率。
- 實時讀取數據
通過 SQL 的編程方式,即可實現數據的實時讀取。好處在于,可以把實時性要求不高的,比如業務可以接受 1-10 分鐘延遲的數據放入 Iceberg 中 ,在減少 Kafka 壓力的同時,也能實現數據的近實時讀取,還能保存歷史數據。
- 實時合并小文件
在Iceberg 0.11以前,需要用 Iceberg 的合并 API 來維護小文件合并,該 API 需要傳入表信息,以及定時信息,且合并是按批次這樣進行的,不是實時的。從代碼上來說,增加了維護和開發成本;從時效性來說,不是實時的。0.11 用 Hash 的方式,從源頭對數據進行實時合并,只需在 SQL 建表時指定 ('write.distribution-mode'='hash') 屬性即可,不需要手工維護。
原文鏈接:https://developer.aliyun.com/article/784412?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的Flink + Iceberg 在去哪儿的实时数仓实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: bilibili基于 Flink 的机器
- 下一篇: 独家下载!《零售数据中台通关指南》,带你