网易数据湖探索与实践-范欣欣
分享嘉賓:范欣欣?網(wǎng)易?大數(shù)據(jù)技術(shù)專家
編輯整理:劉閏豐
出品平臺(tái):DataFunTalk
導(dǎo)讀:今天主要和大家交流的是網(wǎng)易在數(shù)據(jù)湖Iceberg的一些思考與實(shí)踐。從網(wǎng)易在數(shù)據(jù)倉(cāng)庫(kù)建設(shè)中遇到的痛點(diǎn)出發(fā),介紹對(duì)數(shù)據(jù)湖Iceberg的探索以及實(shí)踐之路。
主要內(nèi)容包括:
數(shù)據(jù)倉(cāng)庫(kù)平臺(tái)建設(shè)的痛點(diǎn)
數(shù)據(jù)湖Iceberg的核心原理
數(shù)據(jù)湖Iceberg社區(qū)現(xiàn)狀
網(wǎng)易數(shù)據(jù)湖Iceberg實(shí)踐之路
01
數(shù)據(jù)倉(cāng)庫(kù)平臺(tái)建設(shè)的痛點(diǎn)
痛點(diǎn)一:
我們凌晨一些大的離線任務(wù)經(jīng)常會(huì)因?yàn)橐恍┰虺霈F(xiàn)延遲,這種延遲會(huì)導(dǎo)致核心報(bào)表的產(chǎn)出時(shí)間不穩(wěn)定,有些時(shí)候會(huì)產(chǎn)出比較早,但是有時(shí)候就可能會(huì)產(chǎn)出比較晚,業(yè)務(wù)很難接受。
為什么會(huì)出現(xiàn)這種現(xiàn)象的發(fā)生呢?目前來(lái)看大致有這么幾點(diǎn)要素:
任務(wù)本身要請(qǐng)求的數(shù)據(jù)量會(huì)特別大。通常來(lái)說(shuō)一天原始的數(shù)據(jù)量可能在幾十TB。幾百個(gè)分區(qū),甚至上千個(gè)分區(qū),五萬(wàn)+的文件數(shù)這樣子。如果說(shuō)全量讀取這些文件的話,幾百個(gè)分區(qū)就會(huì)向NameNode發(fā)送幾百次請(qǐng)求,我們知道離線任務(wù)在凌晨運(yùn)行的時(shí)候,NameNode的壓力是非常大的。所以就很有可能出現(xiàn)Namenode響應(yīng)很慢的情況,如果請(qǐng)求響應(yīng)很慢就會(huì)導(dǎo)致任務(wù)初始化時(shí)間很長(zhǎng)。
任務(wù)本身的ETL效率是相對(duì)低效的,這個(gè)低效并不是說(shuō)Spark引擎低效,而是說(shuō)我們的存儲(chǔ)在這塊支持的不是特別的好。比如目前我們查一個(gè)分區(qū)的話是需要將所有文件都掃描一遍然后進(jìn)行分析,而實(shí)際上我可能只對(duì)某些文件感興趣。所以相對(duì)而言這個(gè)方案本身來(lái)說(shuō)就是相對(duì)低效的。
這種大的離線任務(wù)一旦遇到磁盤壞盤或者機(jī)器宕機(jī),就需要重試,重試一次需要耗費(fèi)很長(zhǎng)的時(shí)間比如幾十分鐘。如果說(shuō)重試一兩次的話這個(gè)延遲就會(huì)比較大了。
痛點(diǎn)二:
針對(duì)一些細(xì)瑣的一些問(wèn)題而言的。這里簡(jiǎn)單列舉了三個(gè)場(chǎng)景來(lái)分析:
不可靠的更新操作。我們經(jīng)常在ETL過(guò)程中執(zhí)行一些insert overwrite之類的操作,這類操作會(huì)先把相應(yīng)分區(qū)的數(shù)據(jù)刪除,再把生成的文件加載到分區(qū)中去。在我們移除文件的時(shí)候,很多正在讀取這些文件的任務(wù)就會(huì)發(fā)生異常,這就是不可靠的更新操作。
表Schema變更低效。目前我們?cè)趯?duì)表做一些加字段、更改分區(qū)的操作其實(shí)是非常低效的操作,我們需要把所有的原始數(shù)據(jù)讀出來(lái),然后在重新寫(xiě)回去。這樣就會(huì)非常耗時(shí),并且低效。
數(shù)據(jù)可靠性缺乏保障。主要是我們對(duì)于分區(qū)的操作,我們會(huì)把分區(qū)的信息分為兩個(gè)地方,HDFS和Metastore,分別存儲(chǔ)一份。在這種情況下,如果進(jìn)行更新操作,就可能會(huì)出現(xiàn)一個(gè)更新成功而另一個(gè)更新失敗,會(huì)導(dǎo)致數(shù)據(jù)不可靠。
痛點(diǎn)三:
基于Lambda架構(gòu)建設(shè)的實(shí)時(shí)數(shù)倉(cāng)存在較多的問(wèn)題。如上圖的這個(gè)架構(gòu)圖,第一條鏈路是基于kafka中轉(zhuǎn)的一條實(shí)時(shí)鏈路(延遲要求小于5分鐘),另一條是離線鏈路(延遲大于1小時(shí)),甚至有些公司會(huì)有第三條準(zhǔn)實(shí)時(shí)鏈路(延遲要求5分鐘~一小時(shí)),甚至更復(fù)雜的場(chǎng)景。
兩條鏈路對(duì)應(yīng)兩份數(shù)據(jù),很多時(shí)候?qū)崟r(shí)鏈路的處理結(jié)果和離線鏈路的處理結(jié)果對(duì)不上。
Kafka無(wú)法存儲(chǔ)海量數(shù)據(jù), 無(wú)法基于當(dāng)前的OLAP分析引擎高效查詢Kafka中的數(shù)據(jù)。
Lambda維護(hù)成本高。代碼、數(shù)據(jù)血緣、Schema等都需要兩套。運(yùn)維、監(jiān)控等成本都非常高。
痛點(diǎn)四:
不能友好地支持高效更新場(chǎng)景。大數(shù)據(jù)的更新場(chǎng)景一般有兩種,一種是CDC ( Change Data Capture ) 的更新,尤其在電商的場(chǎng)景下,將binlog中的更新刪除同步到HDFS上。另一種是延遲數(shù)據(jù)帶來(lái)的聚合后結(jié)果的更新。目前HDFS只支持追加寫(xiě),不支持更新。因此業(yè)界很多公司引入了Kudu。但是Kudu本身是有一些局限的,比如計(jì)算存儲(chǔ)沒(méi)有做到分離。這樣整個(gè)數(shù)倉(cāng)系統(tǒng)中引入了HDFS、Kafka以及Kudu,運(yùn)維成本不可謂不大。
上面就是針對(duì)目前數(shù)倉(cāng)所涉及到的四個(gè)痛點(diǎn)的大致介紹,因此我們也是通過(guò)對(duì)數(shù)據(jù)湖的調(diào)研和實(shí)踐,希望能在這四個(gè)方面對(duì)數(shù)倉(cāng)建設(shè)有所幫助。接下來(lái)重點(diǎn)講解下對(duì)數(shù)據(jù)湖的一些思考。
02
數(shù)據(jù)湖Iceberg核心原理
1. 數(shù)據(jù)湖開(kāi)源產(chǎn)品調(diào)研
數(shù)據(jù)湖大致是從19年開(kāi)始慢慢火起來(lái)的,目前市面上核心的數(shù)據(jù)湖開(kāi)源產(chǎn)品大致有這么幾個(gè):
DELTA LAKE,在17年的時(shí)候DataBricks就做了DELTA LAKE的商業(yè)版。主要想解決的也是基于Lambda架構(gòu)帶來(lái)的存儲(chǔ)問(wèn)題,它的初衷是希望通過(guò)一種存儲(chǔ)來(lái)把Lambda架構(gòu)做成kappa架構(gòu)。
Hudi ( Uber開(kāi)源 ) 可以支持快速的更新以及增量的拉取操作。這是它最大的賣點(diǎn)之一。
Iceberg的初衷是想做標(biāo)準(zhǔn)的Table Format以及高效的ETL。
上圖是來(lái)自阿里Flink團(tuán)體針對(duì)數(shù)據(jù)湖方案的一些調(diào)研對(duì)比,總體來(lái)看這些方案的基礎(chǔ)功能相對(duì)都還是比較完善的。我說(shuō)的基礎(chǔ)功能主要包括:
高效Table Schema的變更,比如針對(duì)增減分區(qū),增減字段等功能
ACID語(yǔ)義保證
同時(shí)支持流批讀寫(xiě),不會(huì)出現(xiàn)臟讀等現(xiàn)象
支持OSS這類廉價(jià)存儲(chǔ)
2. 當(dāng)然還有一些不同點(diǎn):
Hudi的特性主要是支持快速的更新刪除和增量拉取。
Iceberg的特性主要是代碼抽象程度高,不綁定任何的Engine。它暴露出來(lái)了非常核心的表層面的接口,可以非常方便的與Spark/Flink對(duì)接。然而Delta和Hudi基本上和spark的耦合很重。如果想接入flink,相對(duì)比較難。
3. 我們選擇Iceberg的原因:
現(xiàn)在國(guó)內(nèi)的實(shí)時(shí)數(shù)倉(cāng)建設(shè)圍繞flink的情況會(huì)多一點(diǎn)。所以能夠基于flink擴(kuò)展生態(tài),是我們選擇iceberg一個(gè)比較重要的點(diǎn)。
國(guó)內(nèi)也有很多基于Iceberg開(kāi)發(fā)的重要力量,比如騰訊團(tuán)隊(duì)、阿里Flink官方團(tuán)隊(duì),他們的數(shù)據(jù)湖選型也是Iceberg。目前他們?cè)谏鐓^(qū)分別主導(dǎo)update以及flink的生態(tài)對(duì)接。
4. 接下來(lái)我們重點(diǎn)介紹一下Iceberg:
這是來(lái)自官方對(duì)于Iceberg的一段介紹,大致就是Iceberg是一個(gè)開(kāi)源的基于表格式的數(shù)據(jù)湖。關(guān)于table format再給大家詳細(xì)介紹下:
左側(cè)圖是一個(gè)抽象的數(shù)據(jù)處理系統(tǒng),分別由SQL引擎、table format、文件集合以及分布式文件系統(tǒng)構(gòu)成。右側(cè)是對(duì)應(yīng)的現(xiàn)實(shí)中的組件,SQL引擎比如HiveServer、Impala、Spark等等,table format比如Metastore或者Iceberg,文件集合主要有Parquet文件等,而分布式文件系統(tǒng)就是HDFS。
對(duì)于table format,我認(rèn)為主要包含4個(gè)層面的含義,分別是表schema定義(是否支持復(fù)雜數(shù)據(jù)類型),表中文件的組織形式,表相關(guān)統(tǒng)計(jì)信息、表索引信息以及表的讀寫(xiě)API實(shí)現(xiàn)。詳述如下:
表schema定義了一個(gè)表支持字段類型,比如int、string、long以及復(fù)雜數(shù)據(jù)類型等。
表中文件組織形式最典型的是Partition模式,是Range Partition還是Hash Partition。
Metadata數(shù)據(jù)統(tǒng)計(jì)信息。
封裝了表的讀寫(xiě)API。上層引擎通過(guò)對(duì)應(yīng)的API讀取或者寫(xiě)入表中的數(shù)據(jù)。
和Iceberg差不多相當(dāng)?shù)囊粋€(gè)組件是Metastore。不過(guò)Metastore是一個(gè)服務(wù),而Iceberg就是一個(gè)jar包。這里就Metastore 和 Iceberg在表格式的4個(gè)方面分別進(jìn)行一下對(duì)比介紹:
① 在schema層面上沒(méi)有任何區(qū)別:
都支持int、string、bigint等類型。
② partition實(shí)現(xiàn)完全不同:
兩者在partition上有很大的不同:
metastore中partition字段不能是表字段,因?yàn)閜artition字段本質(zhì)上是一個(gè)目錄結(jié)構(gòu),不是用戶表中的一列數(shù)據(jù)。基于metastore,用戶想定位到一個(gè)partition下的所有數(shù)據(jù),首先需要在metastore中定位出該partition對(duì)應(yīng)的所在目錄位置信息,然后再到HDFS上執(zhí)行l(wèi)ist命令獲取到這個(gè)分區(qū)下的所有文件,對(duì)這些文件進(jìn)行掃描得到這個(gè)partition下的所有數(shù)據(jù)。
iceberg中partition字段就是表中的一個(gè)字段。Iceberg中每一張表都有一個(gè)對(duì)應(yīng)的文件元數(shù)據(jù)表,文件元數(shù)據(jù)表中每條記錄表示一個(gè)文件的相關(guān)信息,這些信息中有一個(gè)字段是partition字段,表示這個(gè)文件所在的partition。
很明顯,iceberg表根據(jù)partition定位文件相比metastore少了一個(gè)步驟,就是根據(jù)目錄信息去HDFS上執(zhí)行l(wèi)ist命令獲取分區(qū)下的文件。
試想,對(duì)于一個(gè)二級(jí)分區(qū)的大表來(lái)說(shuō),一級(jí)分區(qū)是小時(shí)時(shí)間分區(qū),二級(jí)分區(qū)是一個(gè)枚舉字段分區(qū),假如每個(gè)一級(jí)分區(qū)下有30個(gè)二級(jí)分區(qū),那么這個(gè)表每天就會(huì)有24 * 30 = 720個(gè)分區(qū)。基于Metastore的partition方案,如果一個(gè)SQL想基于這個(gè)表掃描昨天一天的數(shù)據(jù)的話,就需要向Namenode下發(fā)720次list請(qǐng)求,如果掃描一周數(shù)據(jù)或者一個(gè)月數(shù)據(jù),請(qǐng)求數(shù)就更是相當(dāng)夸張。這樣,一方面會(huì)導(dǎo)致Namenode壓力很大,一方面也會(huì)導(dǎo)致SQL請(qǐng)求響應(yīng)延遲很大。而基于Iceberg的partition方案,就完全沒(méi)有這個(gè)問(wèn)題。
③ 表統(tǒng)計(jì)信息實(shí)現(xiàn)粒度不同:
Metastore中一張表的統(tǒng)計(jì)信息是表/分區(qū)級(jí)別粒度的統(tǒng)計(jì)信息,比如記錄一張表中某一列的記錄數(shù)量、平均長(zhǎng)度、為null的記錄數(shù)量、最大值\最小值等。
Iceberg中統(tǒng)計(jì)信息精確到文件粒度,即每個(gè)數(shù)據(jù)文件都會(huì)記錄所有列的記錄數(shù)量、平均長(zhǎng)度、最大值\最小值等。
很明顯,文件粒度的統(tǒng)計(jì)信息對(duì)于查詢中謂詞(即where條件)的過(guò)濾會(huì)更有效果。
④ 讀寫(xiě)API實(shí)現(xiàn)不同:
metastore模式下上層引擎寫(xiě)好一批文件,調(diào)用metastore的add partition接口將這些文件添加到某個(gè)分區(qū)下。
Iceberg模式下上層業(yè)務(wù)寫(xiě)好一批文件,調(diào)用iceberg的commit接口提交本次寫(xiě)入形成一個(gè)新的snapshot快照。這種提交方式保證了表的ACID語(yǔ)義。同時(shí)基于snapshot快照提交可以實(shí)現(xiàn)增量拉取實(shí)現(xiàn)。
總結(jié)下Iceberg相對(duì)于Metastore的優(yōu)勢(shì):
新partition模式:避免了查詢時(shí)n次調(diào)用namenode的list方法,降低namenode壓力,提升查詢性能
新metadata模式:文件級(jí)別列統(tǒng)計(jì)信息可以用來(lái)根據(jù)where字段進(jìn)行文件過(guò)濾,很多場(chǎng)景下可以大大減少掃描文件數(shù),提升查詢性能
新API模式:存儲(chǔ)批流一體
1. 流式寫(xiě)入-增量拉取(基于Iceberg統(tǒng)一存儲(chǔ)模式可以同時(shí)滿足業(yè)務(wù)批量讀取以及增量訂閱需求)
2. 支持批流同時(shí)讀寫(xiě)同一張表,統(tǒng)一表schema,任務(wù)執(zhí)行過(guò)程中不會(huì)出現(xiàn)FileNotFoundException
Iceberg的提升體現(xiàn)在:
03
數(shù)據(jù)湖Iceberg社區(qū)現(xiàn)狀
目前Iceberg主要支持的計(jì)算引擎包括Spark2.4.5、Spark 3.x以及Presto。同時(shí),一些運(yùn)維工作比如snapshot過(guò)期、小文件合并、增量訂閱消費(fèi)等功能都可以實(shí)現(xiàn)。
在此基礎(chǔ)上,目前社區(qū)正在開(kāi)發(fā)的功能主要有Hive集成、Flink集成以及支持Update/Delete功能。相信下一個(gè)版本就可以看到Hive/Flink集成的相關(guān)功能。
04
網(wǎng)易數(shù)據(jù)湖Iceberg實(shí)踐之路
Iceberg針對(duì)目前的大數(shù)量的情況下,可以大大提升ETL任務(wù)執(zhí)行的效率,這主要得益于新Partition模式下不再需要請(qǐng)求NameNode分區(qū)信息,同時(shí)得益于文件級(jí)別統(tǒng)計(jì)信息模式下可以過(guò)濾很多不滿足條件的數(shù)據(jù)文件。
當(dāng)前iceberg社區(qū)僅支持Spark2.4.5,我們?cè)谶@個(gè)基礎(chǔ)上做了更多計(jì)算引擎的適配工作。主要包括如下:
集成Hive。可以通過(guò)Hive創(chuàng)建和刪除iceberg表,通過(guò)HiveSQL查詢Iceberg表中的數(shù)據(jù)。
集成Impala。用戶可以通過(guò)Impala新建iceberg內(nèi)表\外表,并通過(guò)Impala查詢Iceberg表中的數(shù)據(jù)。目前該功能已經(jīng)貢獻(xiàn)給Impala社區(qū)。
集成Flink。已經(jīng)實(shí)現(xiàn)了Flink到Iceberg的sink實(shí)現(xiàn),業(yè)務(wù)可以消費(fèi)kafka中的數(shù)據(jù)將結(jié)果寫(xiě)入到Iceberg中。同時(shí)我們基于Flink引擎實(shí)現(xiàn)了小文件異步合并的功能,這樣可以實(shí)現(xiàn)Flink一邊寫(xiě)數(shù)據(jù)文件,一邊執(zhí)行小文件的合并。基于Iceberg的小文件合并通過(guò)commit的方式提交,不需要?jiǎng)h除合并前的小文件,也就不會(huì)引起讀取任務(wù)的任何異常。
總結(jié)
以上是生活随笔為你收集整理的网易数据湖探索与实践-范欣欣的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 看看京东应急供应链是怎样构建的?
- 下一篇: 加速研发自动驾驶卡车,戴姆勒买下Torc