日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink + Iceberg 在去哪儿的实时数仓实践

發布時間:2024/9/3 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink + Iceberg 在去哪儿的实时数仓实践 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
簡介:本文介紹去哪兒數據平臺在使用 Flink + Iceberg 0.11 的一些實踐。

作者:余東

摘要: 本文介紹去哪兒數據平臺在使用 Flink + Iceberg 0.11 的一些實踐。內容包括:

  • 背景及痛點
  • Iceberg 架構
  • 痛點一:Kafka 數據丟失
  • 痛點二:近實時 Hive 壓力大
  • Iceberg 優化實踐
  • 總結

GitHub 地址
https://github.com/apache/flink
歡迎大家給 Flink 點贊送 star~

一、背景及痛點

1. 背景

我們在使用 Flink 做實時數倉以及數據傳輸過程中,遇到了一些問題:比如 Kafka 數據丟失,Flink 結合 Hive 的近實時數倉性能等。Iceberg 0.11 的新特性解決了這些業務場景碰到的問題。對比 Kafka 來說,Iceberg 在某些特定場景有自己的優勢,在此我們做了一些基于 Iceberg 的實踐分享。

2. 原架構方案

原先的架構采用 Kafka 存儲實時數據,其中包括日志、訂單、車票等數據。然后用 Flink SQL 或者 Flink datastream 消費數據進行流轉。內部自研了提交 SQL 和 Datastream 的平臺,通過該平臺提交實時作業。

3. 痛點

  • Kafka 存儲成本高且數據量大。Kafka 由于壓力大將數據過期時間設置的比較短,當數據產生反壓,積壓等情況時,如果在一定的時間內沒消費數據導致數據過期,會造成數據丟失。
  • Flink 在 Hive 上做了近實時的讀寫支持。為了分擔 Kafka 壓力,將一些實時性不太高的數據放入 Hive,讓 Hive 做分鐘級的分區。但是隨著元數據不斷增加,Hive metadata 的壓力日益顯著,查詢也變得更慢,且存儲 Hive 元數據的數據庫壓力也變大。

二、Iceberg 架構

1. Iceberg 架構解析

術語解析

  • 數據文件(data files)

    Iceberg 表真實存儲數據的文件,一般存儲在 data 目錄下,以 “.parquet” 結尾。

  • 清單文件(Manifest file)

    每行都是每個數據文件的詳細描述,包括數據文件的狀態、文件路徑、分區信息、列級別的統計信息(比如每列的最大最小值、空值數等)。通過該文件,可過濾掉無關數據,提高檢索速度。

  • 快照(Snapshot)

    快照代表一張表在某個時刻的狀態。每個快照版本包含某個時刻的所有數據文件列表。Data files 存儲在不同的 manifest files 里面, manifest files 存儲在一個 Manifest list 文件里面,而一個 Manifest list 文件代表一個快照。

2. Iceberg 查詢計劃

查詢計劃是在表中查找 “查詢所需文件” 的過程。

  • 元數據過濾

    清單文件包括分區數據元組和每個數據文件的列級統計信息。在計劃期間,查詢謂詞會自動轉換為分區數據上的謂詞,并首先應用于過濾數據文件。接下來,使用列級值計數,空計數,下限和上限來消除與查詢謂詞不匹配的文件。

  • Snapshot ID

    每個 Snapshot ID 會關聯到一組 manifest files,而每一組 manifest files 包含很多 manifest file。

  • manifest files 文件列表

    每個 manifest files 又記錄了當前 data 數據塊的元數據信息,其中就包含了文件列的最大值和最小值,然后根據這個元數據信息,索引到具體的文件塊,從而更快的查詢到數據。

三、痛點一:Kafka 數據丟失

1. 痛點介紹

通常我們會選擇 Kafka 做實時數倉,以及日志傳輸。Kafka 本身存儲成本很高,且數據保留時間有時效性,一旦消費積壓,數據達到過期時間后,就會造成數據丟失且沒有消費到。

2. 解決方案

將實時要求不高的業務數據入湖、比如說能接受 1-10 分鐘的延遲。因為 Iceberg 0.11 也支持 SQL 實時讀取,而且還能保存歷史數據。這樣既可以減輕線上 Kafka 的壓力,還能確保數據不丟失的同時也能實時讀取。

3 .為什么 Iceberg 只能做近實時入湖?

  • Iceberg 提交 Transaction 時是以文件粒度來提交。這就沒法以秒為單位提交 Transaction,否則會造成文件數量膨脹;
  • 沒有在線服務節點。對于實時的高吞吐低延遲寫入,無法得到純實時的響應;
  • Flink 寫入以 checkpoint 為單位,物理數據寫入 Iceberg 后并不能直接查詢,當觸發了 checkpoint 才會寫 metadata 文件,這時數據由不可見變為可見。checkpoint 每次執行都會有一定時間。
  • 4. Flink 入湖分析

    組件介紹

    • IcebergStreamWriter

      主要用來寫入記錄到對應的 avro、parquet、orc 文件,生成一個對應的 Iceberg DataFile,并發送給下游算子。

    另外一個叫做 IcebergFilesCommitter,主要用來在 checkpoint 到來時把所有的 DataFile 文件收集起來,并提交 Transaction 到 Apache Iceberg,完成本次 checkpoint 的數據寫入,生成 DataFile。

    • IcebergFilesCommitter

      為每個 checkpointId 維護了一個 DataFile 文件列表,即 map<Long, List>,這樣即使中間有某個 checkpoint 的 transaction 提交失敗了,它的 DataFile 文件仍然維護在 State 中,依然可以通過后續的 checkpoint 來提交數據到 Iceberg 表中。

    5. Flink SQL Demo

    Flink Iceberg 實時入湖流程,消費 Kafka 數據寫入 Iceberg,并從 Iceberg 近實時讀取數據。

    5.1 前期工作

    • 開啟實時讀寫功能

      set execution.type = streaming

    • 開啟 table sql hint 功能來使用 OPTIONS 屬性

      set table.dynamic-table-options.enabled=true

    • 注冊 Iceberg catalog 用于操作 Iceberg 表

      CREATE CATALOG Iceberg_catalog WITH (\n" +" 'type'='Iceberg',\n" +" 'catalog-type'='Hive'," +" 'uri'='thrift://localhost:9083'" +");
    • Kafka 實時數據入湖

      insert into Iceberg_catalog.Iceberg_db.tbl1 \n select * from Kafka_tbl;
    • 數據湖之間實時流轉 tbl1 -> tbl2

      insert into Iceberg_catalog.Iceberg_db.tbl2 select * from Iceberg_catalog.Iceberg_db.tbl1 /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s',snapshot-id'='3821550127947089987')*/

    5.2 參數解釋

    • monitor-interval

      連續監視新提交的數據文件的時間間隔(默認值:1s)。

    • start-snapshot-id

      從指定的快照 ID 開始讀取數據、每個快照 ID 關聯的是一組 manifest file 元數據文件,每個元數據文件映射著自己的真實數據文件,通過快照 ID,從而讀取到某個版本的數據。

    6. 踩坑記錄

    我之前在 SQL Client 寫數據到 Iceberg,data 目錄數據一直在更新,但是 metadata 沒有數據,導致查詢的時候沒有數,因為 Iceberg 的查詢是需要元數據來索引真實數據的。SQL Client 默認沒有開啟 checkpoint,需要通過配置文件來開啟狀態。所以會導致 data 目錄寫入數據而 metadata 目錄不寫入元數據。

    PS:無論通過 SQL 還是 Datastream 入湖,都必須開啟 Checkpoint。

    7. 數據樣例

    下面兩張圖展示的是實時查詢 Iceberg 的效果,一秒前和一秒后的數據變化情況。

    • 一秒前的數據

    • 一秒后刷新的數據

    四、痛點二:Flink 結合 Hive 的近實時越來越慢

    1. 痛點介紹

    選用 Flink + Hive 的近實時架構雖然支持了實時讀寫,但是這種架構帶來的問題是隨著表和分區增多,將會面臨以下問題:

    • 元數據過多

      Hive 將分區改為小時 / 分鐘級,雖然提高了數據的準實時性,但是 metestore 的壓力也是顯而易見的,元數據過多導致生成查詢計劃變慢,而且還會影響線上其他業務穩定。

    • 數據庫壓力變大

      隨著元數據增加,存儲 Hive 元數據的數據庫壓力也會增加,一段時間后,還需要對該庫進行擴容,比如存儲空間。

    2. 解決方案

    將原先的 Hive 近實時遷移到 Iceberg。為什么 Iceberg 可以處理元數據量大的問題,而 Hive 在元數據大的時候卻容易形成瓶頸?

    • Iceberg 是把 metadata 維護在可拓展的分布式文件系統上,不存在中心化的元數據系統;
    • Hive 則是把 partition 之上的元數據維護在 metastore 里面(partition 過多則給 mysql 造成巨大壓力),而 partition 內的元數據其實是維護在文件內的(啟動作業需要列舉大量文件才能確定文件是否需要被掃描,整個過程非常耗時)。

    五、優化實踐

    1. 小文件處理

    • Iceberg 0.11 以前,通過定時觸發 batch api 進行小文件合并,這樣雖然能合并,但是需要維護一套 Actions 代碼,而且也不是實時合并的。

      Table table = findTable(options, conf); Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(10 * 1024) // 10KB.execute();
    • Iceberg 0.11 新特性,支持了流式小文件合并。

      通過分區/存儲桶鍵使用哈希混洗方式寫數據、從源頭直接合并文件,這樣的好處在于,一個 task 會處理某個分區的數據,提交自己的 Datafile 文件,比如一個 task 只處理對應分區的數據。這樣避免了多個 task 處理提交很多小文件的問題,且不需要額外的維護代碼,只需在建表的時候指定屬性 write.distribution-mode,該參數與其它引擎是通用的,比如 Spark 等。

      CREATE TABLE city_table ( province BIGINT,city STRING) PARTITIONED BY (province, city) WITH ('write.distribution-mode'='hash' );

    2. Iceberg 0.11 排序

    2.1 排序介紹

    在 Iceberg 0.11 之前,Flink 是不支持 Iceberg 排序功能的,所以之前只能結合 Spark 以批模式來支持排序功能,0.11 新增了排序特性的支持,也意味著,我們在實時也可以體會到這個好處。

    排序的本質是為了掃描更快,因為按照 sort key 做了聚合之后,所有的數據都按照從小到大排列,max-min 可以過濾掉大量的無效數據。

    2.2 排序 demo

    insert into Iceberg_table select days from Kafka_tbl order by days, province_id;

    3. Iceberg 排序后 manifest 詳解

    參數解釋

    • file_path:物理文件位置。
    • partition:文件所對應的分區。
    • lower_bounds:該文件中,多個排序字段的最小值,下圖是我的 days 和 province_id 最小值。
    • upper_bounds:該文件中,多個排序字段的最大值,下圖是我的 days 和 province_id 最大值。

    通過分區、列的上下限信息來確定是否讀取 file_path 的文件,數據排序后,文件列的信息也會記錄在元數據中,查詢計劃從 manifest 去定位文件,不需要把信息記錄在 Hive metadata,從而減輕 Hive metadata 壓力,提升查詢效率。

    利用 Iceberg 0.11 的排序特性,將天作為分區。按天、小時、分鐘進行排序,那么 manifest 文件就會記錄這個排序規則,從而在檢索數據的時候,提高查詢效率,既能實現 Hive 分區的檢索優點,還能避免 Hive metadata 元數據過多帶來的壓力。

    六、總結

    相較于之前的版本來說,Iceberg 0.11 新增了許多實用的功能,對比了之前使用的舊版本,做以下總結:

    • Flink + Iceberg 排序功能

      在 Iceberg 0.11 以前,排序功能集成了 Spark,但沒有集成 Flink,當時用 Spark + Iceberg 0.10 批量遷移了一批 Hive 表。在 BI 上的收益是: 原先 BI 為了提升 Hive 查詢速度建了多級分區,導致小文件和元數據過多,入湖過程中,利用 Spark 排序 BI 經常查詢的條件,結合隱式分區,最終提升 BI 檢索速度的同時,也沒有小文件的問題,Iceberg 有自身的元數據,也減少了 Hive metadata 的壓力。

      Icebeg 0.11 支持了 Flink 的排序,是一個很實用的功能點。我們可以把原先 Flink + Hive 的分區轉移到 Iceberg 排序中,既能達到 Hive 分區的效果,也能減少小文件和提升查詢效率。

    • 實時讀取數據

      通過 SQL 的編程方式,即可實現數據的實時讀取。好處在于,可以把實時性要求不高的,比如業務可以接受 1-10 分鐘延遲的數據放入 Iceberg 中 ,在減少 Kafka 壓力的同時,也能實現數據的近實時讀取,還能保存歷史數據。

    • 實時合并小文件

      在Iceberg 0.11以前,需要用 Iceberg 的合并 API 來維護小文件合并,該 API 需要傳入表信息,以及定時信息,且合并是按批次這樣進行的,不是實時的。從代碼上來說,增加了維護和開發成本;從時效性來說,不是實時的。0.11 用 Hash 的方式,從源頭對數據進行實時合并,只需在 SQL 建表時指定 ('write.distribution-mode'='hash') 屬性即可,不需要手工維護。

    原文鏈接:https://developer.aliyun.com/article/784412?

    版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。

    總結

    以上是生活随笔為你收集整理的Flink + Iceberg 在去哪儿的实时数仓实践的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。