基于Flink的高可靠实时ETL系统
GIAC(GLOBAL INTERNET ARCHITECTURE CONFERENCE)是長期關注互聯網技術與架構的高可用架構技術社區和msup推出的,面向架構師、技術負責人及高端技術從業人員的年度技術架構大會,是中國地區規模最大的技術會議之一。
今年的第六屆GIAC大會上,在大數據架構專題,騰訊數據平臺部實時計算負責人施曉罡發表了《基于Flink的高可靠實時ETL系統》的主題演講。以下為嘉賓演講實錄:
施曉罡畢業于北京大學,獲得博士學位,是Apache Flink項目Committer。在SIGMOD, TODS和IPDPS等國際頂級會議和期刊上發表過多篇論文,并擔任KDD,DASFAA等國際頂級會議的程序委員會委員。
實時計算平臺Oceanus
近年來,實時計算在騰訊得到了越來越廣泛的應用。為了提高用戶流計算任務持續集成和持續發布的效率,騰訊大數據團隊從2017年開始圍繞Flink打造了Oceanus,一個集開發、測試、部署和運維于一體的一站式可視化實時計算平臺。
Oceanus提供了三種不同的應用開發方式,包括畫布,SQL和Jar,來滿足不同用戶的開發需求。通過這三種方式,不同應用場景的用戶不需要了解底層框架的技術細節,可以很快的進行實時計算任務的開發,降低了用戶開發的門檻。
在完成作業開發之后,用戶可以通過Oceanus對作業進行測試、配置和部署。Oceanus為用戶程序提供了一系列的工具來協助作業測試。用戶既可以使用Oceanus提供的一鍵生成功能產生測試數據,也可以自己向Oceanus上傳自己的的測試數據,通過對比預期結果和實際結果來驗證應用邏輯的正確性。Oceanus依托騰訊內部的資源調度系統Gaia來進行資源管理和作業部署。用戶可以通過Oceanus配置作業所需要的CPU和內存資源,并指定作業需要部署的集群。當用戶完成配置之后,Oceanus會向Gaia申請對應的資源并將作業提交到Gaia上運行。
Oceanus對Flink作業運行時的多個運行指標進行采集,包括Task Manger的內存,I/O和GC等。通過這些豐富的運行指標,用戶能夠很好的了解應用運行的情況,并在出現異常時能協助用戶及時的定位問題。運維人員則可以通過這些采集到的指標,設置報警策略并實現精細化的運營。
而在Oceanus之上,騰訊大數據還對ETL,監控告警和在線學習等常見的實時計算任務提供了場景化的支持。例如Oceanus-ML提供端到端的在線機器學習,涵蓋數據接入,數據處理,特征工程,算法訓練,模型評估,模型部署整個機器學習流程。通過Oceanus-ML,用戶可以方便地利用完備的數據處理函數,豐富的在線學習算法來構建自己的在線學習任務,輕松地完成模型訓練和評估,進行一鍵部署模型。
而對ETL場景,Oceanus也提供了Oceanus-ETL產品來幫助用戶將應用和產品中采集的數據實時地導入到數據倉庫中。目前騰訊大數據團隊為騰訊內部包括微信、QQ音樂、騰訊游戲在內的多個業務提供了數據接入服務,每天處理的消息數超過了40萬億條,每秒接入的峰值超過了4億條。
實時數據接入平臺Oceanus-ETL
騰訊大數據早在2012年起就開始了進行數據接入的工作,并基于Storm構建了第一代的騰訊數據銀行(TDBank),成為了騰訊大數據平臺的第一線,提供了文件、消息和數據庫等多種接入方式,統一了數據接入入口,提供了高效實時的分布式數據分發。
而在2017年,騰訊大數據基于Flink在易用性、可靠性和性能上的優勢,通過Flink對TDBank的數據接入進行了重構。相比于Storm,Flink對state提供了更多的支持。一方面Flink將程序的狀態保存在本地的內存或者RocksDB中,用戶不需要通過網絡遠程訪問狀態數據,因此可以獲得較好的作業性能。而另一方面,Flink通過Chandy-Lamport算法提供了高效和輕量的檢查點機制,可以保證在發生故障時仍能實現Exactly Once和At-Least Once的數據處理語義。
而隨著騰訊業務規模的不斷增加,對數據接入也提出了更高的要求,需要能夠
保證端到端的“有且僅有一次”和“強一致”的語義
保證ACID事務和讀寫分離,避免下游出現臟讀等錯誤
支持對數據進行修正和格式變更
為了能夠滿足上述要求,我們今年引入了Iceberg,通過Iceberg提供的ACID事務機制和增量更新能力提供更可靠和更強大的數據接入服務。
基于Flink實現端到端Exactly Once傳輸
Flink通過檢查點(Checkpoint)機制來進行任務狀態的備份和恢復。在任務發生故障時,任務可以從上次備份的狀態恢復,而不必從頭開始重新執行。通過檢查點機制,Flink可以保證在發生故障時,仍然可以實現Exactly Once的數據傳輸。
但在整個數據接入的鏈路中,除了Flink之外還包括了上游的中間件和下游的數據倉庫等多個組件。僅僅依靠Flink的檢查點機制只能夠保證在Flink作業內部的Exactly Once的數據傳輸,而并不能保證在整個數據接入鏈路中端到端的Exactly Once的傳輸語義。如果我們將Flink收到的數據直接寫到下游的存儲系統,那么當Flink發生故障并從故障中恢復時,從上次檢查點之后被寫到下游存儲系統中的數據將被重復,導致后續數據分析發生誤差。
而為了保證端到端的Exactly Once數據傳輸,TDBank利用了Flink的檢查點機制實現了一個兩階段提交的協議,并會對數據接入各個環節產生的指標進行聚合和對賬,確保端到端數據傳輸的可靠性。
為了保證數據鏈路的Exactly Once,我們將Flink收到的數據會先寫入到一個臨時目錄中,并將寫出的文件列表保存起來。執行checkpoint的時候,我們會將這些文件列表保存到checkpoint中并記錄下來。而當checkpoint完成時,Flink會通知所有的節點。此時這些節點就會將checkpoint中保存的文件移動到正式目錄中。
在這種實現方式中,Flink利用已有的checkpoint機制實現了一個兩階段提交的機制。所有節點在執行checkpoint時執行了預提交的操作,將所有數據都先寫入到一個可靠的分布式存儲中。當checkpoint在JobManager上完成時,即認為這個事務被提交了。所有節點在收到checkpoint成功的消息后會完成最后的事務提交操作。
如果有節點在執行最后文件移動的時候出現故障,那么Flink作業將從上次完成的checkpoint中恢復,并從上次完成的checkpoint中獲得完整的文件列表。Flink作業會檢查這個文件列表中的文件,并將所有還未移動的文件移動到最終的目錄中。
而為了確保數據在整個接入過程在不會發生數據丟失和重復,我們會對整個數據鏈路中的每個組件發送和接收到的數據數目進行了采集和對賬。由于一般的指標系統并不能保證指標的時效性和正確性,因此我們也基于Flink實現了高可靠和強一致性的指標聚合。
類似于數據鏈路,我們也采用Flink的checkpoint機制來保證指標數據的一致性。我們通過Flink將采集到的指標按照分鐘粒度進行聚合,并在執行checkpoint時將這些聚合指標保存到外部存儲中。在保存聚合指標時,除了一般的標簽之外,我們還會帶上寫出這些指標時的checkpoint編號。而當checkpoint完成時,每個節點還會將完成的checkpoint編號也記錄到外部存儲中。當我們需要查詢指標時,我們只需要將已完成的checkpoint編號和聚合指標進行連接就可以獲得一致性的指標結果。
通過Flink的checkpoint機制,我們可以保證數據鏈路和指標鏈路中數據傳輸和指標聚合的一致性,確保在整個數據接入鏈路實現端到端的Exactly Once數據傳輸。
基于Iceberg實現ACID的實時數據接入
Apache Iceberg是一個通用的表格式(數據組織格式),它可以適配Presto,Spark等引擎提供高性能的讀寫和元數據管理功能。Iceberg的定位是在計算引擎之下存儲之上。它是一種數據存儲格式,Iceberg稱其為"table format"。準確的說,它是介于計算引擎和數據存儲格式之間的數據組織格式?- 通過特定的方式將數據和元數據組織起來,因此稱之為數據組織格式更為合理。
Iceberg通過鎖機制實現了ACID的能力。在每次元數據更新時它會從metastore中獲取鎖并進行更新。同時Iceberg保證了線性一致性(Serializable isolation),確保表的修改操作是原子性的,讀操作永遠不會讀到部分或是沒有commit的數據。Iceberg提供了樂觀鎖的機制降低鎖的影響,并且使用沖突回退和重試機制來解決并發寫所造成的沖突問題。
基于ACID的能力,Iceberg提供了類似于MVCC的讀寫分離能力。首先,每次寫操作都會產生一個新的快照(snapshot),快照始終是往后線性遞增,確保了線性一致性。而讀操作只會讀取已經存在了的快照,對于正在生成的快照讀操作是不可見的。每一個快照擁有表在那一時刻所有的數據和元數據,因此提供了用戶回溯(time travel)表數據的能力。利用Iceberg的time travel能力,用戶可以讀取那一時刻的數據,同時也提供了用戶快照回滾和數據重放的能力。
相比于Hudi,Delta Lake,Iceberg提供了更為完整的表格式的能力、類型的定義和操作的抽象,并與上層數據處理引擎和底層數據存儲格式的解耦。此外,Iceberg在設計之初并沒有綁定某種特定的存儲引擎,同時避免了與上層引擎之間的相互調用,使得Iceberg可以非常容易地擴展到對于不同引擎的支持。
而在數據接入中,通過Iceberg可以保證ACID事務和強一致性,實現“有且僅有一次”的寫入;讀寫分離使交互式查詢引擎(如Hive和Presto等)可以第一時間讀到正確的數據;Row-level update和delete支持通過計算引擎進行數據修正;增量消費使得已落地的數據可以進一步的返回流式處理引擎,并只處理和向后傳遞變化的部分;Iceberg高效的查詢能力也能省去導入MySQL或ClickHouse等環節,直接被報表和BI系統消費。
為了能夠使用Iceberg,騰訊大數據實現了支持Iceberg的Flink連接器,允許Flink將數據寫入到Iceberg中。Flink的Iceberg Sink由兩部分組成,一個稱為Writer,而另一個是Committer。Writer負責將收到的數據寫到外部的存儲中,形成一系列的DataFile。目前為了簡化適配并最大限度利用已有邏輯,騰訊內部使用Avro作為數據的中間格式。后續社區將引入一個Flink內建類型的轉換器,使用Iceberg內建的數據類型作為輸入。當Writer執行checkpoint時,Writer會關閉自己的文件,將構建的DataFile發送給下游的Committer。
Committer在一個Flink作業中是全局唯一的。在收到上游所有Writer發送的DataFile后,Committer會將這些DataFile寫到一個ManifestFile中,并將ManifestFile保存到checkpoint中。當checkpoint完成之后,Committer會將ManifestFile通過merge append提交給Iceberg。Iceberg內部會通過一系列操作完成commit操作,最終讓新加入的數據對下游的數據倉庫可見。
騰訊對Iceberg進行了大量的改進和優化。除了支持了Flink的讀寫之外,騰訊還完成了行級的刪除和更新操作,極大的節約了數據批改和刪除所帶來的開銷。同時,騰訊還對Spark 3.0中的Data Source V2進行了適配,利用Spark 3.0中的SQL和DataFrame可以無縫的對接Iceberg。
而在后面的工作中,騰訊會繼續增強Iceberg的核心能力,主要包括:
為Flink sink增加update和delete的語義,使延遲到達的數據可以得到正確的處理,以支持CDC的場景;
增加對Hive的支持;
增加以Merge-On-Read方式進行row-level update和delete操作等。
后臺回復關鍵詞【GIAC】可獲取嘉賓分享PPT。
總結
以上是生活随笔為你收集整理的基于Flink的高可靠实时ETL系统的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 微信「看一看」 朋友在看的增强推荐系统
- 下一篇: 腾讯看点投放系统介绍:推荐系统的进化伙伴