Delta Lake在Soul的应用实践
一、背景介紹
(一)業(yè)務(wù)場(chǎng)景
傳統(tǒng)離線數(shù)倉(cāng)模式下,日志入庫(kù)前首要階段便是ETL,Soul的埋點(diǎn)日志數(shù)據(jù)量龐大且需動(dòng)態(tài)分區(qū)入庫(kù),在按day分區(qū)的基礎(chǔ)上,每天的動(dòng)態(tài)分區(qū)1200+,分區(qū)數(shù)據(jù)量大小不均,數(shù)萬(wàn)條到數(shù)十億條不等。下圖為我們之前的ETL過(guò)程,埋點(diǎn)日志輸入Kafka,由Flume采集到HDFS,再經(jīng)由天級(jí)Spark ETL任務(wù),落表入Hive。任務(wù)凌晨開(kāi)始運(yùn)行,數(shù)據(jù)處理階段約1h,Load階段1h+,整體執(zhí)行時(shí)間為2-3h。
?
?
(二)存在的問(wèn)題
在上面的架構(gòu)下,我們面臨如下問(wèn)題:
1.天級(jí)ETL任務(wù)耗時(shí)久,影響下游依賴的產(chǎn)出時(shí)間。
2.凌晨占用資源龐大,任務(wù)高峰期搶占大量集群資源。
3.ETL任務(wù)穩(wěn)定性不佳且出錯(cuò)需凌晨解決、影響范圍大。
二、為什么選擇Delta?
為了解決天級(jí)ETL逐漸尖銳的問(wèn)題,減少資源成本、提前數(shù)據(jù)產(chǎn)出,我們決定將T+1級(jí)ETL任務(wù)轉(zhuǎn)換成T+0實(shí)時(shí)日志入庫(kù),在保證數(shù)據(jù)一致的前提下,做到數(shù)據(jù)落地即可用。
之前我們也實(shí)現(xiàn)了Lambda架構(gòu)下離線、實(shí)時(shí)分別維護(hù)一份數(shù)據(jù),但在實(shí)際使用中仍存在一些棘手問(wèn)題,比如:無(wú)法保證事務(wù)性,小文件過(guò)多帶來(lái)的集群壓力及查詢性能等問(wèn)題,最終沒(méi)能達(dá)到理想化使用。
所以這次我們選擇了近來(lái)逐漸進(jìn)入大家視野的數(shù)據(jù)湖架構(gòu),數(shù)據(jù)湖的概念在此我就不過(guò)多贅述了,我理解它就是一種將元數(shù)據(jù)視為大數(shù)據(jù)的Table Format。目前主流的數(shù)據(jù)湖分別有Delta Lake(分為開(kāi)源版和商業(yè)版)、Hudi、Iceberg,三者都支持了ACID語(yǔ)義、Upsert、Schema動(dòng)態(tài)變更、Time Travel等功能,其他方面我們做些簡(jiǎn)單的總結(jié)對(duì)比:
開(kāi)源版Delta
優(yōu)勢(shì):
1.支持作為source流式讀
2.Spark3.0支持sql操作
劣勢(shì):
1.引擎強(qiáng)綁定Spark
2.手動(dòng)Compaction
3.Join式Merge,成本高
Hudi
優(yōu)勢(shì):
1.基于主鍵的快速Upsert/Delete
2.Copy on Write / Merge on Read 兩種merge方式,分別適配讀寫(xiě)場(chǎng)景優(yōu)化
3.自動(dòng)Compaction
劣勢(shì):
1.寫(xiě)入綁定Spark/DeltaStreamer
2.API較為復(fù)雜
Iceberg
優(yōu)勢(shì):
1.可插拔引擎
劣勢(shì):
1.調(diào)研時(shí)還在發(fā)展階段,部分功能尚未完善
2.Join式Merge,成本高
調(diào)研時(shí)期,阿里云的同學(xué)提供了EMR版本的Delta,在開(kāi)源版本的基礎(chǔ)上進(jìn)行了功能和性能上的優(yōu)化,諸如:SparkSQL/Spark Streaming SQL的集成,自動(dòng)同步Delta元數(shù)據(jù)信息到HiveMetaStore(MetaSync功能),自動(dòng)Compaction,適配Tez、Hive、Presto等更多查詢引擎,優(yōu)化查詢性能(Zorder/DataSkipping/Merge性能)等等
三、實(shí)踐過(guò)程
測(cè)試階段,我們反饋了多個(gè)EMR Delta的bug,比如:Delta表無(wú)法自動(dòng)創(chuàng)建Hive映射表,Tez引擎無(wú)法正常讀取Delta類型的Hive表,Presto和Tez讀取Delta表數(shù)據(jù)不一致,均得到了阿里云同學(xué)的快速支持并一一解決。
引入Delta后,我們實(shí)時(shí)日志入庫(kù)架構(gòu)如下所示:
?
?
數(shù)據(jù)由各端埋點(diǎn)上報(bào)至Kafka,通過(guò)Spark任務(wù)分鐘級(jí)以Delta的形式寫(xiě)入HDFS,然后在Hive中自動(dòng)化創(chuàng)建Delta表的映射表,即可通過(guò)Hive MR、Tez、Presto等查詢引擎直接進(jìn)行數(shù)據(jù)查詢及分析。
我們基于Spark,封裝了通用化ETL工具,實(shí)現(xiàn)了配置化接入,用戶無(wú)需寫(xiě)代碼即可實(shí)現(xiàn)源數(shù)據(jù)到Hive的整體流程接入。并且,為了更加適配業(yè)務(wù)場(chǎng)景,我們?cè)诜庋b層實(shí)現(xiàn)了多種實(shí)用功能:
1. 實(shí)現(xiàn)了類似Iceberg的hidden partition功能,用戶可選擇某些列做適當(dāng)變化形成一個(gè)新的列,此列可作為分區(qū)列,也可作為新增列,使用SparkSql操作。如:有日期列date,那么可以通過(guò) 'substr(date,1,4) as year' 生成新列,并可以作為分區(qū)。
2. 為避免臟數(shù)據(jù)導(dǎo)致分區(qū)出錯(cuò),實(shí)現(xiàn)了對(duì)動(dòng)態(tài)分區(qū)的正則檢測(cè)功能,比如:Hive中不支持中文分區(qū),用戶可以對(duì)動(dòng)態(tài)分區(qū)加上'\w+'的正則檢測(cè),分區(qū)字段不符合的臟數(shù)據(jù)則會(huì)被過(guò)濾。
3. 實(shí)現(xiàn)自定義事件時(shí)間字段功能,用戶可選數(shù)據(jù)中的任意時(shí)間字段作為事件時(shí)間落入對(duì)應(yīng)分區(qū),避免數(shù)據(jù)漂移問(wèn)題。
4. 嵌套Json自定義層數(shù)解析,我們的日志數(shù)據(jù)大都為Json格式,其中難免有很多嵌套Json,此功能支持用戶選擇對(duì)嵌套Json的解析層數(shù),嵌套字段也會(huì)被以單列的形式落入表中。
5. 實(shí)現(xiàn)SQL化自定義配置動(dòng)態(tài)分區(qū)的功能,解決埋點(diǎn)數(shù)據(jù)傾斜導(dǎo)致的實(shí)時(shí)任務(wù)性能問(wèn)題,優(yōu)化資源使用,此場(chǎng)景后面會(huì)詳細(xì)介紹。
平臺(tái)化建設(shè):我們已經(jīng)把日志接入Hive的整體流程嵌入了Soul的數(shù)據(jù)平臺(tái)中,用戶可通過(guò)此平臺(tái)申請(qǐng)日志接入,由審批人員審批后進(jìn)行相應(yīng)參數(shù)配置,即可將日志實(shí)時(shí)接入Hive表中,簡(jiǎn)單易用,降低操作成本。
?
?
為了解決小文件過(guò)多的問(wèn)題,EMR Delta實(shí)現(xiàn)了Optimize/Vacuum語(yǔ)法,可以定期對(duì)Delta表執(zhí)行Optimize語(yǔ)法進(jìn)行小文件的合并,執(zhí)行Vacuum語(yǔ)法對(duì)過(guò)期文件進(jìn)行清理,使HDFS上的文件保持合適的大小及數(shù)量。值得一提的是,EMR Delta目前也實(shí)現(xiàn)了一些auto-compaction的策略,可以通過(guò)配置來(lái)自動(dòng)觸發(fā)compaction,比如:小文件數(shù)量達(dá)到一定值時(shí),在流式作業(yè)階段啟動(dòng)minor compaction任務(wù),在對(duì)實(shí)時(shí)任務(wù)影響較小的情況下,達(dá)到合并小文件的目的。
四、問(wèn)題 & 方案
接下來(lái)介紹一下我們?cè)诼涞谼elta的過(guò)程中遇到過(guò)的問(wèn)題
(一)埋點(diǎn)數(shù)據(jù)動(dòng)態(tài)分區(qū)數(shù)據(jù)量分布不均導(dǎo)致的數(shù)據(jù)傾斜問(wèn)題
Soul的埋點(diǎn)數(shù)據(jù)是落入分區(qū)寬表中的,按埋點(diǎn)類型分區(qū),不同類型的埋點(diǎn)數(shù)據(jù)量分布不均,例如:通過(guò)Spark寫(xiě)入Delta的過(guò)程中,5min為一個(gè)Batch,大部分類型的埋點(diǎn),5min的數(shù)據(jù)量很小(10M以下),但少量埋點(diǎn)數(shù)據(jù)量卻在5min能達(dá)到1G或更多。數(shù)據(jù)落地時(shí),我們假設(shè)DataFrame有M個(gè)partition,表有N個(gè)動(dòng)態(tài)分區(qū),每個(gè)partition中的數(shù)據(jù)都是均勻且混亂的,那么每個(gè)partition中都會(huì)生成N個(gè)文件分別對(duì)應(yīng)N個(gè)動(dòng)態(tài)分區(qū),那么每個(gè)Batch就會(huì)生成M*N個(gè)小文件。
?
?
為了解決上述問(wèn)題,數(shù)據(jù)落地前對(duì)DataFrame按動(dòng)態(tài)分區(qū)字段repartition,這樣就能保證每個(gè)partition中分別有不同分區(qū)的數(shù)據(jù),這樣每個(gè)Batch就只會(huì)生成N個(gè)文件,即每個(gè)動(dòng)態(tài)分區(qū)一個(gè)文件,這樣解決了小文件膨脹的問(wèn)題。但與此同時(shí),有幾個(gè)數(shù)據(jù)量過(guò)大的分區(qū)的數(shù)據(jù)也會(huì)只分布在一個(gè)partition中,就導(dǎo)致了某幾個(gè)partition數(shù)據(jù)傾斜,且這些分區(qū)每個(gè)Batch產(chǎn)生的文件過(guò)大等問(wèn)題。
解決方案:如下圖,我們實(shí)現(xiàn)了用戶通過(guò)SQL自定義配置repartition列的功能,簡(jiǎn)單來(lái)說(shuō),用戶可以使用SQL,把數(shù)據(jù)量過(guò)大的幾個(gè)埋點(diǎn),通過(guò)加鹽方式打散到多個(gè)partition,對(duì)于數(shù)據(jù)量正常的埋點(diǎn)則無(wú)需操作。通過(guò)此方案,我們把Spark任務(wù)中每個(gè)Batch執(zhí)行最慢的partition的執(zhí)行時(shí)間從3min提升到了40s,解決了文件過(guò)小或過(guò)大的問(wèn)題,以及數(shù)據(jù)傾斜導(dǎo)致的性能問(wèn)題。
?
?
(二)應(yīng)用層基于元數(shù)據(jù)的動(dòng)態(tài)schema變更
數(shù)據(jù)湖支持了動(dòng)態(tài)schema變更,但在Spark寫(xiě)入之前,構(gòu)造DataFrame時(shí),是需要獲取數(shù)據(jù)schema的,如果此時(shí)無(wú)法動(dòng)態(tài)變更,那么便無(wú)法把新字段寫(xiě)入Delta表,Delta的動(dòng)態(tài)schena便也成了擺設(shè)。埋點(diǎn)數(shù)據(jù)由于類型不同,每條埋點(diǎn)數(shù)據(jù)的字段并不完全相同,那么在落表時(shí),必須取所有數(shù)據(jù)的字段并集,作為Delta表的schema,這就需要我們?cè)跇?gòu)建DataFrame時(shí)便能感知是否有新增字段。
解決方案:我們額外設(shè)計(jì)了一套元數(shù)據(jù),在Spark構(gòu)建DataFrame時(shí),首先根據(jù)此元數(shù)據(jù)判斷是否有新增字段,如有,就把新增字段更新至元數(shù)據(jù),以此元數(shù)據(jù)為schema構(gòu)建DataFrame,就能保證我們?cè)趹?yīng)用層動(dòng)態(tài)感知schema變更,配合Delta的動(dòng)態(tài)schema變更,新字段自動(dòng)寫(xiě)入Delta表,并把變化同步到對(duì)應(yīng)的Hive表中。
(三)Spark Kafka偏移量提交機(jī)制導(dǎo)致的數(shù)據(jù)重復(fù)
我們?cè)谑褂肧park Streaming時(shí),會(huì)在數(shù)據(jù)處理完成后將消費(fèi)者偏移量提交至Kafka,調(diào)用的是
spark-streaming-kafka-0-10中的commitAsync API。我一直處于一個(gè)誤區(qū),以為數(shù)據(jù)在處理完成后便會(huì)提交當(dāng)前Batch消費(fèi)偏移量。但后來(lái)遇到Delta表有數(shù)據(jù)重復(fù)現(xiàn)象,排查發(fā)現(xiàn)偏移量提交時(shí)機(jī)為下一個(gè)Batch開(kāi)始時(shí),并不是當(dāng)前Batch數(shù)據(jù)處理完成后就提交。那么問(wèn)題來(lái)了:假如一個(gè)批次5min,在3min時(shí)數(shù)據(jù)處理完成,此時(shí)成功將數(shù)據(jù)寫(xiě)入Delta表,但偏移量卻在5min后(第二個(gè)批次開(kāi)始時(shí))才成功提交,如果在3min-5min這個(gè)時(shí)間段中,重啟任務(wù),那么就會(huì)重復(fù)消費(fèi)當(dāng)前批次的數(shù)據(jù),造成數(shù)據(jù)重復(fù)。
解決方案:
1.StructStreaming支持了對(duì)Delta的exactly-once,可以使用StructStreaming適配解決。
2.可以通過(guò)其他方式維護(hù)消費(fèi)偏移量解決。
(四)查詢時(shí)解析元數(shù)據(jù)耗時(shí)較多
因?yàn)镈elta單獨(dú)維護(hù)了自己的元數(shù)據(jù),在使用外部查詢引擎查詢時(shí),需要先解析元數(shù)據(jù)以獲取數(shù)據(jù)文件信息。隨著Delta表的數(shù)據(jù)增長(zhǎng),元數(shù)據(jù)也逐漸增大,此操作耗時(shí)也逐漸變長(zhǎng)。
解決方案:阿里云同學(xué)也在不斷優(yōu)化查詢方案,通過(guò)緩存等方式盡量減少對(duì)元數(shù)據(jù)的解析成本。
(五)關(guān)于CDC場(chǎng)景
目前我們基于Delta實(shí)現(xiàn)的是日志的Append場(chǎng)景,還有另外一種經(jīng)典業(yè)務(wù)場(chǎng)景CDC場(chǎng)景。Delta本身是支持Update/Delete的,是可以應(yīng)用在CDC場(chǎng)景中的。但是基于我們的業(yè)務(wù)考量,暫時(shí)沒(méi)有將Delta使用在CDC場(chǎng)景下,原因是Delta表的Update/Delete方式是Join式的Merge方式,我們的業(yè)務(wù)表數(shù)據(jù)量比較大,更新頻繁,并且更新數(shù)據(jù)涉及的分區(qū)較廣泛,在Merge上可能存在性能問(wèn)題。
阿里云的同學(xué)也在持續(xù)在做Merge的性能優(yōu)化,比如Join的分區(qū)裁剪、Bloomfilter等,能有效減少Join時(shí)的文件數(shù)量,尤其對(duì)于分區(qū)集中的數(shù)據(jù)更新,性能更有大幅提升,后續(xù)我們也會(huì)嘗試將Delta應(yīng)用在CDC場(chǎng)景。
五、后續(xù)計(jì)劃
1.基于Delta Lake,進(jìn)一步打造優(yōu)化實(shí)時(shí)數(shù)倉(cāng)結(jié)構(gòu),提升部分業(yè)務(wù)指標(biāo)實(shí)時(shí)性,滿足更多更實(shí)時(shí)的業(yè)務(wù)需求。
2.打通我們內(nèi)部的元數(shù)據(jù)平臺(tái),實(shí)現(xiàn)日志接入->實(shí)時(shí)入庫(kù)->元數(shù)據(jù)+血緣關(guān)系一體化、規(guī)范化管理。
3.持續(xù)觀察優(yōu)化Delta表查詢計(jì)算性能,嘗試使用Delta的更多功能,比如Z-Ordering,提升在即席查詢及數(shù)據(jù)分析場(chǎng)景下的性能。
作者:張宏博,Soul大數(shù)據(jù)工程師
總結(jié)
以上是生活随笔為你收集整理的Delta Lake在Soul的应用实践的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Bigo 实时计算平台建设实践
- 下一篇: CRM、DMP、CDP的区别