日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

阿里云AnalyticDB基于Flink CDC+Hudi实现多表全增量入湖实践

發布時間:2023/12/24 windows 38 coder
生活随笔 收集整理的這篇文章主要介紹了 阿里云AnalyticDB基于Flink CDC+Hudi实现多表全增量入湖实践 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

湖倉一體(LakeHouse)是大數據領域的重要發展方向,提供了流批一體和湖倉結合的新場景。阿里云AnalyticDB?for?MySQL基于?Apache?Hudi?構建了新一代的湖倉平臺,提供日志、CDC等多種數據源一鍵入湖,在離線計算引擎融合分析等能力。本文將主要介紹AnalyticDB?for?MySQL基于Apache?Hudi實現多表CDC全增量入湖的經驗與實踐。

1.?背景簡介

1.1.?多表CDC入湖背景介紹

客戶在使用數據湖、傳統數據倉庫的過程中,常常會遇到以下業務痛點:

  • 全量建倉或直連分析對源庫壓力較大,需要卸載線上壓力規避故障

  • 建倉延遲較長(T+1天),需要T+10m的低延遲入湖

  • 海量數據在事務庫或傳統數倉中存儲成本高,需要低成本歸檔

  • 傳統數據湖存在不支持更新/小文件較多等缺點

  • 自建大數據數據平臺運維成本高,需要產品化、云原生、一體化的方案

  • 常見數倉的存儲不開放,需要自建能力、開源可控

  • 其他痛點和需求……

針對這些業務痛點,AnalyticDB?MySQL?數據管道組件(AnalyticDB?Pipeline?Service)?基于Apache?Hudi?實現了多表CDC全增量入湖,提供入湖和分析過程中高效的全量數據導入,增量數據實時寫入、ACID事務和多版本、小文件自動合并優化、元信息校驗和自動進化、高效的列式分析格式、高效的索引優化、超大分區表存儲等等能力,很好地解決了上述提到的客戶痛點。

1.2. Apache Hudi簡介

AnalyticDB?MySQL選擇了Apache?Hudi作為CDC入湖以及日志入湖的存儲底座?;仡?Hudi?的出現主要針對性解決Uber大數據系統中存在的以下痛點:

  • HDFS的可擴展性限制。大量的小文件會使得HDFS的Name?Node壓力很大,NameNode節點成為HDFS的瓶頸。

  • HDFS上更快的數據處理。Uber不再滿足于T+1的數據延遲。

  • 支持Hadoop?+?Parquet的更新與刪除。Uber的數據大多按天分區,舊數據不再修改,T+1?Snapshot讀源端的方式不夠高效,需要支持更新于刪除提高導入效率。

  • 更快的ETL和數據建模。原本模式下,下游的數據處理任務也必須全量地讀取數據湖的數據,Uber希望提供能力使得下游可以只讀取感興趣的增量數據。

基于以上的設計目標,Uber公司構建了Hudi(Hadoop?Upserts?Deletes?and?Incrementals)并將其捐贈給Apache基金會。從名字可以看出,Hudi最初的核心能力是高效的更新刪除,以及增量讀取Api。Hudi和“數據湖三劍客”中的其他兩位(Iceberg,DeltaLake)整體功能和架構類似,都大體由以下三個部分組成:

  1. 需要存儲的原始數據(Data?Objects)

  2. 用于提供upsert功能的索引數據?(Auxiliary?Data)

  3. 以及用于管理數據集的元數據(Metadata)

在存儲的原始數據層面,Lakehouse一般采用開源的列存格式(Parquet,ORC等),這方面沒有太大的差異。?在輔助數據層面,Hudi提供了比較高效的寫入索引(Bloomfilter,?Bucket?Index)?,使得其更加適合CDC大量更新的場景。

1.3.?業界方案簡介

阿里云AnalyticDB團隊在基于Hudi構建多表CDC入湖之前,也調研了業界的一些實現作為參考,這里簡單介紹一下一些業界的解決方案。

1.3.1.?Spark/Flink?+?Hudi?單表入湖

使用Hudi實現單表端到端CDC數據入湖的整體架構如圖所示:

圖中的第一個組件是Debezium?deployment,它由?Kafka?集群、Schema?Registry(Confluence?或?Apicurio)和?Debezium?連接器組成。會源源不斷讀取數據庫的binlog數據并將其寫入到Kafka中。

圖中的下游則是Hudi的消費端,這里我們選用Hudi提供的DeltaStreamer組件,他可以消費Kafka中的數據并寫入到Hudi數據湖中。業界實現類似單表CDC入湖,可以將上述方案中的binlog源從Debezium?+?Kafka替換成Flink?CDC?+?Kafka等等,入湖使用的計算引擎也可以根據實際情況使用Spark/Flink。

這種方式可以很好地同步CDC的數據,但是存在一個問題就是每一張表都需要創建一個單獨的入湖鏈路,如果想要同步數據庫中的多張表,則需要創建多個同步鏈路。這樣的實現存在幾個問題:

  1. 同時存在多條入湖鏈路提高了運維難度

  2. 動態增加刪除庫表比較麻煩

  3. 對于數據量小/更新不頻繁的表,也需要單獨創建一條同步鏈路,造成了資源浪費。

目前,Hudi也支持一條鏈路多表入湖,但還不夠成熟,不足以應用于生產,具體的使用可以參考這篇文檔。

1.3.2.?Flink?VVP?多表入湖

阿里云實時計算Flink版(即Flink?VVP)?是一種全托管Serverless的Flink云服務,開箱即用,計費靈活。具備一站式開發運維管理平臺,支持作業開發、數據調試、運行與監控、自動調優、智能診斷等全生命周期能力。

阿里云Flink產品提供了多表入湖的能力(binlog?->?flink?cdc?->?下游消費端),支持在一個Flink任務中同時消費多張表的binlog并寫入下游消費端:

  1. Flink?SQL執行create?table?as?table,可以把MySQL庫下所有匹配正則表達式的表同步到Hudi單表,是多對一的映射關系,會做分庫分表的合并。

  2. Flink?SQL執行create?database?as?database,可以把?MySQL庫下所有的表結構和表數據一鍵同步到下游數據庫,暫時不支持hudi表,計劃支持中。

啟動任務后的拓撲如下,一個源端binlog?source算子將數據分發到下游所有Hudi?Sink算子上。

通過Flink?VVP可以比較簡單地實現多表CDC入湖,然而,這個方案仍然存在以下的一些問題:

  1. 沒有成熟的產品化的入湖管理界面,如增刪庫表,修改配置等需要直接操作Flink作業,添加統一的庫表名前綴需要寫sql?hint。(VVP更多的還是一個全托管Flink平臺而不是一個數據湖產品)

  2. 只提供了Flink的部署形態,在不進行額外比較復雜的配置的情況下,Compaction/Clean等TableService必須運行在鏈路內,影響寫入的性能和穩定性。

綜合考慮后,我們決定采用類似Flink?VVP多表CDC入湖的方案,在AnalyticDB?MySQL上提*品化的多表CDC全增量入湖的功能。

2.?基于Flink?CDC?+?Hudi?實現多表CDC入湖

2.1.?整體架構

AnalyticDB?MySQL多表CDC入湖的主要設計目標如下:

  • 支持一鍵啟動入湖任務消費多表數據寫入Hudi,降低客戶管理成本。

  • 提*品化管理界面,用戶可以通過界面啟停編輯入湖任務,提供庫表名統一前綴,主鍵映射等產品化功能。

  • 盡可能降低入湖成本,減少入湖過程中需要部署的組件。

基于這樣的設計目標,我們初步選擇了以Flink?CDC作為binlog和全量數據源,并且不經過任何中間緩存,直接寫入Hudi的技術方案。

Flink?CDC?是?Apache?Flink?的一個Source?Connector,可以從?MySQL等數據庫讀取快照數據和增量數據。在Flink?CDC?2.0?中,實現了全程無鎖讀取,全量階段并發讀取以及斷點續傳的優化,更好地達到了“流批一體”。

使用了Flink?CDC的情況下,我們不需要擔心全量增量的切換,可以使用統一的Hudi?Upsert接口進行數據消費,Flink?CDC會負責多表全增量切換和位點管理,降低了任務管理的負擔。而Hudi并不支持原生消費多表數據,所以需要開發一套代碼,將Flink?CDC的數據寫入到下游多個Hudi表。

這樣實現的好處是:

  • 鏈路短,需要維護的組件少,成本低(不需要依賴獨立部署的binlog源組件如kafka,阿里云DTS等)

  • 業界有方案可參考,Flink?CDC?+?Hudi?單表入湖是一個比較成熟的解決方案,阿里云VVP也已經支持了Flink多表寫入Hudi。

下面詳細介紹一下?AnalyticDB?MySQL?基于這樣架構選型的一些實踐經驗。

2.2.?Flink?CDC+?Hudi?支持動態Schema變更

目前通過Flink將CDC數據寫入Hudi的流程為

  1. 數據消費:源端使用CDC?Client消費binlog數據,并進行反序列化,過濾等操作。

  2. 數據轉換:將CDC格式根據特定Schema數據轉換為Hudi支持的格式,比如Avro格式、Parquet格式、Json格式。

  3. 數據寫入:將數據寫入Hudi,部署在TM的多個Hudi?Write?Client,使用相同的Schema將數據寫入目標表。

  4. 數據提交:由部署在Flink?Job?Manager的Hudi?Coordinator進行單點提交,Commit元數據包括本次提交的文件、寫入Schema等信息。

其中,步驟2-4都要用到使用寫入Schema,在目前的實現中都是在任務部署前確定好的。同時在任務運行時沒有提供動態變更Schema的能力。

針對這個問題,我們設計實現了一套可以動態無干預更新Flink?Hudi入湖鏈路Schema的方案。整體思路為在Flink?CDC中識別DDL?binlog事件,遇到DDL事件時,停止消費增量數據,等待savepoint完成后以新的schema重新啟動任務。

這樣實現的好處是可以動態更新鏈路中的Schema,不需要人工干預。缺點是需要停止所有庫表的消費再重啟,DDL頻繁的情況下對鏈路性能的影響很大。

2.3.?Flink多表讀寫性能調優

2.3.1.?Flink?CDC?+?Hudi?Bucket?Index?全量導入調優

這里首先簡單介紹一下Flink?CDC?2.0?全量讀取?+?全增量切換的流程。在全量階段,Flink?CDC會將單表根據并行度劃分為多個chunk并分發到TaskManager并行讀取,全量讀取完成后可以在保證一致性的情況下,實現無鎖切換到增量,真正做到“流批一體”。

在導入的過程中,我們發現了兩個問題:

1)全量階段寫入的數據為log文件,但為加速查詢,需要compact成Parquet,帶來寫放大

由于全量和增量的切換Hudi是沒有感知的,所以為了實現去重,在全量階段我們也必須使用Hudi的Upsert接口,而Hudi?Bucket?Index的Uspert會產生log文件,需要進行一次Compaction才能得到parquet文件,造成一定的寫放大。并且如果全量導入的過程中compaction多次,寫放大會更加嚴重。

那么能不能犧牲讀取性能,只寫入log文件呢??答案也是否定的,log文件增多不僅會降低讀取性能,也會降低oss?file?listing的性能,使得寫入也變慢(寫入的時候會list當前file?slice中的log和base文件)

解決方法:調大Ckp間隔或者全量增量使用不同的compaction策略解決(全量階段不做compaction)

2)Flink?全量導入表之間為串行,而寫Hudi的最大并發為Bucket數,有時無法充分利用集群并發資源

Flink?CDC全量導入的是表內并行,表之間串行。導入單表的時候,如果讀+寫的并發小于集群的并發數,會造成資源浪費,在集群可用資源較多的時候,可能需要適當調高Hudi的Bucket數以提高寫入并發?。而小表并不需要很大的并發即可導入完成,在串行導入多個小表的時候一般會有資源浪費情況。如果可以支持小表并發導入,全量導入的性能會有比較好的提升。

解決辦法:適當的調大Hudi?bucket數來提高導入性能。

2.3.2.?Flink?CDC?+?Hudi?Bucket?Index?增量調優

1)?Checkpoint?反壓調優

在全增量導入的過程中,我們發現鏈路Hudi?Ckp經常反壓引起寫入抖動:

可以發現寫入流量的波動非常大。

我們詳細排查了寫入鏈路,發現反壓主要是因為Hudi?Ckp時會flush數據,在流量比較大時候,可能需要在一個ckp間隔內flush?3G數據,造成寫入停頓。

解決這個問題的思路就是調小Hudi?Stream?Write的buffer大小(即write.task.max.size)將Checkpoint窗口期間flush數據的壓力平攤到平時。

從上圖可以看到,調整了buffer?size后,因checkpoint造成了反壓引起的寫入流量變化得到了很好的緩解。

為了緩解Ckp的反壓,我們還做了其他的一些優化:

  • 調小Hudi?bucket?number,減少Ckp期間需要flush的文件個數(這個和全量階段調大bucket數是沖突的,需要權衡選擇)

  • 使用鏈路外Spark作業及時運行Compaction,避免積累log文件過多導致寫log時list?files的開銷過大

2)?提供合適的寫入Metrics幫助排查性能問題

在調優flink鏈路的過程中,我們發現了flink?hudi寫入相關的metrics缺失的比較嚴重,排查時需要通過比較麻煩的手段分析性能(如觀察現場日志,dump內存、做cpu?profiling等)。于是,我們在內部開發了一套Flink?Stream?Write的?Metrics?指標幫助我們可以快速的定位性能問題。

指標主要包括:

  • 當前Stream?Write算子占據的buffer大小

  • Flush?Buffer耗時

  • 請求OSS創建文件耗時

  • 當前活躍的寫入文件數

  • ....

Stream?Write/Append?Write?占據的堆內內存Buffer大小統計:

Parquet/Avro?log?Flush到磁盤耗時:

通過指標值的變化可以幫助快速定位問題,比如上圖Hudi?flush的耗時有一個上揚的趨勢,我們很快定位發現了因為Compaction做得不及時,導致log文件積壓,使得file?listing速度減慢。在調大Compaction資源后,Flush耗時可以保持平穩。

Flink-Hudi?Metrics相關的代碼我們也在持續貢獻到社區,具體可以參考HUDI-2141。

3)?Compaction調優

為了簡化配置,我們一開始采用了在鏈路內Compaction的方案,但是我們很快就發現了Compaction對寫入資源的搶占非常嚴重,并且負載不穩定,很大影響了寫入鏈路的性能和穩定性。如下圖,Compaction和GC幾乎吃滿了Task?Manager的Cpu資源。

于是,我們采用了TableService和寫入鏈路分離部署的策略,使用Spark離線任務運行TableService,使得TableService和寫入鏈路相互不影響。并且,Table?Service的消耗的是Serverless資源,按需收費。寫入鏈路因為不用做Compaction,可以保持一個比較小的資源,整體來看資源利用率和性能穩定性都得到了很好的提升。

為了方便管理數據庫內多表的TableService,我們開發了一個可以在單個Spark任務內運行多表的多個TableService的實用工具,目前已經貢獻到社區,可以參見PR。

3.?Flink?CDC?Hudi?多表入湖總結

經過我們多輪的開發和調優,Flink?CDC?多表寫入?Hudi?達到了一個基本可用的狀態。其中,我們認為比較關鍵的穩定性/性能優化是

  • 將Compaction從寫入鏈路獨立出去,提高寫入和Compaction的資源利用率

  • 開發了一套Flink?Hudi?Metrics系統,結合源碼和日志精細化調優Hoodie?Stream?Write。

但是,這套架構方案仍然存在以下的一些無法簡單解決的問題:

  1. Flink?Hudi不支持schema?evolution。Hudi轉換Flink?Row到HoodieRecord所用的schema在拓撲被創建時固定,這意味著每次DDL都需要重啟Flink鏈路,影響增量消費。而支持不停止任務動態變更Schema在Flink?Hudi場景經POC,改造難度比較大。

  2. 多表同步需要較大的資源開銷,對于沒有數據的表,仍然需要維護他們的算子,造成不必要的開銷。

  3. 新增同步表和摘除同步表需要重啟鏈路。Flink任務拓撲在任務啟動時固定,新增表/刪除表都需要更改拓撲重啟鏈路,影響增量消費。

  4. 直接讀取源庫/binlog對源庫壓力大,多并發讀取binlog容易打掛源庫,也使得binlog?client不穩定。并且由于沒有中間緩存,一旦binlog位點過期,數據需要重新導入。

  5. 全量同步同一時刻只能并發同步一張表,對于小表的導入不夠高效,大表也有可能因為并發設置較小而利用不滿資源。

  6. Hudi的Bucket數對全量導入和增量Upsert寫入的性能影響很大,但是使用Flink?CDC?+?Hudi的框架目前沒辦法為數據庫里不同的表決定不同的Bucket數,使得這個值難以權衡。

如果繼續基于這套方案實現多表CDC入湖,我們也可以嘗試從下面的一些方向著手:

  1. 優化Flink?CDC全量導入,支持多表并發導入,支持導入時對源表數據量進行sample以動態決定Hudi的Bucket?Index?Number。解決上述問題5,問題6。

  2. 引入Hudi的Consistent?Hashing?Bucket?Index,從Hudi端解決bucket?index數無法動態變更的問題,參考HUDI-6329。解決上述問題5,問題6。

  3. 引入一個新的binlog緩存組件(自己搭建或者使用云上成熟產品),下游多個鏈路從緩存隊列中讀取binlog,而不是直接訪問源庫。解決上述問題4。

  4. Flink支持動態拓撲,或者Hudi支持動態變更Schema。解決上述問題1,2,3。

不過,基于經過內部討論和驗證,我們認為繼續基于Flink?+?Hudi框架實現多表CDC全增量入湖難度較大,針對這個場景,應該更換為Spark引擎。主要的一些考慮如下。

  1. 上述討論的Flink-Hudi優化方向,工程量和難度都比較大,有些涉及到了核心機制的變動。

  2. 團隊內部對Spark全增量多表入湖有一定的積累,線上已經有了長期穩定運行的客戶案例。

  3. 基于Spark引擎的功能豐富度更好,如Spark微批語義可以支持隱式的動態Schema變更,Table?Service也更適合使用Spark批作業運行。

在我們后續的實踐中,也證實了我們的判斷是正確的。引擎更換為Spark后,多表CDC全增量入湖的功能豐富程度,擴展性,性能和穩定性都得到了很好的提升。我們將在之后的文章中介紹我們基于Spark+Hudi實現多表CDC全增量的實踐,也歡迎讀者們關注。

4.?參考資料

  1. Flink?CDC?+?Hudi?海量數據入湖在順豐的實踐

  2. Change?Data?Capture?with?Debezium?and?Apache?Hudi

  3. 使用?Flink?Hudi?構建流式數據湖平臺

  4. 基于?Apache?Hudi?的湖倉一體技術在?Shopee?的實踐

  5. 深入解讀?Flink?CDC?增量快照框架

  6. CDC一鍵入湖:當?Apache?Hudi?DeltaStreamer?遇見?Serverless?Spark

總結

以上是生活随笔為你收集整理的阿里云AnalyticDB基于Flink CDC+Hudi实现多表全增量入湖实践的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。