日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kylin KV+cube方案分析

發(fā)布時(shí)間:2025/3/21 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kylin KV+cube方案分析 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>

前言

??在使用Kylin的時(shí)候,最重要的一步就是創(chuàng)建cube的模型定義,即指定度量和維度以及一些附加信息,然后對cube進(jìn)行build,當(dāng)然我們也可以根據(jù)原始表中的某一個(gè)string字段(這個(gè)字段的格式必須是日期格式,表示日期的含義)設(shè)定分區(qū)字段,這樣一個(gè)cube就可以進(jìn)行多次build,每一次的build會(huì)生成一個(gè)segment,每一個(gè)segment對應(yīng)著一個(gè)時(shí)間區(qū)間的cube,這些segment的時(shí)間區(qū)間是連續(xù)并且不重合的,對于擁有多個(gè)segment的cube可以執(zhí)行merge,相當(dāng)于將一個(gè)時(shí)間區(qū)間內(nèi)部的segment合并成一個(gè)。下面從源碼開始分析cube的build和merge過程。本文基于Kylin-1.0-incubating版本,對于Kylin的介紹可以參見:http://blog.csdn.net/yu616568/article/details/48103415

入口介紹

??在kylin的web頁面上創(chuàng)建完成一個(gè)cube之后可以點(diǎn)擊action下拉框執(zhí)行build或者merge操作,這兩個(gè)操作都會(huì)調(diào)用cube的rebuild接口,調(diào)用的參數(shù)包括:1、cube名,用于唯一標(biāo)識一個(gè)cube,在當(dāng)前的kylin版本中cube名是全局唯一的,而不是每一個(gè)project下唯一的;2、本次構(gòu)建的startTime和endTime,這兩個(gè)時(shí)間區(qū)間標(biāo)識本次構(gòu)建的segment的數(shù)據(jù)源只選擇這個(gè)時(shí)間范圍內(nèi)的數(shù)據(jù);對于BUILD操作而言,startTime是不需要的,因?yàn)樗偸菚?huì)選擇最后一個(gè)segment的結(jié)束時(shí)間作為當(dāng)前segment的起始時(shí)間。3、buildType標(biāo)識著操作的類型,可以是”BUILD”、”MERGE”和”REFRESH”。?
??這些操作的統(tǒng)一入口就是JobService.submitJob函數(shù),該函數(shù)首先取出該cube所有關(guān)聯(lián)的構(gòu)建cube的job,并且判斷這些job是否有處于READY、RUNNING、ERROR狀態(tài),如果處于該狀態(tài)意味著這個(gè)job正在執(zhí)行或者可以之后被resume執(zhí)行,做這種限制的原因不得而知(可能是構(gòu)建的區(qū)間是基于時(shí)間吧,需要對一個(gè)cube并行的構(gòu)建多個(gè)segment(時(shí)間區(qū)間的數(shù)據(jù))的需求并不明顯)。所以如果希望build或者merge cube,必須將未完成的cube的操作執(zhí)行discard操作。然后根據(jù)操作類型執(zhí)行具體的操作:?
1. 如果是BUILD,如果這個(gè)cube中包含distinct count聚合方式的度量并且這個(gè)cube中已經(jīng)存在其他segment,則執(zhí)行appendAndMergeSegments函數(shù),否則執(zhí)行buildJob函數(shù)。?
2. 如果是MERGE操作則執(zhí)行mergeSegments函數(shù)。?
3. 如果是REFRESH,則同樣執(zhí)行buildJob函數(shù)。為這個(gè)時(shí)間區(qū)間的segment重新構(gòu)建。?
??buildJob函數(shù)構(gòu)建一個(gè)新的segment,mergeSegments函數(shù)合并一個(gè)時(shí)間區(qū)間內(nèi)的所有segments,appendAndMergeSegments函數(shù)則首先根據(jù)最后一個(gè)segment的時(shí)間區(qū)間的end值build一個(gè)新的segment然后再將所有的時(shí)間區(qū)間的segments進(jìn)行合并(為什么包含distinct count的聚合函數(shù)的cube的構(gòu)建一定要進(jìn)行合并呢?這應(yīng)該是有distinct-count使用的hyperloglog算法決定的,下次可以專門分析一下這個(gè)算法)。

BUILD操作

??Build操作是構(gòu)建一個(gè)cube指定時(shí)間區(qū)間的數(shù)據(jù),由于kylin基于預(yù)計(jì)算的方式提供數(shù)據(jù)查詢,構(gòu)建操作是指將原始數(shù)據(jù)(存儲(chǔ)在Hadoop中,通過Hive獲取)轉(zhuǎn)換成目標(biāo)數(shù)據(jù)(存儲(chǔ)在Hbase中)的過程。主要的步驟可以按照順序分為四個(gè)階段:1、根據(jù)用戶的cube信息計(jì)算出多個(gè)cuboid文件,2、根據(jù)cuboid文件生成htable,3、更新cube信息,4、回收臨時(shí)文件。每一個(gè)階段操作的輸入都需要依賴于上一步的輸出,所以這些操作全是順序執(zhí)行的。

1. 計(jì)算cuboid文件

??在kylin的CUBE模型中,每一個(gè)cube是由多個(gè)cuboid組成的,理論上有N個(gè)普通維度的cube可以是由2的N次方個(gè)cuboid組成的,那么我們可以計(jì)算出最底層的cuboid,也就是包含全部維度的cuboid(相當(dāng)于執(zhí)行一個(gè)group by全部維度列的查詢),然后在根據(jù)最底層的cuboid一層一層的向上計(jì)算,直到計(jì)算出最頂層的cuboid(相當(dāng)于執(zhí)行了一個(gè)不帶group by的查詢),其實(shí)這個(gè)階段kylin的執(zhí)行原理就是這個(gè)樣子的,不過它需要將這些抽象成mapreduce模型,提交mapreduce作業(yè)執(zhí)行。

1.1 生成原始數(shù)據(jù)(Create Intermediate Flat Hive Table)

??這一步的操作是根據(jù)cube的定義生成原始數(shù)據(jù),這里會(huì)新創(chuàng)建一個(gè)hive外部表,然后再根據(jù)cube中定義的星狀模型,查詢出維度(對于DERIVED類型的維度使用的是外鍵列)和度量的值插入到新創(chuàng)建的表中,這個(gè)表是一個(gè)外部表,表的數(shù)據(jù)文件(存儲(chǔ)在HDFS)作為下一個(gè)子任務(wù)的輸入,它首先根據(jù)維度中的列和度量中作為參數(shù)的列得到需要出現(xiàn)在該表中的列,然后執(zhí)行三步hive操作,這三步hive操作是通過hive -e的方式執(zhí)行的shell命令。?
??1. drop TABLE IF EXISTS xxx.?
??2. CREATE EXTERNAL TABLE IF NOT EXISTS xxx() ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\177’ STORED AS SEQUENCEFILE LOCATION xxxx,其中表名是根據(jù)當(dāng)前的cube名和segment的uuid生成的,location是當(dāng)前job的臨時(shí)文件,只有當(dāng)insert插入數(shù)據(jù)的時(shí)候才會(huì)創(chuàng)建,注意這里每一行的分隔符指定的是’\177’(目前是寫死的,十進(jìn)制為127).?
??3. 插入數(shù)據(jù),在執(zhí)行之前需要首先設(shè)置一些配置項(xiàng),這些配置項(xiàng)通過hive的SET命令設(shè)置,是根據(jù)這個(gè)cube的job的配置文件(一般是在kylin的conf目錄下)設(shè)置的,最后執(zhí)行的是INSERT OVERWRITE TABLE xxx SELECT xxxx語句,SELECT子句中選出cube星狀模型中事實(shí)表與維度表按照設(shè)置的方式j(luò)oin之后的出現(xiàn)在維度或者度量參數(shù)中的列(特殊處理derived列),然后再加上用戶設(shè)置的where條件和partition的時(shí)間條件(根據(jù)輸入build的參數(shù)).?
??需要注意的是這里無論用戶設(shè)置了多少維度和度量,每次join都會(huì)使用事實(shí)表和所有的維度表進(jìn)行join,這可能造成不必要的性能損失(多一個(gè)join會(huì)影響hive性能,畢竟要多讀一些文件)。這一步執(zhí)行完成之后location指定的目錄下就有了原始數(shù)據(jù)的文件,為接下來的任務(wù)提供了輸入。

1.2 創(chuàng)建事實(shí)表distinct column文件(Extract Fact Table Distinct Columns)

??在這一步是根據(jù)上一步生成的hive表計(jì)算出還表中的每一個(gè)出現(xiàn)在事實(shí)表中的度量的distinct值,并寫入到文件中,它是啟動(dòng)一個(gè)MR任務(wù)完成的,MR任務(wù)的輸入是HCatInputFormat,它關(guān)聯(lián)的表就是上一步創(chuàng)建的臨時(shí)表,這個(gè)MR任務(wù)的map階段首先在setup函數(shù)中得到所有度量中出現(xiàn)在事實(shí)表的度量在臨時(shí)表的index,根據(jù)每一個(gè)index得到該列在臨時(shí)表中在每一行的值value,然后將<index, value>作為mapper的輸出,該任務(wù)還啟動(dòng)了一個(gè)combiner,它所做的只是對同一個(gè)key的值進(jìn)行去重(同一個(gè)mapper的結(jié)果),reducer所做的事情也是進(jìn)行去重(所有mapper的結(jié)果),然后將每一個(gè)index對應(yīng)的值一行行的寫入到以列名命名的文件中。如果某一個(gè)維度列的distinct值比較大,那么可能導(dǎo)致MR任務(wù)執(zhí)行過程中的OOM。?
??對于這一步我有一個(gè)疑問就是既然所有的原始數(shù)據(jù)都已經(jīng)通過第一步存入到臨時(shí)hive表中了,我覺得接下來就不用再區(qū)分維度表和事實(shí)表了,所有的任務(wù)都基于這個(gè)臨時(shí)表,那么這一步就可以根據(jù)臨時(shí)表計(jì)算出所有的維度列的distinct column值,但是這里僅僅針對出現(xiàn)在事實(shí)表上的維度,不知道這樣做的原因是什么?難道是因?yàn)樵谙乱徊綍?huì)單獨(dú)計(jì)算維度表的dictionary以及snapshot?

1.3 創(chuàng)建維度詞典(Build Dimension Dictionary)

??這一步是根據(jù)上一步生成的distinct column文件和維度表計(jì)算出所有維度的詞典信息,詞典是為了節(jié)約存儲(chǔ)而設(shè)計(jì)的,用于將一個(gè)成員值編碼成一個(gè)整數(shù)類型并且可以通過整數(shù)值獲取到原始成員值,每一個(gè)cuboid的成員是一個(gè)key-value形式存儲(chǔ)在hbase中,key是維度成員的組合,但是一般情況下維度是一些字符串之類的值(例如商品名),所以可以通過將每一個(gè)維度值轉(zhuǎn)換成唯一整數(shù)而減少內(nèi)存占用,在從hbase查找出對應(yīng)的key之后再根據(jù)詞典獲取真正的成員值。?
?? 這一步是在kylin進(jìn)程內(nèi)的一個(gè)線程中執(zhí)行的,它會(huì)創(chuàng)建所有維度的dictionary,如果是事實(shí)表上的維度則可以從上一步生成的文件中讀取該列的distinct成員值(FileTable),否則則需要從原始的hive表中讀取每一列的信息(HiveTable),根據(jù)不同的源(文件或者h(yuǎn)ive表)獲取所有的列去重之后的成員列表,然后根據(jù)這個(gè)列表生成dictionary,kylin中針對不同類型的列使用不同的實(shí)現(xiàn)方式,對于time之類的(date、time、dtaetime和timestamp)使用DateStrDictionary,這里目前還存在著一定的問題,因?yàn)檫@種編碼方式會(huì)首先將時(shí)間轉(zhuǎn)換成‘yyyy-MM-dd’的格式,會(huì)導(dǎo)致timestamp之類的精確時(shí)間失去天以后的精度。針對數(shù)值型的使用NumberDictionary,其余的都使用一般的TrieDictionary(字典樹)。這些dictionary會(huì)作為cube的元數(shù)據(jù)存儲(chǔ)的kylin元數(shù)據(jù)庫里面,執(zhí)行query的時(shí)候進(jìn)行轉(zhuǎn)換。?
?? 之后還需要計(jì)算維度表的snapshotTable,每一個(gè)snapshot是和一個(gè)hive維度表對應(yīng)的,生成的過程是:首先從原始的hive維度表中順序得讀取每一行每一列的值,然后使用TrieDictionary方式對這些所有的值進(jìn)行編碼,這樣每一行每一列的之都能夠得到一個(gè)編碼之后的id(相同的值id也相同),然后再次讀取原始表中每一行的值,將每一列的值使用編碼之后的id進(jìn)行替換,得到了一個(gè)只有id的新表,這樣同時(shí)保存這個(gè)新表和dictionary對象(id和值得映射關(guān)系)就能夠保存整個(gè)維度表了,同樣,kylin也會(huì)將這個(gè)數(shù)據(jù)存儲(chǔ)元數(shù)據(jù)庫中。?
?? 針對這一步需要注意的問題:首先,這一步的兩個(gè)步驟都是在kylin進(jìn)程的一個(gè)線程中執(zhí)行的,第一步會(huì)加載某一個(gè)維度的所有distinct成員到內(nèi)存,如果某一個(gè)維度的cardinality比較大 ,可能會(huì)導(dǎo)致內(nèi)存出現(xiàn)OOM,然后在創(chuàng)建snapshotTable的時(shí)候會(huì)限制原始表的大小不能超過配置的一個(gè)上限值,如果超過則會(huì)執(zhí)行失敗。但是應(yīng)該強(qiáng)調(diào)的是這里加載全部的原始維度表更可能出現(xiàn)OOM。另外,比較疑惑的是:1、為什么不在上一步的MR任務(wù)中直接根據(jù)臨時(shí)表中的數(shù)據(jù)生成每一個(gè)distinct column值,而是從原始維度表中讀取?2、計(jì)算全表的dictionary是為了做什么?我目前只了解對于drived維度是必要保存主鍵和列之間的映射,但是需要保存整個(gè)維度表?!

1.4 計(jì)算生成BaseCuboid文件(Build Base Cuboid Data)

?? 何謂Base cuboid呢?假設(shè)一個(gè)cube包含了四個(gè)維度:A/B/C/D,那么這四個(gè)維度成員間的所有可能的組合就是base cuboid,這就類似在查詢的時(shí)候指定了select count(1) from xxx group by A,B,C,D;這個(gè)查詢結(jié)果的個(gè)數(shù)就是base cuboid集合的成員數(shù)。這一步也是通過一個(gè)MR任務(wù)完成的,輸入是臨時(shí)表的路徑和分隔符,map對于每一行首先進(jìn)行split,然后獲取每一個(gè)維度列的值組合作為rowKey,但是rowKey并不是簡單的這些維度成員的內(nèi)容組合,而是首先將這些內(nèi)容從dictionary中查找出對應(yīng)的id,然后組合這些id得到rowKey,這樣可以大大縮短hbase的存儲(chǔ)空間,提升查找性能。然后在查找該行中的度量列,根據(jù)cube定義中度量的函數(shù)返回對該列計(jì)算之后的值。這個(gè)MR任務(wù)還會(huì)執(zhí)行combiner過程,執(zhí)行邏輯和reducer相同,在reducer中的key是一個(gè)rowKey,value是相同的rowKey的measure組合的數(shù)組,reducer回分解出每一個(gè)measure的值,然后再根據(jù)定義該度量使用的聚合函數(shù)計(jì)算得到這個(gè)rowKey的結(jié)果,其實(shí)這已經(jīng)類似于hbase存儲(chǔ)的格式了。

1.5 計(jì)算第N層cuboid文件(Build N-Dimension Cuboid Data)

??這一個(gè)流程是由多個(gè)步驟的,它是根據(jù)維度組合的cuboid的總數(shù)決定的,上一層cuboid執(zhí)行MR任務(wù)的輸入是下一層cuboid計(jì)算的輸出,由于最底層的cuboid(base)已經(jīng)計(jì)算完成,所以這幾步不需要依賴于任何的hive信息,它的reducer和base cuboid的reducer過程基本一樣的(相同rowkey的measure執(zhí)行聚合運(yùn)算),mapper的過程只需要根據(jù)這一行輸入的key(例如A、B、C、D中某四個(gè)成員的組合)獲取可能的下一層的的組合(例如只有A、B、C和B、C、D),那么只需要將這些可能的組合提取出來作為新的key,value不變進(jìn)行輸出就可以了。?
舉個(gè)例子,假設(shè)一共四個(gè)維度A/B/C/D,他們的成員分別是(A1、A2、A3),(B1、B2)、(C1)、(D1),有一個(gè)measure(對于這列V,計(jì)算sum(V)),這里忽略dictionary編碼。原始表如下:

ABCDV
A1B1C1D12
A1B2C1D13
A2B1C1D15
A3B1C1D16
A3B2C1D18

那么base cuboid最終的輸出如下?
(<A1、B1、C1、D1>、2)?
(<A1、B2、C1、D1>, 3)?
(<A2、B1、C1、D1>, 5)?
(<A3、B1、C1、D1>, 6)?
(<A3、B2、C1、D1>, 8)?
那么它作為下面一個(gè)cuboid的輸入,對于第一行輸入?
(<A1、B1、C1、D1>, 2),mapper執(zhí)行完成之后會(huì)輸出?
(<A1、B1、C1>, 2)、?
(<A1、B1、D1>, 2)、?
(<A1、C1、D1>, 2)、?
(<B1、C1、D1>, 2)這四項(xiàng),同樣對于其他的內(nèi)一行也會(huì)輸出四行,最終他們經(jīng)過reducer的聚合運(yùn)算,得到如下的結(jié)果:?
(<A1、B1、C1>, 2)?
(<A1、B1、D1>, 2)?
(<A1、C1、D1>, 2 + 3)?
(<B1、C1、D1>,2 + 5 +6)?
...?
這樣一次將下一層的結(jié)果作為輸入計(jì)算上一層的cuboid成員,直到最頂層的cuboid,這一個(gè)層cuboid只包含一個(gè)成員,不按照任何維度進(jìn)行g(shù)roup by。?
??上面的這些步驟用于生成cuboid,假設(shè)有N個(gè)維度(對于特殊類型的),那么就需要有N +1層cuboid,每一層cuboid可能是由多個(gè)維度的組合,但是它包含的維度個(gè)數(shù)相同。

2 準(zhǔn)備輸出

??在上面幾步中,我們已經(jīng)將每一層的cuboid計(jì)算完成,每一層的cuboid文件都是一些cuboid的集合,每一層的cuboid的key包含相同的維度個(gè)數(shù),下面一步就是將這些cuboid文件導(dǎo)入到hbase中。

2.1 計(jì)算分組

??這一步的輸入是之前計(jì)算的全部的cuboid文件,按照cuboid文件的順序(層次的順序)一次讀取每一個(gè)key-value,再按照key-value的形式統(tǒng)計(jì)每一個(gè)key和value占用的空間大小,然后以GB為單位,mapper階段的輸出是每當(dāng)統(tǒng)計(jì)到1GB的數(shù)據(jù),將當(dāng)前的這個(gè)key和當(dāng)前數(shù)據(jù)量總和輸出,在reducer階段根據(jù)用戶創(chuàng)建cube時(shí)指定的cube大小(SMALL,MEDIUM和LARGE)和總的大小計(jì)算出實(shí)際需要?jiǎng)澐譃槎嗌俜謪^(qū),這時(shí)還需要參考最多分區(qū)數(shù)和最少分區(qū)數(shù)進(jìn)行計(jì)算,再根據(jù)實(shí)際數(shù)據(jù)量大小和分區(qū)數(shù)計(jì)算出每一個(gè)分區(qū)的邊界key,將這個(gè)key和對應(yīng)的分區(qū)編號輸出到最終文件中,為下一步創(chuàng)建htable做準(zhǔn)備。

2.2 創(chuàng)建HTable

??這一步非常簡單,根據(jù)上一步計(jì)算出的rowKey分布情況(split數(shù)組)創(chuàng)建HTable,創(chuàng)建一個(gè)HTable的時(shí)候還需要考慮一下幾個(gè)事情:1、列組的設(shè)置,2、每一個(gè)列組的壓縮方式,3、部署coprocessor,4、HTable中每一個(gè)region的大小。在這一步中,列組的設(shè)置是根據(jù)用戶創(chuàng)建cube時(shí)候設(shè)置的,在hbase中存儲(chǔ)的數(shù)據(jù)key是維度成員的組合,value是對應(yīng)聚合函數(shù)的結(jié)果,列組針對的是value的,一般情況下在創(chuàng)建cube的時(shí)候只會(huì)設(shè)置一個(gè)列組,該列包含所有的聚合函數(shù)的結(jié)果;在創(chuàng)建HTable時(shí)默認(rèn)使用LZO壓縮,如果不支持LZO則不進(jìn)行壓縮,在后面kylin的版本中支持更多的壓縮方式;kylin強(qiáng)依賴于hbase的coprocessor,所以需要在創(chuàng)建HTable為該表部署coprocessor,這個(gè)文件會(huì)首先上傳到HBase所在的HDFS上,然后在表的元信息中關(guān)聯(lián),這一步很容易出現(xiàn)錯(cuò)誤,例如coprocessor找不到了就會(huì)導(dǎo)致整個(gè)regionServer無法啟動(dòng),所以需要特別小心;region的劃分已經(jīng)在上一步確定了,所以這里不存在動(dòng)態(tài)擴(kuò)展的情況,所以kylin創(chuàng)建HTable使用的接口如下:?
public void createTable( final HTableDescriptor desc , byte [][] splitKeys)

2.3 構(gòu)建hfile文件

??創(chuàng)建完了HTable之后一般會(huì)通過插入接口將數(shù)據(jù)插入到表中,但是由于cuboid中的數(shù)據(jù)量巨大,頻繁的插入會(huì)對Hbase的性能有非常大的影響,所以kylin采取了首先將cuboid文件轉(zhuǎn)換成HTable格式的Hfile文件,然后在通過bulkLoad的方式將文件和HTable進(jìn)行關(guān)聯(lián),這樣可以大大降低Hbase的負(fù)載,這個(gè)過程通過一個(gè)MR任務(wù)完成。?
??這個(gè)任務(wù)的輸入是所有的cuboid文件,在mapper階段根據(jù)每一個(gè)cuboid成員的key-value輸出,如果cube定義時(shí)指定了多個(gè)列組,那么同一個(gè)key要按照不同列組中的值分別輸出,例如在cuboid文件中存在一行cuboid=1,key=1,value=sum(cost),count(1)的數(shù)據(jù),而cube中將這兩個(gè)度量劃分到兩個(gè)列組中,這時(shí)候?qū)τ谶@一行數(shù)據(jù),mapper的輸出為<1, sum(cost)>和<1,count(1)>。reducer使用的是org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer,它會(huì)按照行排序輸出,如果一行中包含多個(gè)值,那么會(huì)將這些值進(jìn)行排序再輸出。輸出的格式則是根據(jù)HTable的文件格式定義的。

2.4 BulkLoad文件

??這一步將HFile文件load到HTable中,因?yàn)閘oad操作會(huì)將原始的文件刪除(相當(dāng)于remove),在操作之前首先將所有列組的Hfile的權(quán)限都設(shè)置為777,然后再啟動(dòng)LoadIncrementalHFiles任務(wù)執(zhí)行l(wèi)oad操作,它的輸入為文件的路徑和HTable名,這一步完全依賴于HBase的工具。這一步完成之后,數(shù)據(jù)已經(jīng)存儲(chǔ)到HBase中了,key的格式由cuboid編號+每一個(gè)成員在字典樹的id組成,value可能保存在多個(gè)列組里,包含在原始數(shù)據(jù)中按照這幾個(gè)成員進(jìn)行GROUP BY計(jì)算出的度量的值。

3 收尾工作

??執(zhí)行完上一步就已經(jīng)完成了從輸入到輸出的計(jì)算過程,接下來要做的就是一些kylin內(nèi)部的工作,分別是更新元數(shù)據(jù),更新cube狀態(tài),垃圾數(shù)據(jù)回收。

3.1 更新狀態(tài)

??這一步主要是更新cube的狀態(tài),其中需要更新的包括cube是否可用、以及本次構(gòu)建的數(shù)據(jù)統(tǒng)計(jì),包括構(gòu)建完成的時(shí)間,輸入的record數(shù)目,輸入數(shù)據(jù)的大小,保存到Hbase中數(shù)據(jù)的大小等,并將這些信息持久到元數(shù)據(jù)庫中。

3.2 垃圾文件回收

??這一步是否成功對正確性不會(huì)有任何影響,因?yàn)榻?jīng)過上一步之后這個(gè)segment就可以在這個(gè)cube中被查找到了,但是在整個(gè)執(zhí)行過程中產(chǎn)生了很多的垃圾文件,其中包括:1、臨時(shí)的hive表,2、因?yàn)閔ive表是一個(gè)外部表,存儲(chǔ)該表的文件也需要額外刪除,3、fact distinct 這一步將數(shù)據(jù)寫入到HDFS上為建立詞典做準(zhǔn)備,這時(shí)候也可以刪除了,4、rowKey統(tǒng)計(jì)的時(shí)候會(huì)生成一個(gè)文件,此時(shí)可以刪除。5、生成HFile時(shí)文件存儲(chǔ)的路徑和hbase真正存儲(chǔ)的路徑不同,雖然load是一個(gè)remove操作,但是上層的目錄還是存在的,也需要?jiǎng)h除。這一步kylin做的比較簡單,并沒有完全刪除所有的臨時(shí)文件,其實(shí)在整個(gè)計(jì)算過程中,真正還需要保留的數(shù)據(jù)只有多個(gè)cuboid文件(需要增量build的cube),這個(gè)因?yàn)樵诓煌瑂egment進(jìn)行merge的時(shí)候是基于cuboid文件的,而不是根據(jù)HTable的。

??在Kylin-1.x版本中,整個(gè)cube的一個(gè)build的過程大概就是這樣,這樣的一個(gè)build只不過是生成一虐segment,而當(dāng)一個(gè)cube中存在多個(gè)segment時(shí)可能需要將它們進(jìn)行merge,merge的過程和build的流程大致是相同的,不過它不需要從頭開始,只需要對字典進(jìn)行merge,然后在對cuboid文件進(jìn)行merge,最后生成一個(gè)新的HTable。?
但是在Kylin-2.x版本中,整個(gè)家溝發(fā)生了很大的變化,build的引擎也分成了多套,分別是原始的MR引擎,基于Fast Cubing的MR引擎和Spark引擎,這使得build進(jìn)行的更迅速,大大降低等待時(shí)間,后面會(huì)持續(xù)的再對新的引擎進(jìn)行分析。

Kylin Cube Build的接口說明
每一個(gè)Cube需要設(shè)置數(shù)據(jù)源、計(jì)算引擎和存儲(chǔ)引擎,工廠類負(fù)責(zé)創(chuàng)建數(shù)據(jù)源對象、計(jì)算引擎對象和存儲(chǔ)引擎對象
三者之間通過適配器進(jìn)行串聯(lián)
數(shù)據(jù)源接口(ISource)
public interface ISource extends Closeable {

? ? // 同步數(shù)據(jù)源中表的元數(shù)據(jù)信息
? ? ISourceMetadataExplorer getSourceMetadataExplorer();

? ? // 適配制定的構(gòu)建引擎接口
? ? <I> I adaptToBuildEngine(Class<I> engineInterface);

? ? // 順序讀取表
? ? IReadableTable createReadableTable(TableDesc tableDesc);

? ? // 構(gòu)建之前豐富數(shù)據(jù)源的Partition
? ? SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
存儲(chǔ)引擎接口(IStorage)

public interface IStorage {

? ? // 創(chuàng)建一個(gè)查詢指定Cube的對象
? ? public IStorageQuery createQuery(IRealization realization);

? ? public <I> I adaptToBuildEngine(Class<I> engineInterface);
}
1
2
3
4
5
6
7
8
計(jì)算引擎接口(IBatchCubingEngine)
public interface IBatchCubingEngine {

? ? public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment);

? ? // 返回一個(gè)工作流計(jì)劃, 用以構(gòu)建指定的CubeSegment
? ? public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter);

? ?// 返回一個(gè)工作流計(jì)劃, 用以合并指定的CubeSegment
? ? public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);

? ?// 返回一個(gè)工作流計(jì)劃, 用以優(yōu)化指定的CubeSegment
? ? public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter);

? ? public Class<?> getSourceInterface();

? ? public Class<?> getStorageInterface();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
離線Cube Build 調(diào)用鏈
Rest API請求/{cubeName}/rebuild, 調(diào)用CubeController.rebuild()方法 -> jobService.submitJob()
Project級別的權(quán)限校驗(yàn): aclEvaluate.checkProjectOperationPermission(cube);
ISource source = SourceManager.getSource(cube)根據(jù)CubeInstance的方法getSourceType()的返回值決定ISource的對象類型

public int getSourceType() {
? ? return getModel().getRootFactTable().getTableDesc().getSourceType();
}
1
2
3
分配新的segment: newSeg = getCubeManager().appendSegment(cube, src);

EngineFactory根據(jù)Cube定義的engine type, 創(chuàng)建對應(yīng)的IBatchCubingEngine對象 -> 調(diào)用createBatchCubingJob()方法創(chuàng)建作業(yè)鏈,MRBatchCubingEngine2新建的是BatchCubingJobBuilder2

public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
? ? super(newSegment, submitter);
? ? this.inputSide = MRUtil.getBatchCubingInputSide(seg);
? ? this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
} ? ?
1
2
3
4
5
適配輸入數(shù)據(jù)源到構(gòu)建引擎

SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);

public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) {
? ? return getSource(table).adaptToBuildEngine(engineInterface);
}

// HiveSource返回的是HiveMRInput
public <I> I adaptToBuildEngine(Class<I> engineInterface) {
? ? if (engineInterface == IMRInput.class) {
? ? ? ? return (I) new HiveMRInput();
? ? } else {
? ? ? ? throw new RuntimeException("Cannot adapt to " + engineInterface);
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
適配存儲(chǔ)引擎到構(gòu)建引擎

StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchCubingOutputSide(seg);

public static <T> T createEngineAdapter(IStorageAware aware, Class<T> engineInterface) {
? ? return storage(aware).adaptToBuildEngine(engineInterface);
}

// HBaseStorage返回的是HBaseMROutput2Transition
public <I> I adaptToBuildEngine(Class<I> engineInterface) {
? ? if (engineInterface == IMROutput2.class) {
? ? ? ? return (I) new HBaseMROutput2Transition();
? ? } else {
? ? ? ? throw new RuntimeException("Cannot adapt to " + engineInterface);
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
適配成功后, new BatchCubingJobBuilder2(newSegment, submitter).build()該方法創(chuàng)建具體的執(zhí)行步驟, 形成工作流
將工作流添加到執(zhí)行管理器,等待調(diào)度執(zhí)行: getExecutableManager().addJob(job);
---------------------?

本文主要介紹了Apache Kylin是如何將Hive表中的數(shù)據(jù)轉(zhuǎn)化為HBase的KV結(jié)構(gòu),并簡單介紹了Kylin的SQL查詢是如何轉(zhuǎn)化為HBase的Scan操作。

Apache Kylin 是什么
Apache Kylin是一個(gè)開源的、基于Hadoop生態(tài)系統(tǒng)的OLAP引擎(OLAP查詢引擎、OLAP多維分析引擎),能夠通過SQL接口對十億、甚至百億行的超大數(shù)據(jù)集實(shí)現(xiàn)秒級的多維分析查詢。

Apache?Kylin 核心:Kylin OLAP引擎基礎(chǔ)框架,包括元數(shù)據(jù)引擎,查詢引擎,Job(Build)引擎及存儲(chǔ)引擎等,同時(shí)包括REST服務(wù)器以響應(yīng)客戶端請求。

OLAP 是什么
即聯(lián)機(jī)分析處理:以復(fù)雜的分析型查詢?yōu)橹?#xff0c;需要掃描,聚合大量數(shù)據(jù)。

Kylin如何實(shí)現(xiàn)超大數(shù)據(jù)集的秒級多維分析查詢
預(yù)計(jì)算

對于超大數(shù)據(jù)集的復(fù)雜查詢,既然現(xiàn)場計(jì)算需要花費(fèi)較長時(shí)間,那么根據(jù)空間換時(shí)間的原理,我們就可以提前將所有可能的計(jì)算結(jié)果計(jì)算并存儲(chǔ)下來,從而實(shí)現(xiàn)超大數(shù)據(jù)集的秒級多維分析查詢。

Kylin的預(yù)計(jì)算是如何實(shí)現(xiàn)的
將數(shù)據(jù)源Hive表中的數(shù)據(jù)按照指定的維度和指標(biāo) 由計(jì)算引擎MapReduce離線計(jì)算出所有可能的查詢結(jié)果(即Cube)存儲(chǔ)到HBase中。

Cube 和 Cuboid是什么
簡單地說,一個(gè)cube就是一個(gè)Hive表的數(shù)據(jù)按照指定維度與指標(biāo)計(jì)算出的所有組合結(jié)果。

其中每一種維度組合稱為cuboid,一個(gè)cuboid包含一種具體維度組合下所有指標(biāo)的值。

如下圖,整個(gè)立方體稱為1個(gè)cube,立方體中每個(gè)網(wǎng)格點(diǎn)稱為1個(gè)cuboid,圖中(A,B,C,D)和(A,D)都是cuboid,特別的,(A,B,C,D)稱為Base cuboid。cube的計(jì)算過程是逐層計(jì)算的,首先計(jì)算Base cuboid,然后計(jì)算維度數(shù)依次減少,逐層向下計(jì)算每層的cuboid。

圖1

Build引擎Cube構(gòu)建流程

BatchCubingJobBuilder2.build方法邏輯如下:

??public CubingJob build() {
????????logger.info("MR_V2 new job to BUILD segment " + seg);???????
????????final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
????????final String jobId = result.getId();
????????final String cuboidRootPath = getCuboidRootPath(jobId);
??????
????????// Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
????????// 根據(jù)事實(shí)表和維表抽取需要的維度和度量,創(chuàng)建一張寬表或平表,并且進(jìn)行文件再分配(執(zhí)行Hive命令行來完成操作)
????????inputSide.addStepPhase1_CreateFlatTable(result);???????
?
????????// Phase 2: Build Dictionary
????????// 創(chuàng)建字典由三個(gè)子任務(wù)完成,由MR引擎完成,分別是抽取維度值(包含抽樣統(tǒng)計(jì))、創(chuàng)建維度字典和保存統(tǒng)計(jì)信息
????????result.addTask(createFactDistinctColumnsStep(jobId));
????????result.addTask(createBuildDictionaryStep(jobId));
????????result.addTask(createSaveStatisticsStep(jobId));
????????// add materialize lookup tables if needed
????????LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);
?
????????// 創(chuàng)建HTable
????????outputSide.addStepPhase2_BuildDictionary(result);
??????
????????// Phase 3: Build Cube
????????// 構(gòu)建Cube,包含兩種Cube構(gòu)建算法,分別是逐層算法和快速算法,在執(zhí)行時(shí)會(huì)根據(jù)源數(shù)據(jù)的統(tǒng)計(jì)信息自動(dòng)選擇一種算法(各個(gè)Mapper的小Cube的行數(shù)之和 / reduce后的Cube行數(shù) > 7,重復(fù)度高就選逐層算法,重復(fù)度低就選快速算法)
????????addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
????????addInMemCubingSteps(result, jobId, cuboidRootPath); // inmem cubing, only selected algorithm will execute
????????// 構(gòu)建HFile文件及把HFile文件BulkLoad到HBase
????????outputSide.addStepPhase3_BuildCube(result);
???????
????????// Phase 4: Update Metadata & Cleanup
????????// 更新Cube元數(shù)據(jù),其中需要更新的包括cube是否可用、以及本次構(gòu)建的數(shù)據(jù)統(tǒng)計(jì),包括構(gòu)建完成的時(shí)間,輸入的record數(shù)目,輸入數(shù)據(jù)的大小,保存到Hbase中數(shù)據(jù)的大小等,并將這些信息持久到元數(shù)據(jù)庫中
?
????????// 以及清理臨時(shí)數(shù)據(jù),是在整個(gè)執(zhí)行過程中產(chǎn)生了很多的垃圾文件,其中包括:1、臨時(shí)的hive表,2、因?yàn)閔ive表是一個(gè)外部表,存儲(chǔ)該表的文件也需要額外刪除,3、fact distinct 這一步將數(shù)據(jù)寫入到HDFS上為建立詞典做準(zhǔn)備,這時(shí)候也可以刪除了,4、rowKey統(tǒng)計(jì)的時(shí)候會(huì)生成一個(gè)文件,此時(shí)可以刪除。
?
????????result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
????????inputSide.addStepPhase4_Cleanup(result);
????????outputSide.addStepPhase4_Cleanup(result);????????
?
????????return result;
????}
一、?根據(jù)事實(shí)表和維表抽取需要的維度和度量,創(chuàng)建一張寬表或平表,并且進(jìn)行文件再分配

1.1 生成Hive寬表或平表(Create Intermediate Flat Hive Table)(執(zhí)行Hive命令行)

這一步的操作是根據(jù)cube的定義生成原始數(shù)據(jù),這里會(huì)新創(chuàng)建一個(gè)hive外部表,然后再根據(jù)cube中定義的星狀模型,查詢出維度(對于DERIVED類型的維度使用的是外鍵列)和度量的值插入到新創(chuàng)建的表中,這個(gè)表是一個(gè)外部表,表的數(shù)據(jù)文件(存儲(chǔ)在HDFS)作為下一個(gè)子任務(wù)的輸入,它首先根據(jù)維度中的列和度量中作為參數(shù)的列得到需要出現(xiàn)在該表中的列,然后執(zhí)行三步hive操作,這三步hive操作是通過hive -e的方式執(zhí)行的shell命令。

??1. drop TABLE IF EXISTS xxx

??2. CREATE EXTERNAL TABLE IF NOT EXISTS xxx() ROW FORMAT DELIMITED FIELDS TERMINATED BY '\177' STORED AS SEQUENCEFILE LOCATION xxxx,其中表名是根據(jù)當(dāng)前的cube名和segment的uuid生成的,location是當(dāng)前job的臨時(shí)文件,只有當(dāng)insert插入數(shù)據(jù)的時(shí)候才會(huì)創(chuàng)建,注意這里每一行的分隔符指定的是'\177'(目前是寫死的,十進(jìn)制為127)。

??3. 插入數(shù)據(jù),在執(zhí)行之前需要首先設(shè)置一些配置項(xiàng),這些配置項(xiàng)通過hive的SET命令設(shè)置,是根據(jù)這個(gè)cube的job的配置文件(一般是在kylin的conf目錄下)設(shè)置的,最后執(zhí)行的是INSERT OVERWRITE TABLE xxx SELECT xxxx語句,SELECT子句中選出cube星狀模型中事實(shí)表與維度表按照設(shè)置的方式j(luò)oin之后的出現(xiàn)在維度或者度量參數(shù)中的列(特殊處理derived列),然后再加上用戶設(shè)置的where條件和partition的時(shí)間條件(根據(jù)輸入build的參數(shù))。

??需要注意的是這里無論用戶設(shè)置了多少維度和度量,每次join都會(huì)使用事實(shí)表和所有的維度表進(jìn)行join,這可能造成不必要的性能損失(多一個(gè)join會(huì)影響hive性能,畢竟要多讀一些文件)。這一步執(zhí)行完成之后location指定的目錄下就有了原始數(shù)據(jù)的文件,為接下來的任務(wù)提供了輸入。

JoinedFlatTable.generateDropTableStatement(flatDesc);

JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);

JoinedFlatTable.generateInsertDataStatement(flatDesc);

二、?提取緯度值、創(chuàng)建維度字典和保存統(tǒng)計(jì)信息

2.1 提取事實(shí)表維度去重值(Extract Fact Table Distinct Columns)(執(zhí)行一個(gè)MapReduce任務(wù),包含抽取緯度值及統(tǒng)計(jì)各Mapper間的重復(fù)度兩種任務(wù))

????在這一步是根據(jù)上一步生成的hive表計(jì)算出還表中的每一個(gè)出現(xiàn)在事實(shí)表中的維度的distinct值,并寫入到文件中,它是啟動(dòng)一個(gè)MR任務(wù)完成的,MR任務(wù)的輸入是HCatInputFormat,它關(guān)聯(lián)的表就是上一步創(chuàng)建的臨時(shí)表,這個(gè)MR任務(wù)的map階段首先在setup函數(shù)中得到所有維度中出現(xiàn)在事實(shí)表的維度列在臨時(shí)表的index,根據(jù)每一個(gè)index得到該列在臨時(shí)表中在每一行的值value,然后將<index+value,EMPTY_TEXT>作為mapper的輸出,通過index決定由哪個(gè)Reduce處理(而Reduce啟動(dòng)的時(shí)候根據(jù)ReduceTaskID如0000,0001來初始化決定處理哪個(gè)index對應(yīng)的維度列),該任務(wù)還啟動(dòng)了一個(gè)combiner,它所做的只是對同一個(gè)key(維度值)進(jìn)行去重(同一個(gè)mapper的結(jié)果),reducer所做的事情也是進(jìn)行key(維度值)去重(所有mapper的結(jié)果),然后在Reduce中將該維度列去重后的維度值一行行的寫入到以列名命名的文件中(注意kylin實(shí)現(xiàn)的方式,聚合的key是緯度值,而不是index)。

提取事實(shí)表維度列的唯一值是通過FactDistinctColumnsJob這個(gè)MapReduce來完成,核心思想是每個(gè)Reduce處理一個(gè)維度列,然后每個(gè)維度列Reduce單獨(dú)輸出該維度列對應(yīng)的去重后的數(shù)據(jù)文件(output written to baseDir/colName/-r-00000,baseDir/colName2/-r-00001 or 直接輸出字典?output written to baseDir/colName/colName.rldict-r-00000)。另外會(huì)輸出各Mapper間重復(fù)度統(tǒng)計(jì)文件(output written to baseDir/statistics/statistics-r-00000,baseDir/statistics/statistics-r-00001)

FactDistinctColumnsJob

FactDistinctColumnsMapper

FactDistinctColumnPartitioner

FactDistinctColumnsCombiner

FactDistinctColumnsReducer

org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper
org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer
?
在FactDistinctColumnsMapper中輸出維度值或通過HHL近似算法統(tǒng)計(jì)每個(gè)Mapper中各個(gè)CuboID的去重行數(shù)
? ? public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
? ? ? ? Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record);
? ? ? ? for (String[] row : rowCollection) {
? ? ? ? ? ? context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
? ? ? ? ? ? for (int i = 0; i < allCols.size(); i++) {
? ? ? ? ? ? ? ? String fieldValue = row[columnIndex[i]];
? ? ? ? ? ? ? ? if (fieldValue == null)
? ? ? ? ? ? ? ? ? ? continue;
? ? ? ? ? ? ? ? final DataType type = allCols.get(i).getType();
? ? ? ? ? ? ? ? if (dictColDeduper.isDictCol(i)) {
? ? ? ? ? ? ? ? ? ? if (dictColDeduper.add(i, fieldValue)) {
? ? ? ? ? ? ? ? ? ? ? ? // 輸出維度值,KEY=COLUMN_INDEX+COLUME_VALUE,VALUE=EMPTY_TEXT
? ? ? ? ? ? ? ? ? ? ? ? writeFieldValue(context, type, i, fieldValue);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? DimensionRangeInfo old = dimensionRangeInfoMap.get(i);
? ? ? ? ? ? ? ? ? ? if (old == null) {
? ? ? ? ? ? ? ? ? ? ? ? old = new DimensionRangeInfo(fieldValue, fieldValue);
? ? ? ? ? ? ? ? ? ? ? ? dimensionRangeInfoMap.put(i, old);
? ? ? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? ? ? old.setMax(type.getOrder().max(old.getMax(), fieldValue));
? ? ? ? ? ? ? ? ? ? ? ? old.setMin(type.getOrder().min(old.getMin(), fieldValue));
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? // 抽樣統(tǒng)計(jì),KEY=CUBOID,VALUE=HLLCount
? ? ? ? ? ? if (rowCount % 100 < samplingPercentage) {
? ? ? ? ? ? ? ? putRowKeyToHLL(row);
? ? ? ? ? ? }
? ? ? ? ? ??
? ? ? ? ? ? if (rowCount % 100 == 0) {
? ? ? ? ? ? ? ? dictColDeduper.resetIfShortOfMem();
? ? ? ? ? ? }
? ? ? ? ? ? rowCount++;
? ? ? ? }
? ? }
? ? protected void doCleanup(Context context) throws IOException, InterruptedException {
? ? ? ? ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
? ? ? ? // output each cuboid's hll to reducer, key is 0 - cuboidId
? ? ? ? for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
? ? ? ? ? ? cuboidStatCalculator.waitForCompletion();
? ? ? ? }
? ? ? ? for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
? ? ? ? ? ? Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
? ? ? ? ? ? HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
? ? ? ? ? ? HLLCounter hll;
? ? ? ? ? ? // 輸出各個(gè)CuboID的去重行數(shù)HLLCount
? ? ? ? ? ? for (int i = 0; i < cuboidIds.length; i++) {
? ? ? ? ? ? ? ? hll = cuboidsHLL[i];
? ? ? ? ? ? ? ? tmpbuf.clear();
? ? ? ? ? ? ? ? tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte
? ? ? ? ? ? ? ? tmpbuf.putLong(cuboidIds[i]);
? ? ? ? ? ? ? ? outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
? ? ? ? ? ? ? ? hllBuf.clear();
? ? ? ? ? ? ? ? hll.writeRegisters(hllBuf);
? ? ? ? ? ? ? ? outputValue.set(hllBuf.array(), 0, hllBuf.position());
? ? ? ? ? ? ? ? sortableKey.init(outputKey, (byte) 0);
? ? ? ? ? ? ? ? context.write(sortableKey, outputValue);
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? for (Integer colIndex : dimensionRangeInfoMap.keySet()) {
? ? ? ? ? ? DimensionRangeInfo rangeInfo = dimensionRangeInfoMap.get(colIndex);
? ? ? ? ? ? DataType dataType = allCols.get(colIndex).getType();
? ? ? ? ? ? writeFieldValue(context, dataType, colIndex, rangeInfo.getMin());
? ? ? ? ? ? writeFieldValue(context, dataType, colIndex, rangeInfo.getMax());
? ? ? ? }
? ? }
?
在FactDistinctColumnPartitioner中根據(jù)SelfDefineSortableKey(COLUMN_INDEX)選擇分區(qū)
? ? public int getPartition(SelfDefineSortableKey skey, Text value, int numReduceTasks) {
? ? ? ? Text key = skey.getText();
? ? ? ? // 統(tǒng)計(jì)任務(wù)
? ? ? ? if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
? ? ? ? ? ? Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
? ? ? ? ? ? return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
? ? ? ? } else {
? ? ? ? ? ? // 抽取緯度值任務(wù),直接根據(jù)COLUMN_INDEX指定分區(qū)
? ? ? ? ? ? return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
? ? ? ? }
? ? }
?
在FactDistinctColumnsReducer中輸出去重后的維度值或輸出通過HLL近似算法統(tǒng)計(jì)CuboID去重后的行數(shù)
? ? public void doReduce(SelfDefineSortableKey skey, Iterable<Text> values, Context context) throws IOException, InterruptedException {
? ? ? ? Text key = skey.getText();
? ? ? ??
? ? ? ? // 統(tǒng)計(jì)邏輯
? ? ? ? if (isStatistics) {
? ? ? ? ? ? // for hll
? ? ? ? ? ? long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
? ? ? ? ? ? for (Text value : values) {
? ? ? ? ? ? ? ? HLLCounter hll = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision());
? ? ? ? ? ? ? ? ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
? ? ? ? ? ? ? ? hll.readRegisters(bf);
? ? ? ? ? ? ? ? // 累計(jì)Mapper輸出的各個(gè)CuboID未去重的行數(shù)(每個(gè)Reduce處理部分CuboIDs)
? ? ? ? ? ? ? ? totalRowsBeforeMerge += hll.getCountEstimate();
? ? ? ? ? ? ? ? if (cuboidId == baseCuboidId) {
? ? ? ? ? ? ? ? ? ? baseCuboidRowCountInMappers.add(hll.getCountEstimate());
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? // 合并CuboID
? ? ? ? ? ? ? ? if (cuboidHLLMap.get(cuboidId) != null) {
? ? ? ? ? ? ? ? ? ? cuboidHLLMap.get(cuboidId).merge(hll);
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? cuboidHLLMap.put(cuboidId, hll);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } else {
? ? ? ? ? ? String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
? ? ? ? ? ? logAFewRows(value);
? ? ? ? ? ? // if dimension col, compute max/min value
? ? ? ? ? ? if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
? ? ? ? ? ? ? ? if (minValue == null || col.getType().compare(minValue, value) > 0) {
? ? ? ? ? ? ? ? ? ? minValue = value;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
? ? ? ? ? ? ? ? ? ? maxValue = value;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? //if dict column
? ? ? ? ? ? if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
? ? ? ? ? ? ? ? if (buildDictInReducer) {
? ? ? ? ? ? ? ? ? ? // 如果需要在Reduce階段構(gòu)建詞典,則在doCleanup后構(gòu)建完輸出詞典文件
? ? ? ? ? ? ? ? ? ? // output written to baseDir/colName/colName.rldict-r-00000 (etc)
? ? ? ? ? ? ? ? ? ? builder.addValue(value);
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? // 直接輸出去重后的維度值
? ? ? ? ? ? ? ? ? ? byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
? ? ? ? ? ? ? ? ? ? // output written to baseDir/colName/-r-00000 (etc)
? ? ? ? ? ? ? ? ? ? String fileName = col.getIdentity() + "/";
? ? ? ? ? ? ? ? ? ? mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? rowCount++;
? ? }
?
? ? protected void doCleanup(Context context) throws IOException, InterruptedException {
? ? ? ? if (isStatistics) {
? ? ? ? ? ? //output the hll info;
? ? ? ? ? ? List<Long> allCuboids = Lists.newArrayList();
? ? ? ? ? ? allCuboids.addAll(cuboidHLLMap.keySet());
? ? ? ? ? ? Collections.sort(allCuboids);
? ? ? ? ? ? logMapperAndCuboidStatistics(allCuboids); // for human check
? ? ? ? ? ? 輸出通過HLL近似算法統(tǒng)計(jì)CuboID去重后的行數(shù)
? ? ? ? ? ? outputStatistics(allCuboids);
? ? ? ? } else {
? ? ? ? ? ? //dimension col
? ? ? ? ? ? if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
? ? ? ? ? ? ? ? outputDimRangeInfo();
? ? ? ? ? ? }
? ? ? ? ? ? // dic col
? ? ? ? ? ? if (buildDictInReducer) {
? ? ? ? ? ? ? ? Dictionary<String> dict = builder.build();
? ? ? ? ? ? ? ? outputDict(col, dict);
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? mos.close();
? ? }
?
? ? private void outputStatistics(List<Long> allCuboids) throws IOException, InterruptedException {
? ? ? ? // output written to baseDir/statistics/statistics-r-00000 (etc)
? ? ? ? String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;
? ? ? ? ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
? ? ? ? // 獲取進(jìn)入這個(gè)Reduce各個(gè)CuboID去重后的最終統(tǒng)計(jì)行數(shù)
? ? ? ? // mapper overlap ratio at key -1
? ? ? ? long grandTotal = 0;
? ? ? ? for (HLLCounter hll : cuboidHLLMap.values()) {
? ? ? ? ? ? // 累計(jì)各個(gè)CuboID去重后的最終統(tǒng)計(jì)行數(shù)
? ? ? ? ? ? grandTotal += hll.getCountEstimate();
? ? ? ? }
? ? ? ??
? ? ? ? // 輸出進(jìn)入這個(gè)Reduce中的各Mapper間的重復(fù)度,totalRowsBeforeMerge / grandTotal
? ? ? ? double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
? ? ? ? mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);
? ? ? ? // ?Mapper數(shù)量
? ? ? ? // mapper number at key -2
? ? ? ? mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2), new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);
? ? ? ? // 抽樣百分比
? ? ? ? // sampling percentage at key 0
? ? ? ? mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);
? ? ? ? // 輸出進(jìn)入這個(gè)Reduce的各個(gè)cuboId的最終統(tǒng)計(jì)結(jié)果
? ? ? ? for (long i : allCuboids) {
? ? ? ? ? ? valueBuf.clear();
? ? ? ? ? ? cuboidHLLMap.get(i).writeRegisters(valueBuf);
? ? ? ? ? ? valueBuf.flip();
? ? ? ? ? ? mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);
? ? ? ? }
? ? }
2.2 基于維度去重值構(gòu)建維度字典(Build Dimension Dictionary)(在kylin進(jìn)程內(nèi)的一個(gè)線程中去創(chuàng)建所有維度的dictionary)

??這一步是根據(jù)上一步生成的distinct column文件和維度表計(jì)算出所有維度的詞典信息,詞典是為了節(jié)約存儲(chǔ)而設(shè)計(jì)的,用于將一個(gè)成員值編碼成一個(gè)整數(shù)類型并且可以通過整數(shù)值獲取到原始成員值,每一個(gè)cuboid的成員是一個(gè)key-value形式存儲(chǔ)在hbase中,key是維度成員的組合,但是一般情況下維度是一些字符串之類的值(例如商品名),所以可以通過將每一個(gè)維度值轉(zhuǎn)換成唯一整數(shù)而減少內(nèi)存占用,在從hbase查找出對應(yīng)的key之后再根據(jù)詞典獲取真正的成員值。使用字典的好處是有很好的數(shù)據(jù)壓縮率,可降低存儲(chǔ)空間,同時(shí)也提升存儲(chǔ)讀取的速度。缺點(diǎn)是構(gòu)建字典需要較多的內(nèi)存資源,創(chuàng)建維度基數(shù)超過千萬的容易造成內(nèi)存溢出。

???這一步是在kylin進(jìn)程內(nèi)的一個(gè)線程中執(zhí)行的,它會(huì)創(chuàng)建所有維度的dictionary,如果是事實(shí)表上的維度則可以從上一步生成的文件中讀取該列的distinct成員值(FileTable),否則則需要從原始的hive表中讀取每一列的信息(HiveTable),根據(jù)不同的源(文件或者h(yuǎn)ive表)獲取所有的列去重之后的成員列表,然后根據(jù)這個(gè)列表生成dictionary,kylin中針對不同類型的列使用不同的實(shí)現(xiàn)方式,對于time之類的(date、time、dtaetime和timestamp)使用DateStrDictionary,這里目前還存在著一定的問題,因?yàn)檫@種編碼方式會(huì)首先將時(shí)間轉(zhuǎn)換成‘yyyy-MM-dd’的格式,會(huì)導(dǎo)致timestamp之類的精確時(shí)間失去天以后的精度。針對數(shù)值型的使用NumberDictionary,其余的都使用一般的TrieDictionary(字典樹)。這些dictionary會(huì)作為cube的元數(shù)據(jù)存儲(chǔ)的kylin元數(shù)據(jù)庫里面,執(zhí)行query的時(shí)候進(jìn)行轉(zhuǎn)換。

???針對這一步需要注意的問題:首先,這一步的兩個(gè)步驟都是在kylin進(jìn)程的一個(gè)線程中執(zhí)行的,第一步會(huì)加載某一個(gè)維度的所有distinct成員到內(nèi)存,如果某一個(gè)維度的基數(shù)比較大 ,可能會(huì)導(dǎo)致內(nèi)存出現(xiàn)OOM,然后在創(chuàng)建snapshotTable的時(shí)候會(huì)限制原始表的大小不能超過配置的一個(gè)上限值,如果超過則會(huì)執(zhí)行失敗。但是應(yīng)該強(qiáng)調(diào)的是這里加載全部的原始維度表更可能出現(xiàn)OOM。

CreateDictionaryJob

2.3 保存統(tǒng)計(jì)信息(合并保存統(tǒng)計(jì)信息及基于上一個(gè)HyperLogLog模擬去重統(tǒng)計(jì)信息選擇Cube構(gòu)建算法等)

???針對上一個(gè)MR的HyperLogLog模擬去重統(tǒng)計(jì)結(jié)果文件baseDir/statistics/statistics-r-00000,baseDir/statistics/statistics-r-00001,合并相關(guān)統(tǒng)計(jì)信息,根據(jù)最終重復(fù)度選擇Cube構(gòu)建算法

在FactDistinctColumnsReducer中輸出進(jìn)入這個(gè)Reduce的各個(gè)CuboID的統(tǒng)計(jì)信息???

private void outputStatistics(List<Long> allCuboids) throws IOException, InterruptedException {
????????// output written to baseDir/statistics/statistics-r-00000 (etc)
????????String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;
????????ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
????????// 獲取進(jìn)入這個(gè)Reduce各個(gè)CuboID去重后的最終統(tǒng)計(jì)行數(shù)
????????// mapper overlap ratio at key -1
????????long grandTotal = 0;
????????for (HLLCounter hll : cuboidHLLMap.values()) {
????????????// 累計(jì)各個(gè)CuboID去重后的最終統(tǒng)計(jì)行數(shù)
????????????grandTotal += hll.getCountEstimate();
????????}
????????// 輸出進(jìn)入這個(gè)Reduce中的各Mapper間的重復(fù)度,totalRowsBeforeMerge / grandTotal
????????double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
????????mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);
????????//??Mapper數(shù)量
????????// mapper number at key -2
????????mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2), new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);
????????// 抽樣百分比
????????// sampling percentage at key 0
????????mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);
????????// 輸出進(jìn)入這個(gè)Reduce的各個(gè)cuboId的最終統(tǒng)計(jì)結(jié)果
????????for (long i : allCuboids) {
????????????valueBuf.clear();
????????????cuboidHLLMap.get(i).writeRegisters(valueBuf);
????????????valueBuf.flip();
????????????mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);
????????}
}
在SaveStatisticsStep保存統(tǒng)計(jì)信息任務(wù)階段會(huì)去讀取上一步任務(wù)產(chǎn)出的cuboID統(tǒng)計(jì)結(jié)果文件,產(chǎn)出最終統(tǒng)計(jì)信息保存到元數(shù)據(jù)引擎中并且根據(jù)各個(gè)Mapper重復(fù)度選擇Cube構(gòu)建算法。

?Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
?long totalRowsBeforeMerge = 0;
?long grantTotal = 0;
?int samplingPercentage = -1;
?int mapperNumber = -1;
?for (Path item : statisticsFiles) {
?// 讀取解析統(tǒng)計(jì)文件
CubeStatsReader.CubeStatsResult cubeStatsResult = new CubeStatsReader.CubeStatsResult(item,
????????????????????????kylinConf.getCubeStatsHLLPrecision());????????????
????????????????// 獲取各個(gè)CuboID的計(jì)數(shù)器
????????????????cuboidHLLMap.putAll(cubeStatsResult.getCounterMap());
????????????????long pGrantTotal = 0L;
????????????????for (HLLCounter hll : cubeStatsResult.getCounterMap().values()) {
????????????????????pGrantTotal += hll.getCountEstimate();
????????????????}????????????????
????????????????// 累計(jì)所有Mapper輸出的cuboID行數(shù)
????????????????totalRowsBeforeMerge += pGrantTotal * cubeStatsResult.getMapperOverlapRatio();
????????????????// 累計(jì)去重后的cuboID統(tǒng)計(jì)行數(shù)
????????????????grantTotal += pGrantTotal;
????????????double mapperOverlapRatio = grantTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grantTotal;
????????????CubingJob cubingJob = (CubingJob) getManager()
????????????????????.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
????????????// fact源數(shù)據(jù)行數(shù)
????????????long sourceRecordCount = cubingJob.findSourceRecordCount();
???????????
????????????// 保存CuboID最終統(tǒng)計(jì)信息到最終統(tǒng)計(jì)文件cuboid_statistics.seq中
????????????// cuboidHLLMap CuboID的統(tǒng)計(jì)信息
????????????// samplingPercentage 抽樣百分比
????????????// mapperNumber Mapper數(shù)
????????????// mapperOverlapRatio 各個(gè)Mapper間的重復(fù)度
????????????// sourceRecordCount fact源數(shù)據(jù)行數(shù)
????????????CubeStatsWriter.writeCuboidStatistics(hadoopConf, statisticsDir, cuboidHLLMap, samplingPercentage,mapperNumber, mapperOverlapRatio, sourceRecordCount);
????????????Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
????????????logger.info(newSegment + " stats saved to hdfs " + statisticsFile);
????????????FSDataInputStream is = fs.open(statisticsFile);
????????????try {
?
????????????????// put the statistics to metadata store
????????????????// 把統(tǒng)計(jì)信息存儲(chǔ)到kylin的元數(shù)據(jù)引擎中
????????????????String resPath = newSegment.getStatisticsResourcePath();
????????????????rs.putResource(resPath, is, System.currentTimeMillis());
????????????????logger.info(newSegment + " stats saved to resource " + resPath);
????????????????// 根據(jù)抽樣數(shù)據(jù)計(jì)算重復(fù)度,選擇Cube構(gòu)建算法,如mapperOverlapRatio > 7 選逐層算法,否則選快速算法
????????????????StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, newSegment);
????????????????StatisticsDecisionUtil.optimizeCubingPlan(newSegment);
????????????} finally {
????????????????IOUtils.closeStream(is);
}
用戶該如何選擇算法呢?無需擔(dān)心,Kylin會(huì)自動(dòng)選擇合適的算法。Kylin在計(jì)算Cube之前對數(shù)據(jù)進(jìn)行采樣,在“fact distinct”步,利用HyperLogLog模擬去重,估算每種組合有多少不同的key,從而計(jì)算出每個(gè)Mapper輸出的數(shù)據(jù)大小,以及所有Mapper之間數(shù)據(jù)的重合度,據(jù)此來決定采用哪種算法更優(yōu)。在對上百個(gè)Cube任務(wù)的時(shí)間做統(tǒng)計(jì)分析后,Kylin選擇了7做為默認(rèn)的算法選擇閥值(參數(shù)kylin.cube.algorithm.layer-or-inmem-threshold):如果各個(gè)Mapper的小Cube的行數(shù)之和,大于reduce后的Cube行數(shù)的7倍,采用Layered Cubing, 反之采用Fast Cubing。如果用戶在使用過程中,更傾向于使用Fast Cubing,可以適當(dāng)調(diào)大此參數(shù)值,反之調(diào)小。

org.apache.kylin.engine.mr.steps.SaveStatisticsStep???????????????

?int mapperNumLimit = kylinConf.getCubeAlgorithmAutoMapperLimit();
????????????????double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold(); // 默認(rèn)7
????????????????logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit);
????????????????logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is "+ overlapThreshold);
????????????????// in-mem cubing is good when
????????????????// 1) the cluster has enough mapper slots to run in parallel
????????????????// 2) the mapper overlap ratio is small, meaning the shuffle of in-mem MR has advantage
????????????????alg = (mapperNumber <= mapperNumLimit && mapperOverlapRatio <= overlapThreshold)//
????????????????????????? CubingJob.AlgorithmEnum.INMEM ????// 快速算法
????????????????????????: CubingJob.AlgorithmEnum.LAYER; ???// 逐層算法
三、?構(gòu)建Cube

3.1 計(jì)算BaseCuboid文件(Build Base Cuboid Data)(執(zhí)行一個(gè)MapReduce任務(wù))

???何謂Base cuboid呢?假設(shè)一個(gè)cube包含了四個(gè)維度:A/B/C/D,那么這四個(gè)維度成員間的所有可能的組合就是base cuboid,這就類似在查詢的時(shí)候指定了select count(1) from xxx group by A,B,C,D;這個(gè)查詢結(jié)果的個(gè)數(shù)就是base cuboid集合的成員數(shù)。這一步也是通過一個(gè)MR任務(wù)完成的,輸入是臨時(shí)表的路徑和分隔符,map對于每一行首先進(jìn)行split,然后獲取每一個(gè)維度列的值組合作為rowKey,但是rowKey并不是簡單的這些維度成員的內(nèi)容組合,而是首先將這些內(nèi)容從dictionary中查找出對應(yīng)的id,然后組合這些id得到rowKey,這樣可以大大縮短hbase的存儲(chǔ)空間,提升查找性能。然后在查找該行中的度量列。這個(gè)MR任務(wù)還會(huì)執(zhí)行combiner過程,執(zhí)行邏輯和reducer相同,在reducer中的key是一個(gè)rowKey,value是相同的rowKey的measure組合的數(shù)組,reducer會(huì)分解出每一個(gè)measure的值,然后再根據(jù)定義該度量使用的聚合函數(shù)計(jì)算得到這個(gè)rowKey的結(jié)果,其實(shí)這已經(jīng)類似于hbase存儲(chǔ)的格式了。

org.apache.kylin.engine.mr.steps.BaseCuboidJob

org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper

org.apache.kylin.engine.mr.steps.CuboidReducer

3.2 計(jì)算第N層cuboid文件(Build N-Dimension Cuboid Data)(執(zhí)行N個(gè)MapReduce任務(wù))

??這一個(gè)流程是由多個(gè)步驟的,它是根據(jù)維度組合的cuboid的總數(shù)決定的,上一層cuboid執(zhí)行MR任務(wù)的輸入是下一層cuboid計(jì)算的輸出,由于最底層的cuboid(base)已經(jīng)計(jì)算完成,所以這幾步不需要依賴于任何的hive信息,它的reducer和base cuboid的reducer過程基本一樣的(相同rowkey的measure執(zhí)行聚合運(yùn)算),mapper的過程只需要根據(jù)這一行輸入的key(例如A、B、C、D中某四個(gè)成員的組合)獲取可能的下一層的的組合(例如只有A、B、C和B、C、D),那么只需要將這些可能的組合提取出來作為新的key,value不變進(jìn)行輸出就可以了。

舉個(gè)例子,假設(shè)一共四個(gè)維度A/B/C/D,他們的成員分別是(A1、A2、A3),(B1、B2)、(C1)、(D1),有一個(gè)measure(對于這列V,計(jì)算sum(V)),這里忽略dictionary編碼。原始表如下:

A

B

C

D

V

A1

B1

C1

D1

2

A1

B2

C1

D1

3

A2

B1

C1

D1

5

A3

B1

C1

D1

6

A3

B2

C1

D1

8

那么base cuboid最終的輸出如下

(<A1、B1、C1、D1>、2)

(<A1、B2、C1、D1>, 3)

(<A2、B1、C1、D1>, 5)

(<A3、B1、C1、D1>, 6)

(<A3、B2、C1、D1>, 8)

那么它作為下面一個(gè)cuboid的輸入,對于第一行輸入

(<A1、B1、C1、D1>, 2),mapper執(zhí)行完成之后會(huì)輸出

(<A1、B1、C1>, 2)、

(<A1、B1、D1>, 2)、

(<A1、C1、D1>, 2)、

(<B1、C1、D1>, 2)這四項(xiàng),

同樣對于其他的內(nèi)一行也會(huì)輸出四行,最終他們經(jīng)過reducer的聚合運(yùn)算,得到如下的結(jié)果:

(<A1、B1、C1>, 2)

(<A1、B1、D1>, 2)

(<A1、C1、D1>, 2 + 3)

(<B1、C1、D1>,2 + 5 +6)

???...

這樣一次將下一層的結(jié)果作為輸入計(jì)算上一層的cuboid成員,直到最頂層的cuboid,這一個(gè)層cuboid只包含一個(gè)成員,不按照任何維度進(jìn)行g(shù)roup by。

?????上面的這些步驟用于生成cuboid,假設(shè)有N個(gè)維度(對于特殊類型的),那么就需要有N +1層cuboid,每一層cuboid可能是由多個(gè)維度的組合,但是它包含的維度個(gè)數(shù)相同。

org.apache.kylin.engine.mr.steps.NDCuboidJob

org.apache.kylin.engine.mr.steps.NDCuboidMapper

org.apache.kylin.engine.mr.steps.CuboidReducer

3.3 創(chuàng)建HTable

??在上面幾步中,我們已經(jīng)將每一層的cuboid計(jì)算完成,每一層的cuboid文件都是一些cuboid的集合,每一層的cuboid的key包含相同的維度個(gè)數(shù),下面一步就是將這些cuboid文件導(dǎo)入到hbase中,根據(jù)上一步計(jì)算出的rowKey分布情況(split數(shù)組)創(chuàng)建HTable,創(chuàng)建一個(gè)HTable的時(shí)候還需要考慮一下幾個(gè)事情:1、列組的設(shè)置,2、每一個(gè)列組的壓縮方式,3、部署coprocessor,4、HTable中每一個(gè)region的大小。在這一步中,列組的設(shè)置是根據(jù)用戶創(chuàng)建cube時(shí)候設(shè)置的,在hbase中存儲(chǔ)的數(shù)據(jù)key是維度成員的組合,value是對應(yīng)聚合函數(shù)的結(jié)果,列組針對的是value的,一般情況下在創(chuàng)建cube的時(shí)候只會(huì)設(shè)置一個(gè)列組,該列包含所有的聚合函數(shù)的結(jié)果;在創(chuàng)建HTable時(shí)默認(rèn)使用LZO壓縮,如果不支持LZO則不進(jìn)行壓縮,在后面kylin的版本中支持更多的壓縮方式;kylin強(qiáng)依賴于hbase的coprocessor,所以需要在創(chuàng)建HTable為該表部署coprocessor,這個(gè)文件會(huì)首先上傳到HBase所在的HDFS上,然后在表的元信息中關(guān)聯(lián),這一步很容易出現(xiàn)錯(cuò)誤,例如coprocessor找不到了就會(huì)導(dǎo)致整個(gè)regionServer無法啟動(dòng),所以需要特別小心;region的劃分已經(jīng)在上一步確定了,所以這里不存在動(dòng)態(tài)擴(kuò)展的情況,所以kylin創(chuàng)建HTable使用的接口如下:

public void createTable( final HTableDescriptor desc , byte [][] splitKeys)。

CreateHTableJob

3.4 轉(zhuǎn)換HFile文件

??創(chuàng)建完了HTable之后一般會(huì)通過插入接口將數(shù)據(jù)插入到表中,但是由于cuboid中的數(shù)據(jù)量巨大,頻繁的插入會(huì)對Hbase的性能有非常大的影響,所以kylin采取了首先將cuboid文件轉(zhuǎn)換成HTable格式的HFile文件,然后在通過bulkLoad的方式將文件和HTable進(jìn)行關(guān)聯(lián),這樣可以大大降低Hbase的負(fù)載,這個(gè)過程通過一個(gè)MR任務(wù)完成。

??這個(gè)任務(wù)的輸入是所有的cuboid文件,在mapper階段根據(jù)每一個(gè)cuboid成員的key-value輸出,如果cube定義時(shí)指定了多個(gè)列組,那么同一個(gè)key要按照不同列組中的值分別輸出,例如在cuboid文件中存在一行cuboid=1,key=1,value=sum(cost),count(1)的數(shù)據(jù),而cube中將這兩個(gè)度量劃分到兩個(gè)列組中,這時(shí)候?qū)τ谶@一行數(shù)據(jù),mapper的輸出為<1, sum(cost)>和<1,count(1)>。reducer使用的是org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer,它會(huì)按照行排序輸出,如果一行中包含多個(gè)值,那么會(huì)將這些值進(jìn)行排序再輸出。輸出的格式則是根據(jù)HTable的文件格式定義的。

CubeHFileJob

3.5 BulkLoad文件

??這一步將HFile文件load到HTable中,因?yàn)閘oad操作會(huì)將原始的文件刪除(相當(dāng)于remove),在操作之前首先將所有列組的Hfile的權(quán)限都設(shè)置為777,然后再啟動(dòng)LoadIncrementalHFiles任務(wù)執(zhí)行l(wèi)oad操作,它的輸入為文件的路徑和HTable名,這一步完全依賴于HBase的工具。這一步完成之后,數(shù)據(jù)已經(jīng)存儲(chǔ)到HBase中了,key的格式由cuboid編號+每一個(gè)成員在字典樹的id組成,value可能保存在多個(gè)列組里,包含在原始數(shù)據(jù)中按照這幾個(gè)成員進(jìn)行GROUP BY計(jì)算出的度量的值。

BulkLoadJob

四、?收尾工作

??執(zhí)行完上一步就已經(jīng)完成了從輸入到輸出的計(jì)算過程,接下來要做的就是一些kylin內(nèi)部的工作,分別是更新Cube元數(shù)據(jù),更新cube狀態(tài),臨時(shí)數(shù)據(jù)清理。

4.1 更新Cube元數(shù)據(jù)信息

??這一步主要是更新cube的狀態(tài),其中需要更新的包括cube是否可用、以及本次構(gòu)建的數(shù)據(jù)統(tǒng)計(jì),包括構(gòu)建完成的時(shí)間,輸入的record數(shù)目,輸入數(shù)據(jù)的大小,保存到Hbase中數(shù)據(jù)的大小等,并將這些信息持久到元數(shù)據(jù)庫中。

UpdateCubeInfoAfterBuildStep

4.2 清理臨時(shí)數(shù)據(jù)

??這一步是否成功對正確性不會(huì)有任何影響,因?yàn)榻?jīng)過上一步之后這個(gè)segment就可以在這個(gè)cube中被查找到了,但是在整個(gè)執(zhí)行過程中產(chǎn)生了很多的垃圾文件,其中包括:1、臨時(shí)的hive表,2、因?yàn)閔ive表是一個(gè)外部表,存儲(chǔ)該表的文件也需要額外刪除,3、fact distinct 這一步將數(shù)據(jù)寫入到HDFS上為建立詞典做準(zhǔn)備,這時(shí)候也可以刪除了,4、rowKey統(tǒng)計(jì)的時(shí)候會(huì)生成一個(gè)文件,此時(shí)可以刪除。5、生成HFile時(shí)文件存儲(chǔ)的路徑和hbase真正存儲(chǔ)的路徑不同,雖然load是一個(gè)remove操作,但是上層的目錄還是存在的,也需要?jiǎng)h除。這一步kylin做的比較簡單,并沒有完全刪除所有的臨時(shí)文件,其實(shí)在整個(gè)計(jì)算過程中,真正還需要保留的數(shù)據(jù)只有多個(gè)cuboid文件(需要增量build的cube),這個(gè)因?yàn)樵诓煌瑂egment進(jìn)行merge的時(shí)候是基于cuboid文件的,而不是根據(jù)HTable的。

GarbageCollectionStep

Cuboid 的維度和指標(biāo)如何轉(zhuǎn)換為HBase的KV結(jié)構(gòu)

簡單的說Cuboid的維度會(huì)映射為HBase的Rowkey,Cuboid的指標(biāo)會(huì)映射為HBase的Value。如下圖所示:?圖2

如上圖原始表所示:Hive表有兩個(gè)維度列year和city,有一個(gè)指標(biāo)列price。

如上圖預(yù)聚合表所示:我們具體要計(jì)算的是year和city這兩個(gè)維度所有維度組合(即4個(gè)cuboid)下的sum(priece)指標(biāo),這個(gè)指標(biāo)的具體計(jì)算過程就是由MapReduce完成的。

如上圖字典編碼所示:為了節(jié)省存儲(chǔ)資源,Kylin對維度值進(jìn)行了字典編碼。圖中將beijing和shanghai依次編碼為0和1。

如上圖HBase KV存儲(chǔ)所示:在計(jì)算cuboid過程中,會(huì)將Hive表的數(shù)據(jù)轉(zhuǎn)化為HBase的KV形式。Rowkey的具體格式是cuboid id + 具體的維度值(最新的Rowkey中為了并發(fā)查詢還加入了ShardKey),以預(yù)聚合表內(nèi)容的第2行為例,其維度組合是(year,city),所以cuboid id就是00000011,cuboid是8位,具體維度值是1994和shanghai,所以編碼后的維度值對應(yīng)上圖的字典編碼也是11,所以HBase的Rowkey就是0000001111,對應(yīng)的HBase Value就是sum(priece)的具體值。

所有的cuboid計(jì)算完成后,會(huì)將cuboid轉(zhuǎn)化為HBase的KeyValue格式生成HBase的HFile,最后將HFile load進(jìn)cube對應(yīng)的HBase表中。

Cube 構(gòu)建過程重要源碼分析
1 從Hive表生成Base Cuboid

在實(shí)際的cube構(gòu)建過程中,會(huì)首先根據(jù)cube的Hive事實(shí)表和維表生成一張大寬表,然后計(jì)算大寬表列的基數(shù),建立維度字典,估算cuboid的大小,建立cube對應(yīng)的HBase表,再計(jì)算base cuboid。

計(jì)算base cuboid就是一個(gè)MapReduce作業(yè),其輸入是上面提到的Hive大寬表,輸出的是key是各種維度組合,value是Hive大寬表中指標(biāo)的值。

org.apache.kylin.engine.mr.steps.BaseCuboidJob

org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper

org.apache.kylin.engine.mr.steps.CuboidReducer

map階段生成key-value的代碼如下:???

public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
????????Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(value);
????????for (String[] row: rowCollection) {
????????????try {
????????????????outputKV(row, context);
????????????} catch (Exception ex) {
????????????????handleErrorRecord(row, ex);
????????????}
????????}
?
????}
2 從Base Cuboid 逐層計(jì)算 Cuboid(Cube構(gòu)建算法-逐層算法)

從base cuboid 逐層計(jì)算每層的cuboid,也是MapReduce作業(yè),map階段每層維度數(shù)依次減少。

org.apache.kylin.engine.mr.steps.NDCuboidJob
org.apache.kylin.engine.mr.steps.NDCuboidMapper
org.apache.kylin.engine.mr.steps.CuboidReducer
????????public void doMap(Text key, Text value, Context context) throws Exception {
????????????long cuboidId = rowKeySplitter.split(key.getBytes());
????????????Cuboid parentCuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);
????????????/**
?????????????* Build N-Dimension Cuboid
??????????????## 構(gòu)建N維cuboid
??????????????這些步驟是“逐層”構(gòu)建cube的過程,每一步以前一步的輸出作為輸入,然后去掉一個(gè)維度以聚合得到一個(gè)子cuboid。舉個(gè)例子,cuboid ABCD去掉A得到BCD,去掉B得到ACD。
??????????????有些cuboid可以從一個(gè)以上的父cuboid聚合得到,這種情況下,Kylin會(huì)選擇最小的一個(gè)父cuboid。舉例,AB可以從ABC(id:1110)和ABD(id:1101)生成,則ABD會(huì)被選中,因?yàn)樗谋華BC要小。
??????????????在這基礎(chǔ)上,如果D的基數(shù)較小,聚合運(yùn)算的成本就會(huì)比較低。所以,當(dāng)設(shè)計(jì)rowkey序列的時(shí)候,請記得將基數(shù)較小的維度放在末尾。這樣不僅有利于cube構(gòu)建,而且有助于cube查詢,因?yàn)轭A(yù)聚合也遵循相同的規(guī)則。
??????????????通常來說,從N維到(N/2)維的構(gòu)建比較慢,因?yàn)檫@是cuboid數(shù)量爆炸性增長的階段:N維有1個(gè)cuboid,(N-1)維有N個(gè)cuboid,(N-2)維有(N-2)*(N-1)個(gè)cuboid,以此類推。經(jīng)過(N/2)維構(gòu)建的步驟,整個(gè)構(gòu)建任務(wù)會(huì)逐漸變快。
?????????????*/
????????????Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId);
????????????// if still empty or null
????????????if (myChildren == null || myChildren.size() == 0) {
????????????????context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Skipped records").increment(1L);
????????????????if (skipCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
????????????????????logger.info("Skipping record with ordinal: " + skipCounter);
????????????????}
????????????????return;
????????????}???????????
????????????context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Processed records").increment(1L);
????????????Pair<Integer, ByteArray> result;
????????????for (Long child : myChildren) {
????????????????Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);
????????????????result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
????????????????outputKey.set(result.getSecond().array(), 0, result.getFirst());
????????????????context.write(outputKey, value);
????????????}?????????
????????}
從base cuboid 逐層計(jì)算每層的cuboid,也是MapReduce作業(yè),map階段每層維度數(shù)依次減少,reduce階段對指標(biāo)進(jìn)行聚合。

org.apache.kylin.engine.mr.steps.CuboidReducer
????public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
????????aggs.reset();??//MeasureAggregators 根據(jù)每種指標(biāo)的不同類型對指標(biāo)進(jìn)行聚合
????????for (Text value : values) {
????????????codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
????????????if (cuboidLevel > 0) { // Base Cuboid 的 cuboidLevel 是0
????????????????aggs.aggregate(input, needAggr); //指標(biāo)進(jìn)行進(jìn)一步聚合
????????????} else {
????????????????aggs.aggregate(input);
????????????}
????????}
????????aggs.collectStates(result);
????????ByteBuffer valueBuf = codec.encode(result);
????????outputValue.set(valueBuf.array(), 0, valueBuf.position());
????????context.write(key, outputValue);
}?
3 讀取Hive寬表直接在Mapper端預(yù)聚合構(gòu)建完整Cube(Cube構(gòu)建算法-快速算法)

快速算法的核心思想是清晰簡單的,就是最大化利用Mapper端的CPU和內(nèi)存,對分配的數(shù)據(jù)塊,將需要的組合全都做計(jì)算后再輸出給Reducer;由Reducer再做一次合并(merge),從而計(jì)算出完整數(shù)據(jù)的所有組合。如此,經(jīng)過一輪Map-Reduce就完成了以前需要N輪的Cube計(jì)算。本質(zhì)就是在Mapper端基于內(nèi)存提前做預(yù)聚合。

org.apache.kylin.engine.mr.steps.InMemCuboidJob
org.apache.kylin.engine.mr.steps.InMemCuboidMapper
org.apache.kylin.engine.mr.steps.InMemCuboidReducer
map階段生成key-value的代碼如下:
????public void doMap(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
????????// put each row to the queue
????????T row = getRecordFromKeyValue(key, value);
????????if (offer(context, row, 1, TimeUnit.MINUTES, 60)) {
????????????counter++;
????????????countOfLastSplit++;
????????????if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
????????????????logger.info("Handled " + counter + " records, internal queue size = " + queue.size());
????????????}
????????} else {
????????????throw new IOException("Failed to offer row to internal queue due to queue full!");
????????}
????????if (counter % unitRows == 0 && shouldCutSplit(nSplit, countOfLastSplit)) {
????????????if (offer(context, inputConverterUnit.getCutRow(), 1, TimeUnit.MINUTES, 60)) {
????????????????countOfLastSplit = 0;
????????????} else {
????????????????throw new IOException("Failed to offer row to internal queue due to queue full!");
????????????}
????????????nSplit++;
????????}
}
?
reduce階段整體合并的代碼如下:
????public void doReduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
????????aggs.reset();
????????for (ByteArrayWritable value : values) {
????????????if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
????????????????logger.info("Handling value with ordinal (This is not KV number!): " + vcounter);
????????????}
????????????codec.decode(value.asBuffer(), input);
????????????aggs.aggregate(input);
????????}
????????aggs.collectStates(result);
????????// output key
????????outputKey.set(key.array(), key.offset(), key.length());
????????// output value
????????ByteBuffer valueBuf = codec.encode(result);
????????outputValue.set(valueBuf.array(), 0, valueBuf.position());
????????context.write(outputKey, outputValue);
????}
4 Cuboid 轉(zhuǎn)化為HBase的HFile。

主要就是數(shù)據(jù)格式的轉(zhuǎn)化。詳情請參考:?Hive 數(shù)據(jù) bulkload 導(dǎo)入 HBase

不同類型的指標(biāo)是如何進(jìn)行聚合的
每種不同的指標(biāo)都會(huì)有對應(yīng)的聚合算法,所有指標(biāo)聚合的基類是org.apache.kylin.measure.MeasureAggregator。其核心方法如下:???

? ? abstract public void reset();
????//不同類型的指標(biāo)算法會(huì)實(shí)現(xiàn)該方法
????abstract public void aggregate(V value);
????abstract public V getState();
?
以最簡單的long類型的sum指標(biāo)為例:???

public class LongSumAggregator extends MeasureAggregator<LongMutable> {
????????LongMutable sum = new LongMutable();
????????@Override
????????public void reset() {
????????????sum.set(0);
????????}
????????@Override
????????public void aggregate(LongMutable value) {
????????????sum.set(sum.get() + value.get());
????????}
????????@Override
????????public LongMutable getState() {
????????????return sum;
????????}
}
SQL查詢是如何轉(zhuǎn)化為HBase的Scan操作的

還是以圖2舉例,假設(shè)查詢SQL如下:???

select year, sum(price)
from table
where city = "beijing"
group by year
這個(gè)SQL涉及維度year和city,所以其對應(yīng)的cuboid是00000011,又因?yàn)閏ity的值是確定的beijing,所以在Scan HBase時(shí)就會(huì)Scan Rowkey以00000011開頭且city的值是beijing的行,取到對應(yīng)指標(biāo)sum(price)的值,返回給用戶。

總結(jié)
本文主要介紹了Apache Kylin是如何將Hive表中的數(shù)據(jù)轉(zhuǎn)化為HBase的KV結(jié)構(gòu),并簡單介紹了Kylin的SQL查詢是如何轉(zhuǎn)化為HBase的Scan操作。希望對大家有所幫助。
---------------------?

Kylin三大引擎和Cube構(gòu)建源碼解析


????最近在工作中用到了kylin,相關(guān)資料還不是很多,關(guān)于源碼的更是少之又少,于是結(jié)合《kylin權(quán)威指南》、《基于Apache Kylin構(gòu)建大數(shù)據(jù)分析平臺(tái)》、相關(guān)技術(shù)博客和自己對部分源碼的理解進(jìn)行了整理。

一、工作原理
每一個(gè)Cube都可以設(shè)定自己的數(shù)據(jù)源、計(jì)算引擎和存儲(chǔ)引擎,這些設(shè)定信息均保存在Cube的元數(shù)據(jù)中。在構(gòu)建Cube時(shí),首先由工廠類創(chuàng)建數(shù)據(jù)源、計(jì)算引擎和存儲(chǔ)引擎對象。這三個(gè)對象獨(dú)立創(chuàng)建,相互之間沒有關(guān)聯(lián)。


要把它們串聯(lián)起來,使用的是適配器模式。計(jì)算引擎好比是一塊主板,主控整個(gè)Cube的構(gòu)建過程。它以數(shù)據(jù)源為輸入,以存儲(chǔ)為Cube的輸出,因此也定義了IN和OUT兩個(gè)接口。數(shù)據(jù)源和存儲(chǔ)引擎則需要適配IN和OUT,提供相應(yīng)的接口實(shí)現(xiàn),把自己接入計(jì)算引擎,適配過程見下圖。適配完成之后,數(shù)據(jù)源和存儲(chǔ)引擎即可被計(jì)算引擎調(diào)用。三大引擎連通,就能協(xié)同完成Cube構(gòu)建。

計(jì)算引擎只提出接口需求,每個(gè)接口都可以有多種實(shí)現(xiàn),也就是能接入多種不同的數(shù)據(jù)源和存儲(chǔ)。類似的,每個(gè)數(shù)據(jù)源和存儲(chǔ)也可以實(shí)現(xiàn)多個(gè)接口,適配到多種不同的計(jì)算引擎上。三者之間是多對多的關(guān)系,可以任意組合,十分靈活。
二、三大主要接口
(一)數(shù)據(jù)源接口ISource

·adaptToBuildEngine:適配指定的構(gòu)建引擎接口。返回一個(gè)對象,實(shí)現(xiàn)指定的IN接口。該接口主要由計(jì)算引擎調(diào)用,要求數(shù)據(jù)源向計(jì)算引擎適配。如果數(shù)據(jù)源無法提供指定接口的實(shí)現(xiàn),則適配失敗,Cube構(gòu)建將無法進(jìn)行。
·createReadableTable:返回一個(gè)ReadableTable,用來順序讀取一個(gè)表。除了計(jì)算引擎之外,有時(shí)也會(huì)調(diào)用此方法順序訪問數(shù)據(jù)維表的內(nèi)容,用來創(chuàng)建維度字典或維表快照。
(二)存儲(chǔ)引擎接口IStorage

·adaptToBuildEngine:適配指定的構(gòu)建引擎接口。返回一個(gè)對象,實(shí)現(xiàn)指定的OUT接口。該接口主要由計(jì)算引擎調(diào)用,要求存儲(chǔ)引擎向計(jì)算引擎適配。如果存儲(chǔ)引擎無法提供指定接口的實(shí)現(xiàn),則適配失敗,Cube構(gòu)建無法進(jìn)行。
·createQuery:創(chuàng)建一個(gè)查詢對象IStorageQuery,用來查詢給定的IRealization。簡單來說,就是返回一個(gè)能夠查詢指定Cube的對象。IRealization是在Cube之上的一個(gè)抽象。其主要的實(shí)現(xiàn)就是Cube,此外還有被稱為Hybrid的聯(lián)合Cube。
(三)計(jì)算引擎接口IBatchCubingEngine

·createBatchCubingJob:返回一個(gè)工作流計(jì)劃,用以構(gòu)建指定的CubeSegment。這里的CubeSegment是一個(gè)剛完成初始化,但還不包含數(shù)據(jù)的CubeSegment。返回的DefaultChainedExecutable是一個(gè)工作流的描述對象。它將被保存并由工作流引擎在稍后調(diào)度執(zhí)行,從而完成Cube的構(gòu)建。
·createBatchMergeJob:返回一個(gè)工作流計(jì)劃,用以合并指定的CubeSegment。這里的CubeSegment是一個(gè)待合并的CubeSegment,它的區(qū)間橫跨了多個(gè)現(xiàn)有的CubeSegment。返回的工作流計(jì)劃一樣會(huì)在稍后被調(diào)度執(zhí)行,執(zhí)行的過程會(huì)將多個(gè)現(xiàn)有的CubeSegment合并為一個(gè),從而降低Cube的碎片化成都。
·getSourceInterface:指明該計(jì)算引擎的IN接口。
·getStorageInterface:指明該計(jì)算引擎的OUT接口。
三、三大引擎互動(dòng)過程
1.Rest API接收到構(gòu)建(合并)CubeSegment的請求。
2.EngineFactory根據(jù)Cube元數(shù)據(jù)的定義,創(chuàng)建IBatchCubingEngine對象,并調(diào)用其上的createBatchCubingJob(或者createBatchMergeJob)方法。
3.IBatchCubingEngine根據(jù)Cube元數(shù)據(jù)的定義,通過SourceFactory和StorageFactory創(chuàng)建出相應(yīng)的數(shù)據(jù)源ISource和存儲(chǔ)IStorage對象。
4.IBatchCubingEngine調(diào)用ISource上的adaptToBuildEngine方法傳入IN接口,要求數(shù)據(jù)源向自己適配。
5.IBatchCubingEngine調(diào)用IStorage上的adaptToBuildEngine方法,傳入OUT接口,要求存儲(chǔ)引擎向自己適配。
6.適配成功后,計(jì)算引擎協(xié)同數(shù)據(jù)源和存儲(chǔ)引擎計(jì)劃Cube構(gòu)建的具體步驟,將結(jié)果以工作流的形式返回。
7.執(zhí)行引擎將在稍后執(zhí)行工作流,完成Cube構(gòu)建。
四、Kylin默認(rèn)三大引擎Hive+MapReduce+HBase的介紹和代碼實(shí)現(xiàn)
(一)構(gòu)建引擎MapReduce
每一個(gè)構(gòu)建引擎必須實(shí)現(xiàn)接口IBatchCubingEngine,并在EngineFactory中注冊實(shí)現(xiàn)類。只有這樣才能在Cube元數(shù)據(jù)中引用該引擎,否則會(huì)在構(gòu)建Cube時(shí)出現(xiàn)“找不到實(shí)現(xiàn)”的錯(cuò)誤。
注冊的方法是通過kylin.properties來完成的。在其中添加一行構(gòu)建引擎的聲明。比如:


EngineFactory在啟動(dòng)時(shí)會(huì)讀取kylin.properties,默認(rèn)引擎即為標(biāo)號2的MRBatchCubingEngine2這個(gè)引擎。
1.MRBatchCubingEngine2

這是一個(gè)入口類,構(gòu)建Cube的主要邏輯都封裝在BatchCubingJobBuilder2和BatchMergeJobBuilder2中。其中的DefaultChainedExecutable,代表了一種可執(zhí)行的對象,其中包含了很多子任務(wù)。它執(zhí)行的過程就是一次串行執(zhí)行每一個(gè)子任務(wù),直到所有子任務(wù)都完成。kylin的構(gòu)建比較復(fù)雜,要執(zhí)行很多步驟,步驟之間有直接的依賴性和順序性。DefaultChainedExecutable很好地抽象了這種連續(xù)依次執(zhí)行的模型,可以用來表示Cube的構(gòu)建的工作流。
另外,重要的輸入輸出接口也在這里進(jìn)行聲明。IMRInput是IN接口,由數(shù)據(jù)源適配實(shí)現(xiàn);IMROutput2是OUT接口,由存儲(chǔ)引擎適配實(shí)現(xiàn)。
2.BatchCubingJobBuilder2
BatchCubingJobBuilder2和BatchMergeJobBuilder2大同小異,這里以BatchCubingJobBuilder2為例。


BatchCubingJobBuilder2中的成員變量IMRBatchCubingInputSide inputSide和IMRBatchCubingOutputSide2 outputSide分別來自數(shù)據(jù)源接口IMRInput和存儲(chǔ)接口IMROutput2,分別代表著輸入和輸出兩端參與創(chuàng)建工作流。
BatchCubingJobBuilder2的主體函數(shù)build()中,整個(gè)Cube構(gòu)建過程是一個(gè)子任務(wù)一次串行執(zhí)行的過程,這些子任務(wù)又被分為四個(gè)階段。
第一階段:創(chuàng)建平表。
這一階段的主要任務(wù)是預(yù)計(jì)算連接運(yùn)算符,把事實(shí)表和維表連接為一張大表,也稱為平表。這部分工作可通過調(diào)用數(shù)據(jù)源接口來完成,因?yàn)閿?shù)據(jù)源一般有現(xiàn)成的計(jì)算表連接方法,高效且方便,沒有必要在計(jì)算引擎中重復(fù)實(shí)現(xiàn)。
第二階段:創(chuàng)建字典。
創(chuàng)建字典由三個(gè)子任務(wù)完成,由MR引擎完成,分別是抽取列值、創(chuàng)建字典和保存統(tǒng)計(jì)信息。是否使用字典是構(gòu)建引擎的選擇,使用字典的好處是有很好的數(shù)據(jù)壓縮率,可降低存儲(chǔ)空間,同時(shí)也提升存儲(chǔ)讀取的速度。缺點(diǎn)是構(gòu)建字典需要較多的內(nèi)存資源,創(chuàng)建維度基數(shù)超過千萬的容易造成內(nèi)存溢出。
第三階段:構(gòu)建Cube。
其中包含兩種構(gòu)建cube的算法,分別是分層構(gòu)建和快速構(gòu)建。對于不同的數(shù)據(jù)分布來說它們各有優(yōu)劣,區(qū)別主要在于數(shù)據(jù)通過網(wǎng)絡(luò)洗牌的策略不同。兩種算法的子任務(wù)將全部被加入工作流計(jì)劃中,在執(zhí)行時(shí)會(huì)根據(jù)源數(shù)據(jù)的統(tǒng)計(jì)信息自動(dòng)選擇一種算法,未被選擇的算法的子任務(wù)將被自動(dòng)跳過。在構(gòu)建cube的最后還將調(diào)用存儲(chǔ)引擎的接口,存儲(chǔ)引擎負(fù)責(zé)將計(jì)算完的cube放入引擎。
第四階段:更新元數(shù)據(jù)和清理。
最后階段,cube已經(jīng)構(gòu)建完畢,MR引擎將首先添加子任務(wù)更新cube元數(shù)據(jù),然后分別調(diào)用數(shù)據(jù)源接口和存儲(chǔ)引擎接口對臨時(shí)數(shù)據(jù)進(jìn)行清理。
3.IMRInput
這是BatchCubingJobBuilder2對數(shù)據(jù)源的要求,所有希望接入MRBatchCubingEngine2的數(shù)據(jù)源都必須實(shí)現(xiàn)該接口。

·getTableInputFormat方法返回一個(gè)IMRTableInputFormat對象,用以幫助MR任務(wù)從數(shù)據(jù)源中讀取指定的關(guān)系表,為了適應(yīng)MR編程接口,其中又有兩個(gè)方法,configureJob在啟動(dòng)MR任務(wù)前被調(diào)用,負(fù)責(zé)配置所需的InputFormat,連接數(shù)據(jù)源中的關(guān)系表。由于不同的InputFormat所讀入的對象類型各不相同,為了使得構(gòu)建引擎能夠統(tǒng)一處理,因此又引入了parseMapperInput方法,對Mapper的每一行輸入都會(huì)調(diào)用該方法一次,該方法的輸入是Mapper的輸入,具體類型取決于InputFormat,輸出為統(tǒng)一的字符串?dāng)?shù)組,每列為一個(gè)元素。整體表示關(guān)系表中的一行。這樣Mapper救能遍歷數(shù)據(jù)源中的表了。
·getBatchCubingInputSide方法返回一個(gè)IMRBatchCubingInputSide對象,參與創(chuàng)建一個(gè)CubeSegment的構(gòu)建工作流,它內(nèi)部包含三個(gè)方法,addStepPhase1_CreateFlatTable()方法由構(gòu)建引擎調(diào)用,要求數(shù)據(jù)源在工作流中添加步驟完成平表的創(chuàng)建;getFlatTableInputFormat()方法幫助MR任務(wù)讀取之前創(chuàng)建的平表;addStepPhase4_Cleanup()是進(jìn)行清理收尾,清除已經(jīng)沒用的平表和其它臨時(shí)對象,這三個(gè)方法將由構(gòu)建引擎依次調(diào)用。
4.IMROutput2

這是BatchCubingJobBuilder2對存儲(chǔ)引擎的要求,所有希望接入BatchCubingJobBuilder2的存儲(chǔ)都必須實(shí)現(xiàn)該接口。
IMRBatchCubingOutputSide2代表存儲(chǔ)引擎配合構(gòu)建引擎創(chuàng)建工作流計(jì)劃,該接口的內(nèi)容如下:
·addStepPhase2_BuildDictionary:由構(gòu)建引擎在字典創(chuàng)建后調(diào)用。存儲(chǔ)引擎可以借此機(jī)會(huì)在工作流中添加步驟完成存儲(chǔ)端的初始化或準(zhǔn)備工作。
·addStepPhase3_BuildCube:由構(gòu)建引擎在Cube計(jì)算完畢之后調(diào)用,通知存儲(chǔ)引擎保存CubeSegment的內(nèi)容。每個(gè)構(gòu)建引擎計(jì)算Cube的方法和結(jié)果的存儲(chǔ)格式可能都會(huì)有所不同。存儲(chǔ)引擎必須依照數(shù)據(jù)接口的協(xié)議讀取CubeSegment的內(nèi)容,并加以保存。
·addStepPhase4_Cleanup:由構(gòu)建引擎在最后清理階段調(diào)用,給存儲(chǔ)引擎清理臨時(shí)垃圾和回收資源的機(jī)會(huì)。
(二)數(shù)據(jù)源Hive
Hive是kylin的默認(rèn)數(shù)據(jù)源,由于數(shù)據(jù)源的實(shí)現(xiàn)依賴構(gòu)建引擎對輸入接口的定義,因此本節(jié)的具體內(nèi)容只適用于MR引擎。
數(shù)據(jù)源HiveSource首先要實(shí)現(xiàn)ISource接口。


HiveSource實(shí)現(xiàn)了ISource接口中的方法。adaptToBuildEngine()只能適配IMRInput,返回HiveMRInput實(shí)例。另一個(gè)方法createReadableTable()返回一個(gè)ReadableTable對象,提供讀取一張hive表的能力。
HiveMRInput

HiveMRInput實(shí)現(xiàn)了IMRInput接口,實(shí)現(xiàn)了它的兩個(gè)方法。
一是HiveTableInputFormat實(shí)現(xiàn)了IMRTableInputFormat接口,它主要使用了HCatInputFormat作為MapReduce的輸入格式,用通用的方式讀取所有類型的Hive表。Mapper輸入對象為DefaultHCatRecord,統(tǒng)一轉(zhuǎn)換為String[]后交由構(gòu)建引擎處理。

二是BatchCubingInputSide實(shí)現(xiàn)了IMRBatchCubingInputSide接口。主要實(shí)現(xiàn)了在構(gòu)建的第一階段創(chuàng)建平表的步驟。首先用count(*)查詢獲取Hive平表的總行數(shù),然后用第二句HQL創(chuàng)建Hive平表,同時(shí)添加參數(shù)根據(jù)總行數(shù)分配Reducer數(shù)目。

(三)存儲(chǔ)引擎HBase
存儲(chǔ)引擎HBaseStorage實(shí)現(xiàn)了IStorage接口。

·createQuery方法,返回指定IRealization(數(shù)據(jù)索引實(shí)現(xiàn))的一個(gè)查詢對象。因?yàn)镠Base存儲(chǔ)是為Cube定制的,所以只支持Cube類型的數(shù)據(jù)索引。具體的IStorageQuery實(shí)現(xiàn)應(yīng)根據(jù)存儲(chǔ)引擎的版本而有所不同。
·adaptToBuildEngine方法,適配IMROutput2的輸出接口。
HBaseMROutput2
觀察IMRBatchCubingOutputSide2的實(shí)現(xiàn)。它在兩個(gè)時(shí)間點(diǎn)參與Cube構(gòu)建的工作流。一是在字典創(chuàng)建之后(Cube構(gòu)造之前),在addStepPhase2_BuildDictionary()中添加了“創(chuàng)建HTable”這一步,估算最終CubeSegment的大小,并以此來切分HTable Regions,創(chuàng)建HTable。
第二個(gè)插入點(diǎn)是在Cube計(jì)算完畢之后,由構(gòu)建引擎調(diào)用addStepPhase3_BuildCube()。這里要將Cube保存為HTable,實(shí)現(xiàn)分為“轉(zhuǎn)換HFile”和“批量導(dǎo)入到HTable”兩步。因?yàn)橹苯硬迦際Table比較緩慢,為了最快速地將數(shù)據(jù)導(dǎo)入到HTable,采取了Bulk Load的方法。先用一輪MapReduce將Cube數(shù)據(jù)轉(zhuǎn)換為HBase的存儲(chǔ)文件格式HFile,然后就可以直接將HFile導(dǎo)入空的HTable中,完成數(shù)據(jù)導(dǎo)入。
最后一個(gè)插入點(diǎn)是addStepPhase4_Cleanup()是空實(shí)現(xiàn),對于HBase存儲(chǔ)來說沒有需要清理的資源。

五、CubingJob的構(gòu)建過程
在Kylin構(gòu)建CubeSegment的過程中,計(jì)算引擎居于主導(dǎo)地位,通過它來協(xié)調(diào)數(shù)據(jù)源和存儲(chǔ)引擎。
在網(wǎng)頁上向Kylin服務(wù)端發(fā)送構(gòu)建新的CubeSegment的請求后,通過controller層來到service層,進(jìn)入JobService類中的submitJob方法,方法內(nèi)部再調(diào)用submitJobInternal方法,在build、merge和refresh的時(shí)候,通過EngineFactory.createBatchCubingJob(newSeg, submitter)返回一個(gè)job實(shí)例,從這里可以看出,CubingJob的構(gòu)建入口是由計(jì)算引擎提供的,即默認(rèn)的計(jì)算引擎MRBatchCubingEngine2。

Kylin所支持的所有計(jì)算引擎,都會(huì)在EngineFactory中注冊,并保存在batchEngine中,可以通過配置文件kylin.properties選擇計(jì)算引擎,目前Kylin支持的計(jì)算引擎有:

MRBatchCubingEngine2實(shí)現(xiàn)了createBatchCubingJob方法,方法內(nèi)調(diào)用了BatchCubingJobBuild2的build方法。

在new的初始化過程中,super(newSegment,submitter)就是執(zhí)行父類的構(gòu)造方法,進(jìn)行了一些屬性的初始化賦值,其中的inputSide和outputSide就上上文提到的數(shù)據(jù)源和存儲(chǔ)引擎實(shí)例,通過計(jì)算引擎的協(xié)調(diào)來進(jìn)行CubingJob的構(gòu)建。

數(shù)據(jù)源inputSide實(shí)例獲取:


以上即為數(shù)據(jù)源實(shí)例獲取過程的代碼展現(xiàn),BatchCubingJobBuilder2初始化的時(shí)候,調(diào)用MRUtil的getBatchCubingInputSide方法,它最終調(diào)用的其實(shí)還是MRBatchCubingEngine2這個(gè)計(jì)算引擎的getJoinedFlatTableDesc方法,它返回了一個(gè)IJoinedFlatTableDesc實(shí)例,這個(gè)對象就是對數(shù)據(jù)源表信息的封裝。獲得了這個(gè)flatDesc實(shí)例之后,就要來獲取inputSide實(shí)例,與獲取計(jì)算引擎代碼類似,目前kylin中支持的數(shù)據(jù)源有:


Kylin默認(rèn)的數(shù)據(jù)源是序號為0的HiveSource,所以最后調(diào)用的是HiveSource的adaptToBuildEngine,根據(jù)傳入的IMRInput.class接口,最終返回得到HiveMRInput的實(shí)例,最后再通過它的getBatchCubingInputSide的方法獲取inputSide的實(shí)例。
存儲(chǔ)引擎outputSide實(shí)例獲取:

以上即為存儲(chǔ)引擎實(shí)例獲取的代碼展現(xiàn),BatchCubingJobBuilder2初始化的時(shí)候,調(diào)用MRUtil的getBatchCubingOutputSide方法,方法內(nèi)先調(diào)用了StorageFactory類的createEngineAdapter方法,方法內(nèi)又調(diào)用實(shí)現(xiàn)了Storage接口的HBaseStorage類的adaptToBuildEngine方法,最后返回了HBaseMROutput2Transition實(shí)例,然后在通過它的getBatchCubingOutputSide方法就可以獲取到outputSide的實(shí)例。目前kylin中支持的數(shù)據(jù)源有:


kylin默認(rèn)的存儲(chǔ)引擎是HBase。
——————————————————————————————————
通過構(gòu)造函數(shù),數(shù)據(jù)源、計(jì)算引擎和數(shù)據(jù)存儲(chǔ)三個(gè)模塊已經(jīng)關(guān)聯(lián)到一起了,上文介紹到的MRBatchCubingEngine2的方法中,在new出了一個(gè)BatchCubingJobBuild2實(shí)例后,接著就調(diào)用了build方法,最后返回了一個(gè)CubingJob實(shí)例。build方法邏輯如下:

方法的內(nèi)容就是構(gòu)建一個(gè)CubeSegment的步驟,依次順序的加入到CubingJob的任務(wù)list中。
從第一行開始,調(diào)用了CubingJob的createBuildJob方法,里面又調(diào)用了initCubingJob方法。


initCubingJob方法就是獲取到cube相關(guān)的一些配置信息進(jìn)行初始化,它是根據(jù)cube的名字去查詢所在的project,如果不同的project下創(chuàng)建了相同名字的cube,那返回的就會(huì)是一個(gè)List,然后看配置文件中是否開啟了允許cube重名,如不允許則直接拋出異常,如果允許就在設(shè)置projectName時(shí)取返回List中的第一個(gè)元素,那么這里就可能導(dǎo)致projectName設(shè)置錯(cuò)誤,所以最好保證cube的name是全局唯一的。
在CubingJob初始化之后,會(huì)獲取cuboidRootPath,獲取邏輯如下:

經(jīng)過一連串的調(diào)用拼裝,最終獲取的路徑格式如下:
hdfs:///kylin/kylin_metadata/kylin-jobId/cubeName/cuboid
接下來就是三大引擎相互協(xié)作,構(gòu)建CubeSegment的過程,整個(gè)過程大致分為創(chuàng)建hive平表、創(chuàng)建字典、構(gòu)建Cube和更新元數(shù)據(jù)和清理這四個(gè)步驟。
第一步和第四步是由數(shù)據(jù)源來實(shí)現(xiàn)的,具體是在HiveMRInput類實(shí)現(xiàn)了IMRInput接口的getBatchCubingInputSide方法中,它返回了一個(gè)BatchCubingInputSide實(shí)例,在這個(gè)類中完成了具體工作;第二步是由計(jì)算引擎實(shí)現(xiàn)的,依靠JobBuilderSupport類中的方法完成;第三步是由計(jì)算引擎和存儲(chǔ)引擎共同完成的,包括構(gòu)建cube和存儲(chǔ)到HBase;第四步是由數(shù)據(jù)源和存儲(chǔ)引擎分別完成的;我們按步驟對代碼進(jìn)行分析。


首先是第一步創(chuàng)建hive平表調(diào)用了HiveMRInput類中的靜態(tài)內(nèi)部類BatchCubingInputSide中的addStepPhase1_CreateFlatTable方法。

先獲取cubeName、cubeConfig、hive命令(USE faltTableDatabase)三個(gè)變量。


接下來的方法就是抽取變量,進(jìn)行hive命令的拼接,完成以下步驟:
一是從hive表中,將所需字段從事實(shí)表和維表中提取出來,構(gòu)建一個(gè)寬表;
二是將上一步得到的寬表,按照某個(gè)字段進(jìn)行重新分配,如果沒有指定字段,則隨機(jī),目的是產(chǎn)生多個(gè)差不多大小的文件,作為后續(xù)構(gòu)建任務(wù)的輸入,防止數(shù)據(jù)傾斜。
三是將hive中的視圖物化。
——————————————————————————————————
創(chuàng)建平表命令例子:
hive -e "USE default;
DROP TABLE IF EXISTS kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d;
CREATE EXTERNAL TABLE IF NOT EXISTS kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d
(
TACONFIRM_BUSINESSCODE string
,TACONFIRM_FUNDCODE string
,TACONFIRM_SHARETYPE string
,TACONFIRM_NETCODE string
,TACONFIRM_CURRENCYTYPE string
,TACONFIRM_CODEOFTARGETFUND string
,TACONFIRM_TARGETSHARETYPE string
,TACONFIRM_TARGETBRANCHCODE string
,TACONFIRM_RETURNCODE string
,TACONFIRM_DEFDIVIDENDMETHOD string
,TACONFIRM_FROZENCAUSE string
,TACONFIRM_TAINTERNALCODE string
,TACONFIRM_C_PROVICE string
,TAPROVINCE_PROVINCENAME string
,TASHARETYPE_SHARETYPENAME string
)
STORED AS SEQUENCEFILE
LOCATION 'hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d';
ALTER TABLE kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d SET TBLPROPERTIES('auto.purge'='true');
INSERT OVERWRITE TABLE kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d SELECT
TACONFIRM.BUSINESSCODE as TACONFIRM_BUSINESSCODE
,TACONFIRM.FUNDCODE as TACONFIRM_FUNDCODE
,TACONFIRM.SHARETYPE as TACONFIRM_SHARETYPE
,TACONFIRM.NETCODE as TACONFIRM_NETCODE
,TACONFIRM.CURRENCYTYPE as TACONFIRM_CURRENCYTYPE
,TACONFIRM.CODEOFTARGETFUND as TACONFIRM_CODEOFTARGETFUND
,TACONFIRM.TARGETSHARETYPE as TACONFIRM_TARGETSHARETYPE
,TACONFIRM.TARGETBRANCHCODE as TACONFIRM_TARGETBRANCHCODE
,TACONFIRM.RETURNCODE as TACONFIRM_RETURNCODE
,TACONFIRM.DEFDIVIDENDMETHOD as TACONFIRM_DEFDIVIDENDMETHOD
,TACONFIRM.FROZENCAUSE as TACONFIRM_FROZENCAUSE
,TACONFIRM.TAINTERNALCODE as TACONFIRM_TAINTERNALCODE
,TACONFIRM.C_PROVICE as TACONFIRM_C_PROVICE
,TAPROVINCE.PROVINCENAME as TAPROVINCE_PROVINCENAME
,TASHARETYPE.SHARETYPENAME as TASHARETYPE_SHARETYPENAME
FROM DEFAULT.TACONFIRM as TACONFIRM?
INNER JOIN DEFAULT.TAPROVINCE as TAPROVINCE
ON TACONFIRM.C_PROVICE = TAPROVINCE.C_PROVICE
INNER JOIN DEFAULT.TASHARETYPE as TASHARETYPE
ON TACONFIRM.SHARETYPE = TASHARETYPE.SHARETYPE
WHERE 1=1;
" --hiveconf hive.merge.mapredfiles=false --hiveconf hive.auto.convert.join=true --hiveconf dfs.replication=2 --hiveconf hive.exec.compress.output=true --hiveconf hive.auto.convert.join.noconditionaltask=true --hiveconf mapreduce.job.split.metainfo.maxsize=-1 --hiveconf hive.merge.mapfiles=false --hiveconf hive.auto.convert.join.noconditionaltask.size=100000000 --hiveconf hive.stats.autogather=true
——————————————————————————————————
文件再分配和視圖物化命令例子:
hive -e "USE default;

set mapreduce.job.reduces=3;

set hive.merge.mapredfiles=false;

INSERT OVERWRITE TABLE kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d SELECT * FROM kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d DISTRIBUTE BY RAND();

" --hiveconf hive.merge.mapredfiles=false --hiveconf hive.auto.convert.join=true --hiveconf dfs.replication=2 --hiveconf hive.exec.compress.output=true --hiveconf hive.auto.convert.join.noconditionaltask=true --hiveconf mapreduce.job.split.metainfo.maxsize=-1 --hiveconf hive.merge.mapfiles=false --hiveconf hive.auto.convert.join.noconditionaltask.size=100000000 --hiveconf hive.stats.autogather=true
——————————————————————————————————

創(chuàng)建字典由三個(gè)子任務(wù)完成,分別是抽取列值、創(chuàng)建字典和保存統(tǒng)計(jì)信息,由MR引擎完成,所以直接在build方法中add到任務(wù)list中。是否使用字典是構(gòu)建引擎的選擇,使用字典的好處是有很好的數(shù)據(jù)壓縮率,可降低存儲(chǔ)空間,同時(shí)也提升存儲(chǔ)讀取的速度。缺點(diǎn)是構(gòu)建字典需要較多的內(nèi)存資源,創(chuàng)建維度基數(shù)超過千萬的容易造成內(nèi)存溢出。在這個(gè)過程最后,還要?jiǎng)?chuàng)建HTable,這屬于存儲(chǔ)引擎的任務(wù),所以是在HBaseMROutput2Transition實(shí)例中完成的。
——————————————————————————————————
抽取列值步驟參數(shù)例子:
?-conf /usr/local/apps/apache-kylin-2.3.1-bin/conf/kylin_job_conf.xml -cubename Taconfirm_kylin_15all -output hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/Taconfirm_kylin_15all/fact_distinct_columns -segmentid ddacfb18-3d2e-4e1b-8975-f0871183418d -statisticsoutput hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/Taconfirm_kylin_15all/fact_distinct_columns/statistics -statisticssamplingpercent 100 -jobname Kylin_Fact_Distinct_Columns_Taconfirm_kylin_15all_Step -cubingJobId 4c5d4bb4-791f-4ec3-b3d7-89780adc3f58
——————————————————————————————————
?構(gòu)建維度字典步驟參數(shù)例子 :
?-cubename Taconfirm_kylin_15all -segmentid ddacfb18-3d2e-4e1b-8975-f0871183418d -input hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/Taconfirm_kylin_15all/fact_distinct_columns -dictPath hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/Taconfirm_kylin_15all/dict
——————————————————————————————————
創(chuàng)建HTable步驟參數(shù)例子:
?-cubename Taconfirm_kylin_15all -segmentid ddacfb18-3d2e-4e1b-8975-f0871183418d -partitions hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/Taconfirm_kylin_15all/rowkey_stats/part-r-00000 -cuboidMode CURRENT
——————————————————————————————————


構(gòu)建Cube屬于計(jì)算引擎的任務(wù),就是根據(jù)準(zhǔn)備好的數(shù)據(jù),依次產(chǎn)生cuboid的數(shù)據(jù),在這里調(diào)用了兩種構(gòu)建方法,分別是分層構(gòu)建和快速構(gòu)建,但最終只會(huì)選擇一種構(gòu)建方法,分層構(gòu)建首先調(diào)用createBaseCuboidStep方法,生成Base Cuboid數(shù)據(jù)文件,然后進(jìn)入for循環(huán),調(diào)用createNDimensionCuboidStep方法,根據(jù)Base Cuboid計(jì)算N層Cuboid數(shù)據(jù)。
在Cuboid的數(shù)據(jù)都產(chǎn)生好之后,還需要放到存儲(chǔ)層中,所以接下來調(diào)用outputSide實(shí)例的addStepPhase3_BuildCube方法,HBaseMROutput2Transition類中的addStepPhase3_BuildCube方法主要有兩步,一是createConvertCuboidToHfileStep方法,將計(jì)算引擎產(chǎn)生的cuboid數(shù)據(jù)轉(zhuǎn)換成HBase要求的HFile格式,二是createBulkLoadStep方法,即把HFIle數(shù)據(jù)加載到HBase中。
——————————————————————————————————
構(gòu)建Base Cuboid步驟參數(shù)例子:
?-conf /usr/local/apps/kylin/conf/kylin_job_conf.xml -cubename kylin_sales_cube -segmentid 392634bd-4964-428c-a905-9bbf28884452 -input FLAT_TABLE -output hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/cuboid/level_base_cuboid -jobname Kylin_Base_Cuboid_Builder_kylin_sales_cube -level 0 -cubingJobId 6f3c2a9e-7283-4d87-9487-a5ebaffef811
——————————————————————————————————
構(gòu)建N層Cuboid步驟參數(shù)例子:
?-conf /usr/local/apps/kylin/conf/kylin_job_conf.xml -cubename kylin_sales_cube -segmentid 392634bd-4964-428c-a905-9bbf28884452 -input hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/cuboid/level_1_cuboid -output hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/cuboid/level_2_cuboid -jobname Kylin_ND-Cuboid_Builder_kylin_sales_cube_Step -level 2 -cubingJobId 6f3c2a9e-7283-4d87-9487-a5ebaffef811
——————————————————————————————————
轉(zhuǎn)換HFile格式步驟參數(shù)例子:
?-conf /usr/local/apps/kylin/conf/kylin_job_conf.xml -cubename kylin_sales_cube -partitions hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/rowkey_stats/part-r-00000_hfile -input hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/cuboid/* -output hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/hfile -htablename KYLIN_O2SYZPV449 -jobname Kylin_HFile_Generator_kylin_sales_cube_Step
——————————————————————————————————
加載HFile到HBase步驟參數(shù)例子:
?-input hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/hfile -htablename KYLIN_O2SYZPV449 -cubename kylin_sales_cube
——————————————————————————————————

最后一步就是一些收尾工作,包括更新Cube元數(shù)據(jù)信息,調(diào)用inputSide和outputSide實(shí)例進(jìn)行中間臨時(shí)數(shù)據(jù)的清理工作。

完成所有步驟之后,就回到了JobService的submitJob方法中,在得到CubingJob的實(shí)例后,會(huì)執(zhí)行以上代碼。這里做的是將CubingJob的信息物化到HBase的kylin_metadata表中,并沒有真正的提交執(zhí)行。
真正執(zhí)行CubingJob的地方是在DefaultScheduler,它里面有一個(gè)線程會(huì)每隔一分鐘,就去HBase的kylin_metadata表中掃一遍所有的CubingJob,然后將需要執(zhí)行的job,提交到線程池執(zhí)行。
kylin中任務(wù)的構(gòu)建和執(zhí)行是異步的。單個(gè)kylin節(jié)點(diǎn)有query、job和all三種角色,query只提供查詢服務(wù),job只提供真正的構(gòu)建服務(wù),all則兼具前兩者功能。實(shí)際操作中kylin的三種角色節(jié)點(diǎn)都可以進(jìn)行CubingJob的構(gòu)建,但只有all和job模式的節(jié)點(diǎn)可以通過DefaultScheduler進(jìn)行調(diào)度執(zhí)行
---------------------?

總目錄
Kylin系列(一)—— 入門?
Kylin系列(二)—— Cube 構(gòu)造算法

總目錄
Kylin cube 構(gòu)造算法
逐層算法(layer Cubing)
算法的優(yōu)點(diǎn)
算法的缺點(diǎn)
快速Cube算法(Fast Cubing)
舉個(gè)例子
子立方體生成樹(Cuboid spanning Tree)的遍歷次序
優(yōu)點(diǎn)
缺點(diǎn)
By-layer Spark Cubing算法
改進(jìn)
Spark中Cubing的過程
性能測試
Kylin cube 構(gòu)造算法
逐層算法(layer Cubing)
我們知道,一個(gè)N維的Cube,是有1個(gè)N維子立方體、N個(gè)(N-1)維子立方體、N*(N-1)/2個(gè)(N-2)維子立方體、……、N個(gè)1維子立方體和1個(gè)0維子立方體構(gòu)成,總共有2^N個(gè)子立方體。在逐層算法中,按照維度數(shù)逐層減少來計(jì)算,每個(gè)層級的計(jì)算(除了第一層,他是從原始數(shù)據(jù)聚合而來),是基于他上一層級的計(jì)算結(jié)果來計(jì)算的。

比如group by [A,B]的結(jié)果,可以基于group by [A,B,C]的結(jié)果,通過去掉C后聚合得來的,這樣可以減少重復(fù)計(jì)算;當(dāng)0維Cuboid計(jì)算出來的時(shí)候,整個(gè)Cube的計(jì)算也就完成了。

如上圖所示,展示了一個(gè)4維的Cube構(gòu)建過程。

此算法的Mapper和Reducer都比較簡單。Mapper以上一層Cuboid的結(jié)果(key-value對)作為輸入。由于Key是由各維度值拼接在一起,從其中找出要聚合的維度,去掉它的值成新的key,并對value進(jìn)行操作,然后把新的key和value輸出,進(jìn)而Hadoop MapReduce對所有新的key進(jìn)行排序、洗牌(shuffle)、再送到Reducer處;Reducer的輸入會(huì)是一組具有相同key的value集合,對這些value做聚合運(yùn)算,再結(jié)合key輸出就完成了一輪計(jì)算。

舉個(gè)例子:?
假設(shè)一共四個(gè)維度A/B/C/D,他們的成員分別是(A1、A2、A3),(B1、B2)、(C1)、(D1),有一個(gè)measure(對于這列V,計(jì)算sum(V)),這里忽略dictionary編碼。原始表如下:?


那么base cuboid最終的輸出如下?
(A1、B1、C1、D1、2)?
(A1、B2、C1、D1, 3)?
(A2、B1、C1、D1, 5)?
(A3、B1、C1、D1, 6)?
(A3、B2、C1、D1, 8)?
那么它作為下面一個(gè)cuboid的輸入,對于第一行輸入?
(A1、B1、C1、D1,2),mapper執(zhí)行完成之后會(huì)輸出?
(A1、B1、C1, 2)、?
(A1、B1、D1, 2)、?
(A1、C1、D1, 2)、?
(B1、C1、D1,2)這四項(xiàng),同樣對于其他的內(nèi)一行也會(huì)輸出四行,最終他們經(jīng)過reducer的聚合運(yùn)算,得到如下的結(jié)果:?
(A1、B1、C1, 2)?
(A1、B1、D1, 2)?
(A1、C1、D1, 2 + 3)?
(B1、C1、D1,2 + 5 +6)

這個(gè)例子其實(shí)在cube的構(gòu)建過程中可以看到。

一定要注意,這里的每一輪計(jì)算都是MapReducer任務(wù),且串行執(zhí)行;一個(gè)N維的Cube,至少需要N次MapReduce Job。

算法的優(yōu)點(diǎn)
此算法充分利用了MR的能力,處理了中間復(fù)雜的排序和洗牌工作,故而算法代碼清晰簡單,易于維護(hù)。
受益于Hadoop的日趨成熟,此算法對集群要求低,運(yùn)行穩(wěn)定。
算法的缺點(diǎn)
當(dāng)Cube有比較多維度的時(shí)候,所需要的MR任務(wù)也相應(yīng)增加;由于Hadoop的任務(wù)調(diào)度需要耗費(fèi)額外資源,特別是集群較龐大的時(shí)候,反復(fù)遞交任務(wù)造成的額外開銷會(huì)很可觀
由于Mapper不做預(yù)聚合,此算法會(huì)對Hadoop MR輸出較多數(shù)據(jù);雖然已經(jīng)使用了Combiner來減少從Mapper端到Reducer端的數(shù)據(jù)傳輸,所有數(shù)據(jù)依然需要通過MR來排序和組合才能被聚合,無形之中增加了集群的壓力。
對HDFS的讀寫操作較多:由于每一層計(jì)算的輸出會(huì)用作下一層計(jì)算的輸入,這些Key-value需要寫到HDFS上;當(dāng)所有計(jì)算都完成后,Kylin還需要額外一輪任務(wù)將這些文件轉(zhuǎn)成Hbase的HFile格式,以導(dǎo)入到HBase中去。
總體而言,該算法的效率較低,尤其當(dāng)Cube維度數(shù)較大的時(shí)候。
這里其實(shí)在困惑到底什么是0維,后來想明白了。舉個(gè)例子,現(xiàn)在有一個(gè)度量叫成交量。有幾個(gè)維度從大到小:業(yè)務(wù)類型、渠道、門店。3維的例子就是[業(yè)務(wù)類型、渠道、門店],二維的例子是[業(yè)務(wù)類型、渠道],一維[業(yè)務(wù)類型],0維其實(shí)就是沒有維度,也就是全部聚合,舉個(gè)例子就是

select sum(price) from table1
1
其實(shí)在我看來,逐層算法就是先算維度數(shù)最高的,一層算完后,再算維度數(shù)減少的一層,以此類推。至于為什么從層級高向?qū)蛹壍陀?jì)算,而不是反過來,在于如果是反過來,那你每次的計(jì)算量都是初始數(shù)據(jù),數(shù)據(jù)量非常大,沒必要。

快速Cube算法(Fast Cubing)
快速Cube算法,它還被稱作“逐段”(By Segment)或“逐塊”(By Split)算法。

該算法的主要思想,對Mapper所分配的數(shù)據(jù)塊,將它計(jì)算成一個(gè)完整的小Cube段(包含所有Cuboid);每個(gè)Mapper將計(jì)算完的Cube段輸出給Reducer做合并,生成大Cube,也就是最終結(jié)果。

與舊算法相比,快速算法主要有兩點(diǎn)不同:

Mapper會(huì)利用內(nèi)存做預(yù)聚合,算出所有組合;Mapper輸出的每個(gè)Key都是不同的,這樣會(huì)減少輸出到Hadoop MapReduce的數(shù)據(jù)量,Combiner也不再需要;
一輪MapReduce便會(huì)完成所有層次的計(jì)算,減少Hadoop任務(wù)的調(diào)配。
來說個(gè)比較。逐層算法的每一層的計(jì)算都有一個(gè)MapReduce任務(wù),因?yàn)槭菑母呔S到低維的MR任務(wù),任務(wù)之間傳遞的數(shù)據(jù)量是非常大的。比如上面的例子,生成4維的數(shù)據(jù),需要在mapper中對全數(shù)據(jù)進(jìn)行的整理,再傳遞給reducer聚合,如果數(shù)據(jù)量非常大,那么網(wǎng)絡(luò)IO是很大的。而快速算法,它會(huì)對某個(gè)分片數(shù)據(jù)進(jìn)行構(gòu)造完整的cube(所有cuboid)。再將mapper中的數(shù)據(jù)送入reducer進(jìn)行大聚合生成Cube。這其實(shí)是在map階段就已經(jīng)完成了聚合,IO是很小的。

舉個(gè)例子
這里不理解沒關(guān)系,看完后面的構(gòu)建過程再翻回來看例子就能懂

一個(gè)Cube有4個(gè)維度:A,B,C,D;每個(gè)Mapper都有100萬個(gè)源記錄要處理;Mapper中的列基數(shù)是Car(A),Car(B),Car(C)和Car(D)。(cardinal 基數(shù))

當(dāng)講源記錄聚集到base cuboid(1111)時(shí),使用舊的“逐層”算法,每個(gè)Mapper將向Hadoop輸出1百萬條記錄;使用快速立方算法,在預(yù)聚合之后,它預(yù)聚合之后,它只向Hadoop輸出[distinct A,B,C,D]記錄的數(shù)量,這樣肯定比源數(shù)據(jù)小;在正常情況下,他可以源記錄大小的1/10到1/100.

當(dāng)從父cuboid聚合到子cuboid時(shí),從base cuboid(1111) 到3維cuboid 0111,將會(huì)聚合維度A;我們假設(shè)維度A與其他維度獨(dú)立的,聚合后,cuboid 0111的維度base cuboid的1/Card(A);所以在這一步的輸出將減少到原來的1/Card(A);

總的來說,假設(shè)維度的平均基數(shù)是Card(N),從Mapper到Reducer的寫入記錄可以減少到原始維度的1/Card(N);Hadoop的輸出越少,I/O和計(jì)算越少,性能就越好。

這里要提一句,其實(shí)很多都是類似的,比如在hive中處理大表,?
各種的調(diào)優(yōu)都和IO、計(jì)算有關(guān)系,因?yàn)樗麄兌际腔贛R任務(wù)。

子立方體生成樹(Cuboid spanning Tree)的遍歷次序
在舊算法中,Kylin按照層級,也就是廣度優(yōu)先遍歷(Broad First Search)的次序計(jì)算出各個(gè)Cuboid;在快速Cube算法中,Mapper會(huì)按照深度優(yōu)先遍歷(Depth First Search)來計(jì)算各個(gè)Cuboid。?
深度優(yōu)先遍歷是一個(gè)遞歸方法,將父cuboid壓棧以計(jì)算子Cuboid,直到?jīng)]有子Cuboid需要計(jì)算才出棧并輸出給Hadoop;需要最多暫存N個(gè)Cuboid,N是Cube維度數(shù)。

采用DFS,是為了兼顧C(jī)PU和內(nèi)存。
從父Cuboid計(jì)算子Cuboid,避免重復(fù)計(jì)算。
只壓棧當(dāng)前計(jì)算的Cuboid的父Cuboid,減少內(nèi)存占用。?
舉個(gè)例子從3維到2維的MR任務(wù)中計(jì)算CD,BFS會(huì)壓入ABC ABD ACD BCD,mapper進(jìn)行切分,reducer進(jìn)行聚合;而在DFS中,只會(huì)壓入ABCD,BCD,內(nèi)存大大減少。


上圖是一個(gè)四維Cube的完整生成樹:

按照DFS的次序,在0維Cuboid輸出前的計(jì)算次序是ABCD-》BCD-》CD-》D-》0維,ABCD,BCD,CD和D需要被暫存;在被輸出后,D可被輸出,內(nèi)存得到釋放;在C被計(jì)算并輸出后,CD就可以被輸出,ABCD最后被輸出。

使用DFS訪問順序,Mapper的輸出已完全排序,因?yàn)镃uboid ID位于行鍵的開始位置,而內(nèi)部的Cuboid的行已排序。


0000?
0001[D0]?
0001[D1]?
....?
0010[C0]?
0010[C1]?
....?
0011[C0][D0]?
0011[C0][D1]?
....?
....?
1111[A0][B0][C0][D0]?
....?
這里的寫法可以看構(gòu)造過程。?

由于mapper的輸出已經(jīng)排序,Hadoop的排序效率會(huì)更高。

此外,mapper的預(yù)聚合發(fā)生在內(nèi)存中,這樣可以避免不必要的磁盤和網(wǎng)絡(luò)IO,并減少了hadoop的開銷。

在開發(fā)階段,我們在mapper中遇到了OOM錯(cuò)誤;這可能發(fā)生在:?
- Mapper的JVM堆大小很小?
- 使用 distinct count度量?
- 使用樹太深(維度太多)?
- 給Mapper的數(shù)據(jù)太大

我們意識到Kylin不能認(rèn)為mapper總是有足夠的內(nèi)存;Cubing算法需要自適應(yīng)各種情況;

當(dāng)主動(dòng)檢測到OOM錯(cuò)誤,會(huì)優(yōu)化內(nèi)存使用并將數(shù)據(jù)spilling到磁盤上;結(jié)果是有希望的,OOM錯(cuò)誤現(xiàn)在很少發(fā)生。

優(yōu)點(diǎn)
它比舊的方法更快;從我們的比較測試中可以減少30%到50%的build總時(shí)間:快在排序,快在IO。
他在Hadoop上產(chǎn)生較少的工作負(fù)載,并在HDFS上留下較少的中間文件。
Cubing和Spark等其他立方體引起可以輕松地重復(fù)使用該立方體代碼。
缺點(diǎn)
該算法有點(diǎn)復(fù)雜,這增加了維護(hù)工作;

雖然該算法可以自動(dòng)將數(shù)據(jù)spill到磁盤,但他仍希望Mapper有足夠的內(nèi)存來獲得最佳性能。

用戶需要更多知識來調(diào)整立方體。

By-layer Spark Cubing算法
我們知道,RDD(Resilient Distributed DataSet)是Spark中的一個(gè)基本概念。N維立方體的組合可以很好地描述為RDD,N維立方體將具有N+1個(gè)RDD。這些RDD具有parent/child關(guān)系,因?yàn)檫@些parent RDD 可用于生成child RDD。通過將父RDD緩存在內(nèi)存中,子RDD的生成可以比磁盤讀取更有效。

改進(jìn)
每一層的cuboid視為一個(gè)RDD
父RDD被盡可能cache到內(nèi)存
RDD 被導(dǎo)出為sequence file
通過將“map”替換為“flatMap”,以及把“reduce”替換為“reduceByKey”,可以復(fù)用大部分代碼
Spark中Cubing的過程
下圖DAG(有向無環(huán)圖),它詳細(xì)說明了這個(gè)過程:

在Stage 5中,Kylin使用HiveContext讀取中間Hive表,然后執(zhí)行一個(gè)一對一映射的”map”操作將原始值編碼為KV字節(jié)。完成后Kylin得到一個(gè)中間編碼的RDD。

在Stage 6中,中間RDD用一個(gè)“ReduceByKey”操作聚合以獲得RDD-1,這是base cuboid。接下來,在RDD-1做了一個(gè)flatMap(一對多map),因?yàn)閎ase cuboid有N個(gè)cuboid。以此類推,各級RDD得到計(jì)算。在完成時(shí),這些RDD將完整地保存在分布式文件系統(tǒng),但可以緩存在內(nèi)存中用于下一級計(jì)算。當(dāng)生成子cuboid時(shí),他將從緩存中刪除。

其實(shí)我們和舊的逐層算法去比較會(huì)發(fā)現(xiàn),他們之間的構(gòu)建沒有什么大的差別,只不過Spark的是在內(nèi)存中進(jìn)行的,無需從磁盤讀取和網(wǎng)絡(luò)IO。并且后面的stage的第一步是reduce。

性能測試


在所有這三種情況下,Spark都比MR快,總體而言它可以減少約一半的時(shí)間。

Kylin的構(gòu)建算法以及和spark的改進(jìn)?
http://cxy7.com/articles/2018/06/09/1528549073259.html?
https://www.cnblogs.com/zlslch/p/7404465.html
---------------------?

e Kylin是一個(gè)開源的分布式分析引擎,提供Hadoop之上的SQL查詢接口及多維分析(OLAP)能力以支持超大規(guī)模數(shù)據(jù)。它能在亞秒內(nèi)查詢巨大的Hive表。本文將詳細(xì)介紹Apache Kylin 1.5中的Fast-Cubing算法。

Fast Cubing,也稱快速數(shù)據(jù)立方算法, 是一個(gè)新的Cube算法。我們知道,Cube的思想是用空間換時(shí)間, 通過預(yù)先的計(jì)算,把索引及結(jié)果存儲(chǔ)起來,以換取查詢時(shí)候的高性能?。在Kylin v1.5以前,Kylin中的Cube只有一種算法:layered cubing,也稱逐層算法:它是逐層由底向上,把所有組合算完的過程。

Cube構(gòu)建算法介紹

1 逐層算法(Layer Cubing)

  我們知道,一個(gè)N維的Cube,是由1個(gè)N維子立方體、N個(gè)(N-1)維子立方體、N*(N-1)/2個(gè)(N-2)維子立方體、......、N個(gè)1維子立方體和1個(gè)0維子立方體構(gòu)成,總共有2^N個(gè)子立方體組成,在逐層算法中,按維度數(shù)逐層減少來計(jì)算,每個(gè)層級的計(jì)算(除了第一層,它是從原始數(shù)據(jù)聚合而來),是基于它上一層級的結(jié)果來計(jì)算的。

比如,[Group by A, B]的結(jié)果,可以基于[Group by A, B, C]的結(jié)果,通過去掉C后聚合得來的;這樣可以減少重復(fù)計(jì)算;當(dāng) 0維度Cuboid計(jì)算出來的時(shí)候,整個(gè)Cube的計(jì)算也就完成了。           

                       逐層算法

?

  如上圖所示,展示了一個(gè)4維的Cube構(gòu)建過程。

  此算法的Mapper和Reducer都比較簡單。Mapper以上一層Cuboid的結(jié)果(Key-Value對)作為輸入。由于Key是由各維度值拼接在一起,從其中找出要聚合的維度,去掉它的值成新的Key,并對Value進(jìn)行操作,然后把新Key和Value輸出,進(jìn)而Hadoop MapReduce對所有新Key進(jìn)行排序、洗牌(shuffle)、再送到Reducer處;Reducer的輸入會(huì)是一組有相同Key的Value集合,對這些Value做聚合計(jì)算,再結(jié)合Key輸出就完成了一輪計(jì)算。

  每一輪的計(jì)算都是一個(gè)MapReduce任務(wù),且串行執(zhí)行; 一個(gè)N維的Cube,至少需要N次MapReduce Job。

Layer Cubing算法優(yōu)點(diǎn)

此算法充分利用了MapReduce的能力,處理了中間復(fù)雜的排序和洗牌工作,故而算法代碼清晰簡單,易于維護(hù);

受益于Hadoop的日趨成熟,此算法對集群要求低,運(yùn)行穩(wěn)定;在內(nèi)部維護(hù)Kylin的過程中,很少遇到在這幾步出錯(cuò)的情況;即便是在Hadoop集群比較繁忙的時(shí)候,任務(wù)也能完成。

Layer Cubing算法缺點(diǎn)

當(dāng)Cube有比較多維度的時(shí)候,所需要的MapReduce任務(wù)也相應(yīng)增加;由于Hadoop的任務(wù)調(diào)度需要耗費(fèi)額外資源,特別是集群較龐大的時(shí)候,反復(fù)遞交任務(wù)造成的額外開銷會(huì)相當(dāng)可觀;

由于Mapper不做預(yù)聚合,此算法會(huì)對Hadoop MapReduce輸出較多數(shù)據(jù); 雖然已經(jīng)使用了Combiner來減少從Mapper端到Reducer端的數(shù)據(jù)傳輸,所有數(shù)據(jù)依然需要通過Hadoop MapReduce來排序和組合才能被聚合,無形之中增加了集群的壓力;

對HDFS的讀寫操作較多:由于每一層計(jì)算的輸出會(huì)用做下一層計(jì)算的輸入,這些Key-Value需要寫到HDFS上;當(dāng)所有計(jì)算都完成后,Kylin還需要額外的一輪任務(wù)將這些文件轉(zhuǎn)成HBase的HFile格式,以導(dǎo)入到HBase中去;

總體而言,該算法的效率較低,尤其是當(dāng)Cube維度數(shù)較大的時(shí)候;時(shí)常有用戶問,是否能改進(jìn)Cube算法,縮短時(shí)間。

2 快速Cube算法(Fast Cubing)

  快速Cube算法(Fast Cubing)是麒麟團(tuán)隊(duì)對新算法的一個(gè)統(tǒng)稱,它還被稱作“逐段”(By Segment) 或“逐塊”(By Split) 算法。

  該算法的主要思想是,對Mapper所分配的數(shù)據(jù)塊,將它計(jì)算成一個(gè)完整的小Cube 段(包含所有Cuboid);每個(gè)Mapper將計(jì)算完的Cube段輸出給Reducer做合并,生成大Cube,也就是最終結(jié)果;圖2解釋了此流程。新算法的核心思想是清晰簡單的,就是最大化利用Mapper端的CPU和內(nèi)存,對分配的數(shù)據(jù)塊,將需要的組合全都做計(jì)算后再輸出給Reducer;由Reducer再做一次合并(merge),從而計(jì)算出完整數(shù)據(jù)的所有組合。如此,經(jīng)過一輪Map-Reduce就完成了以前需要N輪的Cube計(jì)算。圖2是此算法的概覽。

在Mapper內(nèi)部, 也可以有一些優(yōu)化,圖3是一個(gè)典型的四維Cube的生成樹;第一步會(huì)計(jì)算Base Cuboid(所有維度都有的組合),再基于它計(jì)算減少一個(gè)維度的組合。基于parent節(jié)點(diǎn)計(jì)算child節(jié)點(diǎn),可以重用之前的計(jì)算結(jié)果;當(dāng)計(jì)算child節(jié)點(diǎn)時(shí),需要parent節(jié)點(diǎn)的值盡可能留在內(nèi)存中; 如果child節(jié)點(diǎn)還有child,那么遞歸向下,所以它是一個(gè)深度優(yōu)先遍歷。當(dāng)有一個(gè)節(jié)點(diǎn)沒有child,或者它的所有child都已經(jīng)計(jì)算完,這時(shí)候它就可以被輸出,占用的內(nèi)存就可以釋放。

如果內(nèi)存夠的話,可以多線程并行向下聚合。如此可以最大限度地把計(jì)算發(fā)生在Mapper這一端,一方面減少shuffle的數(shù)據(jù)量,另一方面減少Reducer端的計(jì)算量。

Fast Cubing的優(yōu)點(diǎn):

總的IO量比以前大大減少。?

此算法可以脫離Map-Reduce而對數(shù)據(jù)做Cube計(jì)算,故可以很容易地在其它場景或框架下執(zhí)行,例如Streaming 和Spark。

Fast Cubing的缺點(diǎn):

代碼比以前復(fù)雜了很多: 由于要做多層的聚合,并且引入多線程機(jī)制,同時(shí)還要估算JVM可用內(nèi)存,當(dāng)內(nèi)存不足時(shí)需要將數(shù)據(jù)暫存到磁盤,所有這些都增加復(fù)雜度。

對Hadoop資源要求較高,用戶應(yīng)盡可能在Mapper上多分配內(nèi)存;如果內(nèi)存很小,該算法需要頻繁借助磁盤,性能優(yōu)勢就會(huì)較弱。在極端情況下(如數(shù)據(jù)量很大同時(shí)維度很多),任務(wù)可能會(huì)由于超時(shí)等原因失敗;

要讓Fast-Cubing算法獲得更高的效率,用戶需要了解更多一些“內(nèi)情”。

首先,在v1.5里,Kylin在對Fast-Cubing請求資源時(shí)候,默認(rèn)是為Mapper任務(wù)請求3Gb的內(nèi)存,給JVM2.7Gb。如果Hadoop節(jié)點(diǎn)可用內(nèi)存較多的話,用戶可以讓Kylin獲得更多內(nèi)存:在conf/kylin_job_conf_inmem.xml文件,由參數(shù)“mapreduce.map.memory.mb”和“mapreduce.map.java.opts”設(shè)定 。

其次,需要在并發(fā)性和Mapper端聚合之間找到一個(gè)平衡。在v1.5.2里,Kylin默認(rèn)是給每個(gè)Mapper分配32兆的數(shù)據(jù);這樣可以獲得較高的并發(fā)性。但如果Hadoop集群規(guī)模較小,或可用資源較少,過多的Mapper會(huì)造成任務(wù)排隊(duì)。這時(shí),將數(shù)據(jù)塊切得更大,如 64兆,效果會(huì)更好。數(shù)據(jù)塊是由Kylin創(chuàng)建Hive平表時(shí)生成的, 在kylin_hive_conf.xml由參數(shù)dfs.block.size決定的。從v1.5.3開始,分配策略又有改進(jìn),給每個(gè)mapper會(huì)分配一樣的行數(shù),從而避免數(shù)據(jù)塊不均勻時(shí)的木桶效應(yīng):由conf/kylin.properteis里的“kylin.job.mapreduce.mapper.input.rows”配置,默認(rèn)是100萬,用戶可以示自己集群的規(guī)模設(shè)置更小值獲得更高并發(fā),或更大值減少請求的Mapper數(shù)。

通常推薦Fast-Cubing 算法,但不是所有情況下都如此。舉例說明,如果每個(gè)Mapper之間的key交叉重合度較低,fast cubing更適合;因?yàn)镸apper端將這塊數(shù)據(jù)最終要計(jì)算的結(jié)果都達(dá)到了,Reducer只需少量的聚合。另一個(gè)極端是,每個(gè)Mapper計(jì)算出的key跟其它 Mapper算出的key深度重合,這意味著在reducer端仍需將各個(gè)Mapper的數(shù)據(jù)抓取來再次聚合計(jì)算;如果key的數(shù)量巨大,該過程IO開銷依然顯著。對于這種情況,Layered-Cubing更適合。

用戶該如何選擇算法呢?無需擔(dān)心,Kylin會(huì)自動(dòng)選擇合適的算法。Kylin在計(jì)算Cube之前對數(shù)據(jù)進(jìn)行采樣,在“fact distinct”步,利用HyperLogLog模擬去重,估算每種組合有多少不同的key,從而計(jì)算出每個(gè)Mapper輸出的數(shù)據(jù)大小,以及所有Mapper之間數(shù)據(jù)的重合度,據(jù)此來決定采用哪種算法更優(yōu)。在對上百個(gè)Cube任務(wù)的時(shí)間做統(tǒng)計(jì)分析后,Kylin選擇了7做為默認(rèn)的算法選擇閥值(參數(shù)kylin.cube.algorithm.layer-or-inmem-threshold):如果各個(gè)Mapper的小Cube的行數(shù)之和,大于reduce后的Cube行數(shù)的7倍,采用Layered Cubing, 反之采用Fast Cubing。如果用戶在使用過程中,更傾向于使用Fast Cubing,可以適當(dāng)調(diào)大此參數(shù)值,反之調(diào)小。

????????????????int mapperNumLimit = kylinConf.getCubeAlgorithmAutoMapperLimit();
????????????????double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold(); // default 7
????????????????logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit);
????????????????logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is " + overlapThreshold);
????????????????// in-mem cubing is good when
????????????????// 1) the cluster has enough mapper slots to run in parallel
????????????????// 2) the mapper overlap ratio is small, meaning the shuffle of in-mem MR has advantage
????????????????alg = (mapperNumber <= mapperNumLimit && mapperOverlapRatio <= overlapThreshold)//
????????????????????????? CubingJob.AlgorithmEnum.INMEM
????????????????????????: CubingJob.AlgorithmEnum.LAYER;
Kylin Cube 構(gòu)建算法結(jié)論(逐層算法和快速算法):

1、如果每個(gè)Mapper之間的key交叉重合度較低,fast cubing更適合;因?yàn)镸apper端將這塊數(shù)據(jù)最終要計(jì)算的結(jié)果都達(dá)到了,Reducer只需少量的聚合。另一個(gè)極端是,每個(gè)Mapper計(jì)算出的key跟其它 Mapper算出的key深度重合,這意味著在reducer端仍需將各個(gè)Mapper的數(shù)據(jù)抓取來再次聚合計(jì)算;如果key的數(shù)量巨大,該過程IO開銷依然顯著。對于這種情況,Layered-Cubing更適合。

2、在對上百個(gè)Cube任務(wù)的時(shí)間做統(tǒng)計(jì)分析后,Kylin選擇了7做為默認(rèn)的算法選擇閥值(參數(shù)kylin.cube.algorithm.auto.threshold):如果各個(gè)Mapper的小Cube的行數(shù)之和,大于reduce后的Cube行數(shù)的8倍(各個(gè)Mapper的小Cube的行數(shù)之和 /?reduce后的Cube行數(shù) > 7),采用Layered Cubing, 反之采用Fast Cubing(本質(zhì)就是各個(gè)Mapper之間的key重復(fù)度越小,就用Fast Cubing,重復(fù)度越大,就用Layered Cubing)
---------------------?

轉(zhuǎn)載于:https://my.oschina.net/hblt147/blog/3006400

《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀

總結(jié)

以上是生活随笔為你收集整理的kylin KV+cube方案分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

色资源网在线观看 | 久久69精品久久久久久久电影好 | 久久精品美女视频 | 在线 视频 一区二区 | 亚洲 综合 精品 | 久久久久久免费网 | 精品av在线播放 | 日本在线观看视频一区 | 日韩va亚洲va欧美va久久 | 欧美一级爽 | 97人人添人澡人人爽超碰动图 | 四虎成人在线 | 一区二区三区四区五区在线视频 | 2023天天干| 国产999精品久久久久久绿帽 | 一二三区高清 | 久久久久一区二区三区 | 丁香av在线 | 中文字幕在线看视频 | 国产色网站| 亚洲一区尤物 | 国产日韩视频在线观看 | 国产最顶级的黄色片在线免费观看 | 久久久www成人免费精品张筱雨 | 国产精品美乳一区二区免费 | 九九99 | 婷婷六月天天 | 免费看色的网站 | 中日韩男男gay无套 日韩精品一区二区三区高清免费 | 久久综合五月 | 欧美激情视频一区二区三区免费 | 久草色在线观看 | 亚洲欧洲一区二区在线观看 | av亚洲产国偷v产偷v自拍小说 | 成人久久久久 | 国产精品69av | 久久精品网址 | 五月色综合 | 草草草影院 | 久久99热这里只有精品国产 | 国产在线黄色 | 欧美视频在线观看免费网址 | 521色香蕉网站在线观看 | 88av视频| 高清视频一区二区三区 | av免费看网站 | 日韩肉感妇bbwbbwbbw | 欧美一级大片在线观看 | 国产在线色| 97视频一区 | 久久激情五月丁香伊人 | 97视频久久久 | 久久伊人精品天天 | 免费在线观看av网址 | 久操久| 精品人妖videos欧美人妖 | 国产在线免费 | 三级视频国产 | 国产精品va视频 | 欧美在线观看视频一区二区 | 欧美日本不卡高清 | 久久99精品久久久久久久久久久久 | 五月天天色 | 国产电影黄色av | 免费网站污 | 日韩av视屏在线观看 | 91高清免费看 | 午夜精品一区二区三区免费 | 中文字幕在线观看视频免费 | 久久久久日本精品一区二区三区 | 首页国产精品 | 国产视频在线看 | 久久成人欧美 | 日韩欧美在线观看一区二区 | 亚洲黄色在线播放 | 伊人久久一区 | 黄色av影院 | 国产一级二级三级在线观看 | 亚洲欧美日韩国产一区二区三区 | 中文字幕一区二区三区四区久久 | 射九九| 中文字幕在线观看视频免费 | 欧美专区日韩专区 | 国产精品高潮呻吟久久av无 | 最新婷婷色 | 美女视频黄免费网站 | 欧美日韩午夜 | 亚洲欧美日韩在线看 | 91精品国产欧美一区二区成人 | 国产伦精品一区二区三区在线 | 欧美成人精品三级在线观看播放 | 草久在线播放 | 最近中文字幕 | 精品国产区 | 中文字幕日韩国产 | 91在线精品秘密一区二区 | 婷婷九月丁香 | 午夜国产一区二区三区四区 | 日韩中文字幕第一页 | 亚洲精品 在线视频 | 午夜精品久久久久久久99水蜜桃 | 日韩美一区二区三区 | 免费观看福利视频 | 激情av网| 国产精品一区二区免费看 | 久草在线在线精品观看 | 黄色www在线观看 | 国产精品麻豆视频 | 中文字幕日韩电影 | www.91av在线 | 黄色在线观看免费网站 | 青青久草在线视频 | 国产成人精品三级 | 天天色成人网 | 97操操操| 欧美日韩性视频 | 精品少妇一区二区三区在线 | 欧美精品一区二区免费 | 亚洲少妇自拍 | a在线播放 | 亚洲免费精彩视频 | 亚洲精品国产综合99久久夜夜嗨 | av在线官网 | 久久久久欧美精品999 | 五月天激情综合网 | 久久精品视频免费观看 | 99在线观看 | 欧美日韩国产一区二区在线观看 | 久久久午夜剧场 | 国产91精品在线观看 | 免费成人av电影 | 国产精品99免费看 | 久久综合丁香 | 日本久久成人中文字幕电影 | 国产韩国日本高清视频 | 91麻豆精品国产91久久久无需广告 | 久久婷婷色 | 欧美在线视频一区二区三区 | 一区二区影视 | 五月婷激情 | 婷婷成人亚洲综合国产xv88 | 天天在线免费视频 | 亚洲伦理中文字幕 | 国产精品毛片一区二区在线 | 亚洲a成人v | 一区二区三区av在线 | 天天射日 | 国产黄色高清 | 国产精品视频在线观看 | 国产精品久久久免费看 | 欧美日韩在线观看视频 | 久久久久国产成人免费精品免费 | 欧美高清成人 | 国产精品久久久久久久久久久免费看 | 二区三区视频 | 免费久久精品视频 | 黄色亚洲大片免费在线观看 | 91黄色在线看 | 97视频总站 | 国产 日韩 欧美 自拍 | 久草色在线观看 | av综合在线观看 | 91系列在线观看 | 成人一区二区三区中文字幕 | 深爱五月激情五月 | 最近中文字幕高清字幕在线视频 | 国产福利免费在线观看 | 中文字幕一区二区三区久久 | 免费看精品久久片 | 香蕉视频国产在线观看 | 99人久久精品视频最新地址 | 中文字幕国产一区二区 | 国产精品18久久久久久首页狼 | 久久精精品视频 | 久久成人在线视频 | 91麻豆精品一区二区三区 | 91最新地址永久入口 | a级国产片 | 国产69精品久久久久99 | 色中文字幕在线观看 | 伊人五月天综合 | 久久有精品| 久久福利 | 亚洲综合网 | 97免费在线观看 | 婷婷综合网 | 久久这里只有精品1 | 日韩精品免费在线视频 | av在线免费网站 | av在线激情 | 日韩有码在线观看视频 | 啪啪肉肉污av国网站 | 久久久国产99久久国产一 | 不卡电影免费在线播放一区 | 青草视频在线看 | 国产 一区二区三区 在线 | 三级av免费看 | 天天操天天干天天摸 | 91一区在线观看 | 午夜国产成人 | a视频在线播放 | 国产精品久久久久久av | 久久草在线视频国产 | 国产视频在线观看一区 | 日韩精品久久久久 | 日本中文在线 | 国产中文字幕视频在线观看 | 中文字幕91| 久久婷婷一区二区三区 | 成年人免费观看国产 | 精品国产成人在线影院 | 国产精品久久久久久久久久久久午夜片 | 人人超碰97 | 国产糖心vlog在线观看 | 婷婷午夜天 | 色吊丝在线永久观看最新版本 | 少妇做爰k8经典 | 五月天天色 | 久精品视频| 成年人网站免费观看 | 亚洲五月六月 | 丰满少妇在线观看资源站 | 午夜精品一区二区三区四区 | 国产福利91精品一区二区三区 | 奇米影视四色8888 | 97精品欧美91久久久久久 | 伊人久久国产 | 97视频在线播放 | bbbbb女女女女女bbbbb国产 | 亚洲综合狠狠干 | 欧美不卡视频在线 | 亚洲精品国产精品国自 | 在线亚洲欧美视频 | 免费观看黄色12片一级视频 | 亚洲人成在线观看 | 国产精品综合久久 | 外国av网 | 国产视频一区在线 | 五月天视频网站 | 久久精品国产精品 | 成年人视频在线 | 亚洲成人动漫在线观看 | 黄色av一区二区三区 | 福利一区在线 | 91麻豆精品国产自产在线游戏 | 亚洲精品短视频 | 国产手机视频精品 | 精品国内自产拍在线观看视频 | 免费看黄色91 | 亚洲精品97 | 99久久夜色精品国产亚洲96 | 国产黄色特级片 | 欧美激情综合五月色丁香小说 | 日韩av看片 | 免费电影一区二区三区 | 久99热| www久久九 | 韩国av不卡 | 国产精品免费在线播放 | 最新色视频 | 久久手机免费观看 | 久久超级碰视频 | 欧美黑吊大战白妞欧美 | av资源网在线播放 | 免费又黄又爽 | 免费午夜av | 成人三级av | 日韩欧美大片免费观看 | 91在线小视频 | 久久黄色免费 | 国产精品高清av | 亚洲精品ww | 狠狠亚洲| 天天干夜夜想 | 亚州精品在线视频 | av免费网站观看 | 久久久国产精品电影 | 91精品国产高清自在线观看 | 亚洲精品在线观看的 | 五月综合婷 | 国产日韩欧美在线播放 | 国内成人av| 伊人久久一区 | 亚洲国产精品推荐 | 国内精品视频免费 | 国产精品美女久久久久久2018 | 亚洲国产成人高清精品 | 91精品高清 | 有码中文字幕在线观看 | 精品国产一区二区三区日日嗨 | 日韩精品一区二区三区免费观看视频 | 亚洲精品xx | 成人福利av | 在线精品视频免费播放 | 欧美精品久久人人躁人人爽 | 久久久久草 | 久久成人资源 | 国产福利91精品张津瑜 | 国产欧美精品一区二区三区 | 99电影456麻豆 | 五月激情av| 久久精品视频99 | 五月天激情综合网 | 精品福利在线 | 伊人夜夜 | 夜夜高潮夜夜爽国产伦精品 | 狠狠色伊人亚洲综合网站野外 | 国产不卡av在线播放 | 99久久精品免费看国产一区二区三区 | 超碰电影在线观看 | 久久九九精品 | 毛片在线播放网址 | 国内少妇自拍视频一区 | 国产免费又粗又猛又爽 | а中文在线天堂 | 人人玩人人弄 | 丁香色婷婷 | av免费在线观看网站 | 亚洲成年人在线播放 | 精品国产片 | 人人爽人人乐 | 国产精品18久久久久vr手机版特色 | 亚洲91精品 | 黄色软件在线观看视频 | 亚洲一级在线观看 | 免费在线观看毛片网站 | 国产一线二线三线性视频 | 蜜桃麻豆www久久囤产精品 | 国产剧情一区 | 菠萝菠萝在线精品视频 | 一级a性色生活片久久毛片波多野 | 国产一区二区三区四区大秀 | 丁香九月婷婷 | 中文字幕在线观看2018 | 久久久99精品免费观看乱色 | 国产精品久久久久久久久久新婚 | 久久99国产综合精品 | 欧美日韩在线看 | 精品一区二区三区四区在线 | 成人黄色一级视频 | 天天操夜夜爱 | 国产精品九九九九九 | 久久精品91视频 | 午夜日b视频 | 亚洲国产视频在线 | 在线视频免费观看 | 超碰在线天天 | av五月婷婷| 成人免费在线看片 | 久久激情小视频 | 国产精品免费观看久久 | 99国产在线观看 | 免费观看成人av | 日本黄色免费在线 | 精品久久久久久电影 | 日韩在线视频播放 | 九草视频在线观看 | 五月天综合色 | 亚洲精品福利在线 | 亚洲日本va在线观看 | 99精品观看| 欧美日韩高清一区二区三区 | 丝袜护士aⅴ在线白丝护士 天天综合精品 | 中文字幕在线观看91 | 久草久视频 | 日韩在线视频免费看 | 日本不卡一区二区三区在线观看 | 狠狠的操你 | 婷婷香蕉 | 日韩在线视频观看 | 久精品视频免费观看2 | 久久午夜电影网 | 国产午夜精品在线 | 在线观看中文字幕2021 | 99热999| 久久久久久久久久影视 | 国产精品自产拍在线观看蜜 | 最新av免费在线 | 色人久久| 91经典在线| 国产精品中文字幕在线 | 人人人爽 | 日韩av资源站 | 天天爽夜夜爽人人爽一区二区 | 亚洲经典视频 | 伊人伊成久久人综合网站 | 91九色老| 午夜精品电影 | 天天草天天插 | 久久亚洲免费 | 欧美精品久久久久久久久久丰满 | 8x成人免费视频 | 在线视频欧美精品 | 免费a v观看| 午夜精品999 | 九九在线视频免费观看 | 成人精品999 | 中文字幕在线视频一区二区三区 | 国产91精品一区二区麻豆亚洲 | 91麻豆精品国产91久久久使用方法 | 中文av免费| 久久国产精品精品国产色婷婷 | 丁香六月网 | 免费在线观看av网站 | 久热精品国产 | 国产精品99久久久久的智能播放 | 日本精品一 | 欧美精品黑人性xxxx | 天天干天天弄 | 亚洲日本一区二区在线 | 久久久国产精品亚洲一区 | 国产露脸91国语对白 | 日本久久电影网 | 一区二区三区四区五区在线 | 欧美另类交在线观看 | 欧美成年人在线观看 | 日日夜夜天天射 | 国际精品网 | 欧美亚洲久久 | 亚洲国产免费看 | 久久久久免费网 | 日韩资源在线播放 | 狠狠狠狠狠色综合 | www.xxxx变态.com| 人人添人人 | 国产精品久久久久久a | 国产精品免费大片视频 | 精品久久久久久亚洲综合网 | 天天做天天爱天天综合网 | 一区二区三区av在线 | 国产午夜精品理论片在线 | 日韩网站在线观看 | 日韩久久精品一区二区三区 | 日韩av网址在线 | 亚洲成aⅴ人片久久青草影院 | 人人看黄色 | 日本黄色免费在线 | 国产精品不卡av | 久草在线资源免费 | 91福利区一区二区三区 | 波多野结衣电影久久 | 日韩欧美国产精品 | 中文字幕日韩av | 99久久婷婷国产综合亚洲 | 欧美精品二区 | 2000xxx影视| 人人草在线观看 | 911国产精品| 日日久视频| 色99久久| 日韩电影中文字幕在线观看 | 欧美一区二区三区在线 | 激情五月激情综合网 | 在线精品在线 | 亚洲精选视频在线 | 香蕉视频久久 | 91 在线视频 | 久免费视频 | av黄色成人 | 国产 在线观看 | 久久久久久久久亚洲精品 | 精品一二三四五区 | 五月婷婷.com | 国产高清永久免费 | 国产精品 视频 | 亚洲精品综合一区二区 | 久久精品日本啪啪涩涩 | 97在线看| 久久综合九色综合久99 | 亚洲一区二区精品视频 | 中文字幕在线观看完整 | 国产精品成人久久久 | 午夜精品一区二区国产 | av福利在线看 | 亚洲蜜桃av | 亚洲精品国产欧美在线观看 | av性网站| 欧美日韩在线播放一区 | 91亚洲网| 91精品在线免费观看 | 91禁在线观看 | 成人免费一级 | 97精产国品一二三产区在线 | 日韩精品不卡 | 国产精品日韩精品 | 黄色成人av | 久久成人精品视频 | 久久久精品欧美 | 区一区二区三在线观看 | 亚洲国产激情 | 日韩av视屏在线观看 | 密桃av在线 | 精品久久免费看 | 成人中文字幕在线 | 欧美做受高潮1 | 天天干天天摸天天操 | 国内视频在线 | 91亚洲国产成人久久精品网站 | а天堂中文最新一区二区三区 | 69视频国产 | 91精品国产92久久久久 | 人人澡人摸人人添学生av | 欧美福利视频 | 国产99久久精品一区二区永久免费 | 玖玖精品在线 | 欧美一级小视频 | 国产精品视频免费看 | 99精品国产一区二区三区不卡 | 激情开心网站 | a久久久久久 | 99婷婷狠狠成为人免费视频 | 国产精品久久久久久麻豆一区 | 午夜视频久久久 | 日韩网| 免费视频资源 | 日韩欧美在线一区 | 国产 日韩 在线 亚洲 字幕 中文 | 国内精品久久久久国产 | 国内小视频 | a级片韩国 | 国产一区久久久 | 五月天婷亚洲天综合网精品偷 | 国产福利精品一区二区 | 狠狠色丁香婷婷综合久久片 | 免费精品久久久 | 99热.com | 国产精品欧美久久久久无广告 | 国产福利网站 | 免费看的黄色录像 | 久久福利剧场 | 日韩动态视频 | 久久久久国产免费免费 | 久久免费精彩视频 | 精品国产乱码久久久久久浪潮 | 在线国产91 | 久久综合婷婷国产二区高清 | 高潮久久久久久久久 | 亚洲综合色播 | adc在线观看 | 国内精品一区二区 | 永久黄网站色视频免费观看w | 亚洲国产成人在线播放 | 在线观看国产麻豆 | 日韩免费电影网站 | 中文字幕 第二区 | 色视频网站免费观看 | 99久久精品免费看国产麻豆 | 欧美一区二区在线刺激视频 | 国语黄色片 | 激情五月在线 | 久久这里只有精品23 | 国产精品久久 | 婷婷久月 | 天堂av网站| 波多野结衣精品 | 黄色精品在线看 | 天天碰天天操视频 | 91亚洲精品久久久蜜桃网站 | 国产精品一级在线 | a视频在线 | 日韩在线短视频 | 999久久久久久 | 国产在线观看二区 | 国产精品麻豆果冻传媒在线播放 | 日韩激情av在线 | 天天操天天摸天天射 | av成人在线网站 | 玖玖爱在线观看 | 久99久精品 | 亚洲精品视频在线免费播放 | www久久久| 超碰在线免费福利 | 国产成人av一区二区三区在线观看 | 国产精品四虎 | 日韩网站中文字幕 | 我要色综合天天 | 精品视频中文字幕 | 999久久久久 | 欧美精品久久 | 久久天天躁夜夜躁狠狠躁2022 | 亚洲综合激情 | 午夜美女wwww | 丁香激情综合国产 | 在线视频 你懂得 | 国产精品原创视频 | 日本不卡一区二区 | 亚洲精品影院在线观看 | 日韩精品不卡在线 | 这里只有精品视频在线观看 | 麻豆国产在线播放 | 亚洲精品视频在线观看免费视频 | 国产在线精品福利 | 91在线网址| 久久任你操 | 伊人色综合久久天天网 | 91免费在线| 91激情在线视频 | 日韩簧片在线观看 | 久久精品视频99 | 在线观看爱爱视频 | 日本性生活免费看 | 天堂在线视频中文网 | 国产主播大尺度精品福利免费 | a在线播放 | 色一级片 | 国产精品麻豆视频 | 久久中文字幕视频 | 久久九九免费视频 | 特黄特色特刺激视频免费播放 | 日本精品视频一区 | www.在线看片.com | 国产精品久一 | 亚洲2019精品 | 中文字幕在线观看一区二区三区 | 精品视频成人 | 国产伦精品一区二区三区无广告 | av电影中文字幕 | 97在线观看免费高清 | 视频二区在线 | 五月婷婷.com | 国产破处精品 | 天堂网在线视频 | 国产精品久久久久久久久久妇女 | 狠狠婷婷 | 在线看国产一区 | 2023av| 国产群p | 亚洲精品国产高清 | 亚洲精品字幕在线 | 黄色三级免费观看 | 在线亚洲小视频 | 一区二区三区视频在线 | 亚洲成av | 欧美激情视频一二三区 | 狠狠色噜噜狠狠狠狠2022 | 国产精品视频999 | 九九视频免费在线观看 | 亚洲一级性 | 在线观看日本高清mv视频 | 狠狠插狠狠干 | 狠狠操狠狠干天天操 | 日本中文字幕在线观看 | 亚洲国产日韩在线 | 九色精品免费永久在线 | av丝袜在线 | 91最新视频在线观看 | 中文字幕乱视频 | 2021国产精品视频 | 97**国产露脸精品国产 | 深爱开心激情 | 亚洲精品乱码久久久一二三 | 久久免费视频2 | 久久久午夜电影 | 天天夜夜操 | 日韩欧美精品一区二区三区经典 | 国产一区免费视频 | 久久综合九色综合97婷婷女人 | 毛片网在线播放 | 日韩欧美视频 | 欧美日韩1区 | 中文在线 | 狠狠黄 | 国产一区二区在线免费 | 天天操天| 国产精品理论片在线播放 | 美女黄频| 欧美淫aaa免费观看 日韩激情免费视频 | 婷婷视频在线播放 | 国产97视频在线 | 91视频免费看片 | 久久免费视频这里只有精品 | 国产精品免费观看国产网曝瓜 | 成年人免费在线 | 久久综合免费视频影院 | 国产不卡av在线播放 | 狠狠久久综合 | 制服丝袜天堂 | 亚洲精品视频在线免费 | 亚洲国产精品一区二区尤物区 | 91麻豆国产福利在线观看 | 五月天九九 | 国产精品a级 | 黄色毛片视频 | 亚洲成人欧美 | 国产伦理久久精品久久久久_ | 欧美va天堂在线电影 | 在线电影 一区 | 99国产视频 | 久热电影 | 深爱婷婷 | 激情网在线观看 | 色资源网免费观看视频 | 色就干| 国产精品久久久久久久久大全 | 91九色视频 | 亚洲成人国产精品 | 伊人中文在线 | 亚州日韩中文字幕 | 91看片网址 | 色老板在线视频 | 中文字幕在线播放一区 | 亚洲专区在线播放 | 最新日韩电影 | 视频在线观看入口黄最新永久免费国产 | 国产99视频在线观看 | 五月天伊人网 | 国内精品视频在线 | 日本久久片 | 奇米影视777四色米奇影院 | 亚洲视频电影在线 | 天天插日日插 | 99在线看 | 天天射天天干天天插 | a√天堂中文在线 | 中文字幕2021 | 久草资源在线 | 又长又大又黑又粗欧美 | 久久精品7| 亚洲视频2 | 国产精品中文字幕av | 精品国产三级a∨在线欧美 免费一级片在线观看 | 国产一区二区成人 | 亚洲精品视频久久 | 日韩在线电影 | 成人久久久久久久久久 | 国产中出在线观看 | 99爱精品视频 | 日韩精品久久一区二区三区 | 在线观看mv的中文字幕网站 | 丁香导航 | av福利第一导航 | 日本午夜免费福利视频 | 特级西西444www大胆高清无视频 | 欧美精品生活片 | 国产麻豆视频免费观看 | 99免在线观看免费视频高清 | 最新婷婷色 | 国产午夜影院 | 国产精品系列在线播放 | 激情综合色图 | av在线影片 | 国产群p视频 | 成人av免费网站 | 久草在线资源网 | 色吊丝在线永久观看最新版本 | 日本公妇色中文字幕 | 婷婷精品视频 | 成人在线你懂得 | 久久精品99国产精品亚洲最刺激 | 9999毛片 | 国产亚洲视频在线 | 日韩精品一区二区在线观看视频 | 久久免费精品一区二区三区 | 五月婷香| 粉嫩高清一区二区三区 | 午夜三级影院 | 国产精品久久久久久久久久久免费看 | 色视频在线免费观看 | 久久一区91 | 在线a亚洲视频播放在线观看 | 亚洲精品自拍视频在线观看 | 99精品乱码国产在线观看 | 国产1级视频 | 日本精品久久久久影院 | 欧美韩国日本在线观看 | 精品国产一区二区三区男人吃奶 | 日韩高清一二区 | 日韩精品一区二区在线视频 | 欧美综合久久 | 天天干天天射天天爽 | 久久免费毛片 | 五月婷在线观看 | 99热在线国产 | 日韩视频图片 | 9797在线看片亚洲精品 | 日韩精品电影在线播放 | 日韩精品不卡在线 | 91成年人在线观看 | 国产91学生粉嫩喷水 | 久99久精品视频免费观看 | 九九亚洲精品 | 777xxx欧美 | 国产精品手机在线 | 日本三级中文字幕在线观看 | 欧美在线视频日韩 | 亚洲综合激情小说 | 午夜视频一区二区 | 在线观看久草 | 免费av在线播放 | 免费观看xxxx9999片 | 久久久久色 | 日日爽天天 | 精品久久久久久亚洲综合网站 | 久草成人在线 | 欧美成人精品欧美一级乱黄 | 一区三区视频在线观看 | 久久国产手机看片 | 最近日本字幕mv免费观看在线 | 在线观看爱爱视频 | 亚洲一二三在线 | 成人av一级片 | 天堂视频中文在线 | 黄色av网站在线观看 | 欧美日韩国产免费视频 | 久久久久久久99 | 婷婷色综 | 高清免费在线视频 | 天天超碰 | 国产一区二区高清不卡 | 成人三级网站在线观看 | 日本激情视频中文字幕 | 成人午夜性影院 | 精品久久久久免费极品大片 | 免费在线观看黄 | 99亚洲精品在线 | 亚洲伊人婷婷 | 日韩综合第一页 | 天天拍天天色 | 丁香婷婷色综合亚洲电影 | 久久国产欧美日韩精品 | 国产亚洲欧美精品久久久久久 | 婷婷激情综合五月天 | 欧美久久久久久久久中文字幕 | 成人性生活大片 | 99精品视频免费看 | 亚洲区色| 国产在线97 | 91免费看片黄 | 久久国产视屏 | 人人澡人人舔 | 超碰97在线看 | 日韩高清不卡一区二区三区 | 99在线视频观看 | 91天堂素人约啪 | 国产成人99av超碰超爽 | 欧美成人基地 | 3d黄动漫免费看 | 色噜噜日韩精品一区二区三区视频 | 国产色在线观看 | 久草视频免费在线播放 | 国产专区一 | 久艹在线观看视频 | 66av99精品福利视频在线 | 99色人 | 久久精品综合 | 亚洲成人欧美 | 国产精品国产三级国产不产一地 | 久草在线免费看视频 | 日韩成人一级大片 | 天天激情站 | www91在线观看 | 久久久麻豆 | 久久精品播放 | 97精品一区 | 亚洲精品中文在线观看 | 国产精品毛片一区二区在线 | 欧美在线视频一区二区三区 | 一区二区视频免费在线观看 | 日本久久久久久久久 | 国产片免费在线观看视频 | 日韩欧美区 | 亚洲精品电影在线 | 国产精品欧美久久久久天天影视 | 精品欧美一区二区精品久久 | 国产精品一区二区美女视频免费看 | 成人av片在线观看 | 99精品久久久久 | 69av在线视频 | 欧美一级片免费在线观看 | 在线视频成人 | 亚洲国产精品成人精品 | 欧美日韩精品在线 | av东方在线| 97麻豆视频 | 精品久久久一区二区 | 欧美一级视频免费 | 欧美精品亚洲二区 | av一区二区三区在线播放 | 精品亚洲免费 | 国产亚洲欧美一区 | 在线亚洲日本 | www.狠狠色 | 婷婷五月在线视频 | 少妇自拍av | 国产又粗又猛又色又黄网站 | 成人网444ppp| av电影在线不卡 | 亚洲视频网站在线观看 | 91成人破解版 | 国产一区二区日本 | 久久久久亚洲a | 日韩免费不卡视频 | 国产精品麻豆果冻传媒在线播放 | 成人一级黄色片 | 97福利视频| 日韩黄在线观看 | 国产在线视频在线观看 | 天天综合色网 | 日韩在线电影一区二区 | 91女人18片女毛片60分钟 | 亚洲精品资源 | 91香蕉视频在线下载 | 天天色天天色 | 亚洲国产美女精品久久久久∴ | 91少妇精拍在线播放 | 日韩欧美高清一区二区 | 国产精品久久久久久妇 | 在线观看日韩精品 | 玖玖爱国产在线 | 中文字幕在线观看第二页 | 一区二区三区动漫 | 婷婷丁香社区 | 婷色在线 | 亚洲精区二区三区四区麻豆 | 综合色狠狠| av九九九| 欧美日韩一区二区三区不卡 | 黄色动态图xx | 四虎影视精品永久在线观看 | 成人久久18免费网站图片 | 精品欧美乱码久久久久久 | 久久久免费视频播放 | 99精品在线免费观看 | 中文字幕网址 | 国产精品电影一区二区 | 国产一级片网站 | 成年人视频免费在线 | 在线看国产| 国产99久久久国产精品免费看 | 天天爱天天 | 国产精品成人自产拍在线观看 | av高清一区二区三区 | 色播99 | 久久成年人网站 | 91插插影库| 国产69精品久久久久久 | 国产精品你懂的在线观看 | 97超碰免费 | 国产一区二区在线观看视频 | 久久成人综合视频 | 久久99九九99精品 | 91久久久久久久一区二区 | 日日天天av | 黄色三级网站在线观看 | 婷婷深爱网 | 成人国产综合 | 久久久99国产精品免费 | 国产精品不卡在线观看 | 久久久久黄 | 欧美日韩亚洲在线观看 | 免费国产视频 | 国产视频欧美视频 | 在线观看中文字幕av | 成人午夜网 | 精品免费久久 | 国产精品99久久久久人中文网介绍 | 操操操综合 | 天天色天天草天天射 | 久久精品中文视频 | 亚洲专区在线视频 | 狠狠久久综合 | 国产精品久久久久久妇 | 激情久久一区二区三区 | 国内精品视频一区二区三区八戒 | 午夜av在线播放 | 亚州性色 | 日韩免 | 操少妇视频 | 日韩av免费一区二区 | 精品久久1 | 欧美国产日韩在线观看 | 精品成人网 | 久久久91精品国产一区二区三区 | 久久精品免费电影 | 中文字幕在线观看2018 | 日韩高清观看 | 在线观看国产区 | 综合色爱| 免费视频色 | 亚洲亚洲精品在线观看 | 久久久麻豆视频 | 97超碰人人澡| 国产高清视频免费最新在线 | 国产在线精品一区二区不卡了 | 成人xxxx | 日韩 在线观看 | 久久久久久久久久久免费av | 日韩中字在线观看 | 夜夜爽天天爽 | 999久久| 国产网站av | 久久久久久久久久久免费av | 亚洲网站在线看 | av电影在线观看 | 久久久久久久久久久福利 | 九九综合久久 | 欧美精品久久久久久久久久丰满 | 人人要人人澡人人爽人人dvd |