日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

汽车之家基于 Flink + Iceberg 的湖仓一体架构实践

發布時間:2024/9/3 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 汽车之家基于 Flink + Iceberg 的湖仓一体架构实践 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
簡介:由汽車之家實時計算平臺負責人邸星星在 4 月 17 日上海站 Meetup 分享的,基于 Flink + Iceberg 的湖倉一體架構實踐。 內容簡要:

一、數據倉庫架構升級的背景

二、基于 Iceberg 的湖倉一體架構實踐

三、總結與收益

四、后續規劃

GitHub 地址
https://github.com/apache/flink
歡迎大家給 Flink 點贊送 star~

一、數據倉庫架構升級的背景

1. 基于 Hive 的數據倉庫的痛點

原有的數據倉庫完全基于 Hive 建造而成,主要存在三大痛點:

痛點一:不支持 ACID

1)不支持 Upsert 場景;

2)不支持 Row-level delete,數據修正成本高。

痛點二:時效性難以提升

1)數據難以做到準實時可見;

2)無法增量讀取,無法實現存儲層面的流批統一;

3)無法支持分鐘級延遲的數據分析場景。

痛點三:Table Evolution

1)寫入型 Schema,對 Schema 變更支持不好;

2)Partition Spec 變更支持不友好。

2. Iceberg 關鍵特性

Iceberg 主要有四大關鍵特性:支持 ACID 語義、增量快照機制、開放的表格式和流批接口支持。

  • 支持 ACID 語義

    • 不會讀到不完整的 Commit;
    • 基于樂觀鎖支持并發 Commit;
    • Row-level delete,支持 Upsert。
  • 增量快照機制

    • Commit 后數據即可見(分鐘級);
    • 可回溯歷史快照。
  • 開放的表格式

    • 數據格式:parquet、orc、avro
    • 計算引擎:Spark、Flink、Hive、Trino/Presto
  • 流批接口支持

    • 支持流、批寫入;
    • 支持流、批讀取。

二、基于 Iceberg 的湖倉一體架構實踐

湖倉一體的意義就是說我不需要看見湖和倉,數據有著打通的元數據的格式,它可以自由的流動,也可以對接上層多樣化的計算生態。

——賈揚清(阿里云計算平臺高級研究員)

1. Append 流入湖的鏈路

上圖為日志類數據入湖的鏈路,日志類數據包含客戶端日志、用戶端日志以及服務端日志。這些日志數據會實時錄入到 Kafka,然后通過 Flink 任務寫到 Iceberg 里面,最終存儲到 HDFS。

2. Flink SQL 入湖鏈路打通

我們的 Flink SQL 入湖鏈路打通是基于 “Flink 1.11 + Iceberg 0.11” 完成的,對接 Iceberg Catalog 我們主要做了以下內容:

1)Meta Server 增加對 Iceberg Catalog 的支持;

2)SQL SDK 增加 Iceberg Catalog 支持。

然后在這基礎上,平臺開放 Iceberg 表的管理功能,使得用戶可以自己在平臺上建 SQL 的表。

3. 入湖 - 支持代理用戶

第二步是內部的實踐,對接現有預算體系、權限體系。

因為之前平臺做實時作業的時候,平臺都是默認為 Flink 用戶去運行的,之前存儲不涉及 HDFS 存儲,因此可能沒有什么問題,也就沒有思考預算劃分方面的問題。

但是現在寫 Iceberg 的話,可能就會涉及一些問題。比如數倉團隊有自己的集市,數據就應該寫到他們的目錄下面,預算也是劃到他們的預算下,同時權限和離線團隊賬號的體系打通。

如上所示,這塊主要是在平臺上做了代理用戶的功能,用戶可以去指定用哪個賬號去把這個數據寫到 Iceberg 里面,實現過程主要有以下三個。

  • 增加 Table 級別配置:'iceberg.user.proxy' = 'targetUser’

    1)啟用 Superuser

    2)團隊賬號鑒權

  • 訪問 HDFS 時啟用代理用戶:

  • 訪問 Hive Metastore 時指定代理用戶

    1)參考 Spark 的相關實現:

    org.apache.spark.deploy.security.HiveDelegationTokenProvider

    2)動態代理 HiveMetaStoreClient,使用代理用戶訪問 Hive metastore

4. Flink SQL 入湖示例

DDL + DML

5. CDC 數據入湖鏈路

如上所示,我們有一個 AutoDTS 平臺,負責業務庫數據的實時接入。我們會把這些業務庫的數據接入到 Kafka 里面,同時它還支持在平臺上配置分發任務,相當于把進 Kafka 的數據分發到不同的存儲引擎里,在這個場景下是分發到 Iceberg 里。

6. Flink SQL CDC 入湖鏈路打通

下面是我們基于 “Flink1.11 + Iceberg 0.11” 支持 CDC 入湖所做的改動:

  • 改進 Iceberg Sink:

    Flink 1.11 版本為 AppendStreamTableSink,無法處理 CDC 流,修改并適配。

  • 表管理

    1)支持 Primary key(PR1978)

    2)開啟 V2 版本:'iceberg.format.version' = '2'

7. CDC 數據入湖

1. 支持 Bucket

Upsert 場景下,需要確保同一條數據寫入到同一 Bucket 下,這又如何實現?

目前 Flink SQL 語法不支持聲明 bucket 分區,通過配置的方式聲明 Bucket:

'partition.bucket.source'='id', // 指定 bucket 字段

'partition.bucket.num'='10', // 指定 bucket 數量

2. Copy-on-write sink

做 Copy-on-Write 的原因是原本社區的 Merge-on-Read 不支持合并小文件,所以我們臨時去做了 Copy-on-write sink 的實現。目前業務一直在測試使用,效果良好。

上方為 Copy-on-Write 的實現,其實跟原來的 Merge-on-Read 比較類似,也是有 StreamWriter 多并行度寫入FileCommitter 單并行度順序提交

在 Copy-on-Write 里面,需要根據表的數據量合理設置 Bucket 數,無需額外做小文件合并。

  • StreamWriter 在 snapshotState 階段多并行度寫入

    1)增加 Buffer;

    2)寫入前需要判斷上次 checkpoint 已經 commit 成功;

    3)按 bucket 分組、合并,逐個 Bucket 寫入。

  • FileCommitter 單并行度順序提交

    1)table.newOverwrite()

    2)Flink.last.committed.checkpoint.id

8. 示例 - CDC 數據配置入湖

如上圖所示,在實際使用中,業務方可以在 DTS 平臺上創建或配置分發任務即可。

實例類型選擇 Iceberg 表,然后選擇目標庫,表明要把哪個表的數據同步到 Iceberg 里,然后可以選原表和目標表的字段的映射關系是什么樣的,配置之后就可以啟動分發任務。啟動之后,會在實時計算平臺 Flink 里面提交一個實時任務,接著用 Copy-on-write sink 去實時地把數據寫到 Iceberg 表里面。

9. 入湖其他實踐

實踐一:減少 empty commit

  • 問題描述:

    在上游 Kafka 長期沒有數據的情況下,每次 Checkpoint 依舊會生成新的 Snapshot,導致大量的空文件和不必要的 Snapshot。

  • 解決方案(PR - 2042):

    增加配置 Flink.max-continuousempty-commits,在連續指定次數 Checkpoint 都沒有數據后才真正觸發 Commit,生成 Snapshot。

實踐二:記錄 watermark

  • 問題描述:

    目前 Iceberg 表本身無法直接反映數據寫入的進度,離線調度難以精準觸發下游任務。

  • 解決方案( PR - 2109 ):

    在 Commit 階段將 Flink 的 Watermark 記錄到 Iceberg 表的 Properties 中,可直觀的反映端到端的延遲情況,同時可以用來判斷分區數據完整性,用于調度觸發下游任務。

實踐三:刪表優化

  • 問題描述:

    刪除 Iceberg 可能會很慢,導致平臺接口相應超時。因為 Iceberg 是面向對象存儲來抽象 IO 層的,沒有快速清除目錄的方法。

  • 解決方案:

    擴展 FileIO,增加 deleteDir 方法,在 HDFS 上快速刪除表數據。

10. 小文件合并及數據清理

定期為每個表執行批處理任務(spark 3),分為以下三個步驟:

1. 定期合并新增分區的小文件:

? rewriteDataFilesAction.execute(); 僅合并小文件,不會刪除舊文件。

2. 刪除過期的 snapshot,清理元數據及數據文件:

? table.expireSnapshots().expireOld erThan(timestamp).commit();

3. 清理 orphan 文件,默認清理 3 天前,且無法觸及的文件:

? removeOrphanFilesAction.older Than(timestamp).execute();

11. 計算引擎 – Flink

Flink 是實時平臺的核心計算引擎,目前主要支持數據入湖場景,主要有以下幾個方面的特點。

  • 數據準實時入湖:

    Flink 和 Iceberg 在數據入湖方面集成度最高,Flink 社區主動擁抱數據湖技術。

  • 平臺集成:

    AutoStream 引入 IcebergCatalog,支持通過 SQL 建表、入湖 AutoDTS 支持將 MySQL、SQLServer、TiDB 表配置入湖。

  • 流批一體:

    在流批一體的理念下,Flink 的優勢會逐漸體現出來。

12. 計算引擎 – Hive

Hive 在 SQL 批處理層面 Iceberg 和 Spark 3 集成度更高,主要提供以下三個方面的功能。

  • 定期小文件合并及 meta 信息查詢:

    SELECT * FROM prod.db.table.history 還可查看 snapshots, files, manifests。

  • 離線數據寫入:

    1)Insert into 2)Insert overwrite 3)Merge into

  • 分析查詢:

    主要支持日常的準實時分析查詢場景。

13. 計算引擎 – Trino/Presto

AutoBI 已經和 Presto 集成,用于報表、分析型查詢場景。

  • Trino

    1)直接將 Iceberg 作為報表數據源

    2)需要增加元數據緩存機制:https://github.com/trinodb/trino/issues/7551

  • Presto

    社區集成中:https://github.com/prestodb/presto/pull/15836

14. 踩過的坑

1. 訪問 Hive Metastore 異常

問題描述:HiveConf 的構造方法的誤用,導致 Hive 客戶端中聲明的配置被覆蓋,導致訪問 Hive metastore 時異常

解決方案(PR-2075):修復 HiveConf 的構造,顯示調用 addResource 方法,確保配置不會被覆蓋:hiveConf.addResource(conf);

2.Hive metastore 鎖未釋放

問題描述:“CommitFailedException: Timed out after 181138 ms waiting for lock xxx.” 原因是 hiveMetastoreClient.lock 方法,在未獲得鎖的情況下,也需要顯示 unlock,否則會導致上面異常。

解決方案(PR-2263):優化 HiveTableOperations#acquireLock 方法,在獲取鎖失敗的情況下顯示調用 unlock 來釋放鎖。

3. 元數據文件丟失

問題描述:Iceberg 表無法訪問,報 “NotFoundException Failed to open input stream for file : xxx.metadata.json”

解決方案(PR-2328):當調用 Hive metastore 更新 iceberg 表的 metadata_location 超時后,增加檢查機制,確認元數據未保存成功后再刪除元數據文件。

三、收益與總結

1. 總結

? 通過對湖倉一體、流批融合的探索,我們分別做了總結。

  • 湖倉一體

    1)Iceberg 支持 Hive Metastore;

    2)總體使用上與 Hive 表類似:相同數據格式、相同的計算引擎。

  • 流批融合

    準實時場景下實現流批統一:同源、同計算、同存儲。

2. 業務收益

  • 數據時效性提升:

    入倉延遲從 2 小時以上降低到 10 分鐘以內;算法核心任務 SLA 提前 2 小時完成。

  • 準實時的分析查詢:

    結合 Spark 3 和 Trino,支持準實時的多維分析查詢。

  • 特征工程提效:

    提供準實時的樣本數據,提高模型訓練時效性。

  • CDC 數據準實時入倉:

    可以在數倉針對業務表做準實時分析查詢。

3. 架構收益 - 準實時數倉

上方也提到了,我們支持準實時的入倉和分析,相當于是為后續的準實時數倉建設提供了基礎的架構驗證。準實時數倉的優勢是一次開發、口徑統一、統一存儲,是真正的批流一體。劣勢是實時性較差,原來可能是秒級、毫秒級的延遲,現在是分鐘級的數據可見性。

但是在架構層面上,這個意義還是很大的,后續我們能看到一些希望,可以把整個原來 “T + 1” 的數倉,做成準實時的數倉,提升數倉整體的數據時效性,然后更好地支持上下游的業務。

四、后續規劃

1. 跟進 Iceberg 版本

全面開放 V2 格式,支持 CDC 數據的 MOR 入湖。

2. 建設準實時數倉

基于 Flink 通過 Data pipeline 模式對數倉各層表全面提速。

3. 流批一體

隨著 upsert 功能的逐步完善,持續探索存儲層面流批一體。

4. 多維分析

基于 Presto/Spark3 輸出準實時多維分析。

原文鏈接:https://developer.aliyun.com/article/784553?

版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。

總結

以上是生活随笔為你收集整理的汽车之家基于 Flink + Iceberg 的湖仓一体架构实践的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。