EMR StarRocks 极速数据湖分析原理解析
簡介:數(shù)據(jù)湖概念日益火熱,本文由阿里云開源大數(shù)據(jù) OLAP 團(tuán)隊和 StarRocks 數(shù)據(jù)湖分析團(tuán)隊共同為大家介紹“ StarRocks 極速數(shù)據(jù)湖分析 ”背后的原理。 【首月99元】EMR StarRocks 數(shù)據(jù)湖極速分析體驗,試用火熱進(jìn)行中,快來申請吧 -> https://survey.aliyun.com/apps/zhiliao/Yns9d9Xxz
StarRocks 是一個強(qiáng)大的數(shù)據(jù)分析系統(tǒng),主要宗旨是為用戶提供極速、統(tǒng)一并且易用的數(shù)據(jù)分析能力,以幫助用戶通過更小的使用成本來更快的洞察數(shù)據(jù)的價值。通過精簡的架構(gòu)、高效的向量化引擎以及全新設(shè)計的基于成本的優(yōu)化器(CBO),StarRocks 的分析性能(尤其是多表 JOIN 查詢)得以遠(yuǎn)超同類產(chǎn)品。
為了能夠滿足更多用戶對于極速分析數(shù)據(jù)的需求,同時讓 StarRocks 強(qiáng)大的分析能力應(yīng)用在更加廣泛的數(shù)據(jù)集上,阿里云開源大數(shù)據(jù) OLAP 團(tuán)隊聯(lián)合社區(qū)一起增強(qiáng) StarRocks的數(shù)據(jù)湖分析能力。使其不僅能夠分析存儲在 StarRocks 本地的數(shù)據(jù),還能夠以同樣出色的表現(xiàn)分析存儲在 Apache Hive、Apache Iceberg 和 Apache Hudi 等開源數(shù)據(jù)湖或數(shù)據(jù)倉庫的數(shù)據(jù)。
本文將重點(diǎn)介紹 StarRocks 極速數(shù)據(jù)湖分析能力背后的技術(shù)內(nèi)幕,性能表現(xiàn)以及未來的規(guī)劃。
一、整體架構(gòu)
在數(shù)據(jù)湖分析的場景中,StarRocks 主要負(fù)責(zé)數(shù)據(jù)的計算分析,而數(shù)據(jù)湖則主要負(fù)責(zé)數(shù)據(jù)的存儲、組織和維護(hù)。上圖描繪了由 StarRocks 和數(shù)據(jù)湖所構(gòu)成的完成的技術(shù)棧。
StarRocks 的架構(gòu)非常簡潔,整個系統(tǒng)的核心只有 FE(Frontend)、BE(Backend)兩類進(jìn)程,不依賴任何外部組件,方便部署與維護(hù)。其中 FE 主要負(fù)責(zé)解析查詢語句(SQL),優(yōu)化查詢以及查詢的調(diào)度,而 BE 則主要負(fù)責(zé)從數(shù)據(jù)湖中讀取數(shù)據(jù),并完成一系列的 Filter 和 Aggregate 等操作。
數(shù)據(jù)湖本身是一類技術(shù)概念的集合,常見的數(shù)據(jù)湖通常包含 Table Format、File Format 和 Storage 三大模塊。其中 Table Format 是數(shù)據(jù)湖的“UI”,其主要作用是組織結(jié)構(gòu)化、半結(jié)構(gòu)化,甚至是非結(jié)構(gòu)化的數(shù)據(jù),使其得以存儲在像 HDFS 這樣的分布式文件系統(tǒng)或者像 OSS 和 S3 這樣的對象存儲中,并且對外暴露表結(jié)構(gòu)的相關(guān)語義。Table Format 包含兩大流派,一種是將元數(shù)據(jù)組織成一系列文件,并同實際數(shù)據(jù)一同存儲在分布式文件系統(tǒng)或?qū)ο蟠鎯χ?#xff0c;例如 Apache Iceberg、Apache Hudi 和 Delta Lake 都屬于這種方式;還有一種是使用定制的 metadata service 來單獨(dú)存放元數(shù)據(jù),例如 StarRocks 本地表,Snowflake 和 Apache Hive 都是這種方式。
File Format 的主要作用是給數(shù)據(jù)單元提供一種便于高效檢索和高效壓縮的表達(dá)方式,目前常見的開源文件格式有列式的 Apache Parquet 和 Apache ORC,行式的 Apache Avro 等。
Storage 是數(shù)據(jù)湖存儲數(shù)據(jù)的模塊,目前數(shù)據(jù)湖最常使用的 Storage 主要是分布式文件系統(tǒng) HDFS,對象存儲 OSS 和 S3 等。
FE
FE 的主要作用將 SQL 語句轉(zhuǎn)換成 BE 能夠認(rèn)識的 Fragment,如果把 BE 集群當(dāng)成一個分布式的線程池的話,那么 Fragment 就是線程池中的 Task。從 SQL 文本到分布式物理執(zhí)行計劃,FE 的主要工作需要經(jīng)過以下幾個步驟:
- SQL Parse: 將 SQL 文本轉(zhuǎn)換成一個 AST(抽象語法樹)
- SQL Analyze:基于 AST 進(jìn)行語法和語義分析
- SQL Logical Plan: 將 AST 轉(zhuǎn)換成邏輯計劃
- SQL Optimize:基于關(guān)系代數(shù),統(tǒng)計信息,Cost 模型對 邏輯計劃進(jìn)行重寫,轉(zhuǎn)換,選擇出 Cost “最低” 的物理執(zhí)行計劃
- 生成 Plan Fragment:將 Optimizer 選擇的物理執(zhí)行計劃轉(zhuǎn)換為 BE 可以直接執(zhí)行的 Plan Fragment。
- 執(zhí)行計劃的調(diào)度
BE
Backend 是 StarRocks 的后端節(jié)點(diǎn),負(fù)責(zé)數(shù)據(jù)存儲以及 SQL 計算執(zhí)行等工作。
StarRocks 的 BE 節(jié)點(diǎn)都是完全對等的,FE 按照一定策略將數(shù)據(jù)分配到對應(yīng)的 BE 節(jié)點(diǎn)。在數(shù)據(jù)導(dǎo)入時,數(shù)據(jù)會直接寫入到 BE 節(jié)點(diǎn),不會通過FE中轉(zhuǎn),BE 負(fù)責(zé)將導(dǎo)入數(shù)據(jù)寫成對應(yīng)的格式以及生成相關(guān)索引。在執(zhí)行 SQL 計算時,一條 SQL 語句首先會按照具體的語義規(guī)劃成邏輯執(zhí)行單元,然后再按照數(shù)據(jù)的分布情況拆分成具體的物理執(zhí)行單元。物理執(zhí)行單元會在數(shù)據(jù)存儲的節(jié)點(diǎn)上進(jìn)行執(zhí)行,這樣可以避免數(shù)據(jù)的傳輸與拷貝,從而能夠得到極致的查詢性能。
二、技術(shù)細(xì)節(jié)
StarRocks 為什么這么快
CBO 優(yōu)化器
一般 SQL 越復(fù)雜,Join 的表越多,數(shù)據(jù)量越大,查詢優(yōu)化器的意義就越大,因為不同執(zhí)行方式的性能差別可能有成百上千倍。StarRocks 優(yōu)化器主要基于 Cascades 和 ORCA 論文實現(xiàn),并結(jié)合 StarRocks 執(zhí)行器和調(diào)度器進(jìn)行了深度定制,優(yōu)化和創(chuàng)新。完整支持了 TPC-DS 99 條 SQL,實現(xiàn)了公共表達(dá)式復(fù)用,相關(guān)子查詢重寫,Lateral Join, CTE ?復(fù)用,Join Rorder,Join 分布式執(zhí)行策略選擇,Runtime Filter 下推,低基數(shù)字典優(yōu)化 等重要功能和優(yōu)化。
CBO 優(yōu)化器好壞的關(guān)鍵之一是 Cost 估計是否準(zhǔn)確,而 Cost 估計是否準(zhǔn)確的關(guān)鍵點(diǎn)之一是統(tǒng)計信息是否收集及時,準(zhǔn)確。 StarRocks 目前支持表級別和列級別的統(tǒng)計信息,支持自動收集和手動收集兩種方式,無論自動還是手動,都支持全量和抽樣收集兩種方式。
MPP 執(zhí)行
MPP (massively parallel processing) 是大規(guī)模并行計算的簡稱,核心做法是將查詢 Plan 拆分成很多可以在單個節(jié)點(diǎn)上執(zhí)行的計算實例,然后多個節(jié)點(diǎn)并行執(zhí)行。 每個節(jié)點(diǎn)不共享 CPU,內(nèi)存, 磁盤資源。MPP 數(shù)據(jù)庫的查詢性能可以隨著集群的水平擴(kuò)展而不斷提升。
如上圖所示,StarRocks 會將一個查詢在邏輯上切分為多個 Query Fragment(查詢片段),每個 Query Fragment 可以有一個或者多個 Fragment 執(zhí)行實例,每個Fragment 執(zhí)行實例 會被調(diào)度到集群某個 BE 上執(zhí)行。 如上圖所示,一個 Fragment 可以包括 一個 或者多個 Operator(執(zhí)行算子),圖中的 Fragment 包括了 Scan, Filter, Aggregate。如上圖所示,每個 Fragment 可以有不同的并行度。
如上圖所示,多個 Fragment 之間會以 Pipeline 的方式在內(nèi)存中并行執(zhí)行,而不是像批處理引擎那樣 Stage By Stage 執(zhí)行。
如上圖所示,Shuffle (數(shù)據(jù)重分布)操作是 MPP 數(shù)據(jù)庫查詢性能可以隨著集群的水平擴(kuò)展而不斷提升的關(guān)鍵,也是實現(xiàn)高基數(shù)聚合和大表 Join 的關(guān)鍵。
向量化執(zhí)行引擎
隨著數(shù)據(jù)庫執(zhí)行的瓶頸逐漸從 IO 轉(zhuǎn)移到 CPU,為了充分發(fā)揮 CPU 的執(zhí)行性能,StarRocks 基于向量化技術(shù)重新實現(xiàn)了整個執(zhí)行引擎。 算子和表達(dá)式向量化執(zhí)行的核心是批量按列執(zhí)行,批量執(zhí)行,相比與單行執(zhí)行,可以有更少的虛函數(shù)調(diào)用,更少的分支判斷;按列執(zhí)行,相比于按行執(zhí)行,對 CPU Cache 更友好,更易于 SIMD 優(yōu)化。
向量化執(zhí)行不僅僅是數(shù)據(jù)庫所有算子的向量化和表達(dá)式的向量化,而是一項巨大和復(fù)雜的性能優(yōu)化工程,包括數(shù)據(jù)在磁盤,內(nèi)存,網(wǎng)絡(luò)中的按列組織,數(shù)據(jù)結(jié)構(gòu)和算法的重新設(shè)計,內(nèi)存管理的重新設(shè)計,SIMD 指令優(yōu)化,CPU Cache 優(yōu)化,C++優(yōu)化等。向量化執(zhí)行相比之前的按行執(zhí)行,整體性能提升了5到10倍。
StarRocks 如何優(yōu)化數(shù)據(jù)湖分析
大數(shù)據(jù)分析領(lǐng)域,數(shù)據(jù)除了存儲在數(shù)倉之外,也會存儲在數(shù)據(jù)湖當(dāng)中,傳統(tǒng)的數(shù)據(jù)湖實現(xiàn)方案包括 Hive/HDFS。近幾年比較火熱的是 LakeHouse 概念,常見的實現(xiàn)方案包括 Iceberg/Hudi/Delta。那么 StarRocks 能否幫助用戶更好地挖掘數(shù)據(jù)湖中的數(shù)據(jù)價值呢?答案是肯定的。
在前面的內(nèi)容中我們介紹了 StarRocks 如何實現(xiàn)極速分析,如果將這些能力用于數(shù)據(jù)湖肯定會帶來更好地數(shù)據(jù)湖分析體驗。在這部分內(nèi)容中,我們會介紹 StarRocks 是如何實現(xiàn)極速數(shù)據(jù)湖分析的。
我們先看一下全局的架構(gòu),StarRocks 和數(shù)據(jù)湖分析相關(guān)的主要幾個模塊如下圖所示。其中 Data Management 由數(shù)據(jù)湖提供,Data Storage 由對象存儲 OSS/S3,或者是分布式文件系統(tǒng) HDFS 提供。
目前,StarRocks 已經(jīng)支持的數(shù)據(jù)湖分析能力可以歸納為下面幾個部分:
- 支持 Iceberg v1 表查詢 https://github.com/StarRocks/starrocks/issues/1030
- 支持 Hive 外表查詢 外部表 @ External_table @ StarRocks Docs (dorisdb.com)
- 支持 Hudi COW 表查詢 https://github.com/StarRocks/starrocks/issues/2772
接下來我們從查詢優(yōu)化和查詢執(zhí)行這幾個方面來看一下,StarRocks 是如何實現(xiàn)將極速分析的能力賦予數(shù)據(jù)湖的。
查詢優(yōu)化
查詢優(yōu)化這部分主要是利用前面介紹的 CBO 優(yōu)化器來實現(xiàn),數(shù)據(jù)湖模塊需要給優(yōu)化器統(tǒng)計信息。基于這些統(tǒng)計信息,優(yōu)化器會利用一系列策略來實現(xiàn)查詢執(zhí)行計劃的最優(yōu)化。接下來我們通過例子看一下幾個常見的策略。
統(tǒng)計信息
我們看下面這個例子,生成的執(zhí)行計劃中,HdfsScanNode 包含了 cardunality、avgRowSize 等統(tǒng)計信息的展示。
MySQL [hive_test]> explain select l_quantity from lineitem; +-----------------------------+ | Explain String | +-----------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS:5: l_quantity | | PARTITION: UNPARTITIONED | | | | RESULT SINK | | | | 1:EXCHANGE | | | | PLAN FRAGMENT 1 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | | | STREAM DATA SINK | | EXCHANGE ID: 01 | | UNPARTITIONED | | | | 0:HdfsScanNode | | TABLE: lineitem | | partitions=1/1 | | cardinality=126059930 | | avgRowSize=8.0 | | numNodes=0 | +-----------------------------+在正式進(jìn)入到 CBO 優(yōu)化器之前,這些統(tǒng)計信息都會計算好。比如針對 Hive 我們有 MetaData Cache 來緩存這些信息,針對 Iceberg 我們通過 Iceberg 的 manifest 信息來計算這些統(tǒng)計信息。獲取到這些統(tǒng)計信息之后,對于后續(xù)的優(yōu)化策略的效果有很大地提升。
分區(qū)裁剪
分區(qū)裁剪是只有當(dāng)目標(biāo)表為分區(qū)表時,才可以進(jìn)行的一種優(yōu)化方式。分區(qū)裁剪通過分析查詢語句中的過濾條件,只選擇可能滿足條件的分區(qū),不掃描匹配不上的分區(qū),進(jìn)而顯著地減少計算的數(shù)據(jù)量。比如下面的例子,我們創(chuàng)建了一個以 ss_sold_date_sk 為分區(qū)列的外表。
create external table store_sales( ss_sold_time_sk bigint , ss_item_sk bigint , ss_customer_sk bigint , ss_coupon_amt decimal(7,2) , ss_net_paid decimal(7,2) , ss_net_paid_inc_tax decimal(7,2) , ss_net_profit decimal(7,2) , ss_sold_date_sk bigint ) ENGINE=HIVE PROPERTIES ( "resource" = "hive_tpcds", "database" = "tpcds", "table" = "store_sales" );在執(zhí)行如下查詢的時候,分區(qū)2451911和2451941之間的數(shù)據(jù)才會被讀取,其他分區(qū)的數(shù)據(jù)會被過濾掉,這可以節(jié)約很大一部分的網(wǎng)絡(luò) IO 的消耗。
select ss_sold_time_sk from store_sales where ss_sold_date_sk between 2451911 and 2451941 order ss_sold_time_sk;Join Reorder
多個表的 Join 的查詢效率和各個表參與 Join 的順序有很大關(guān)系。如 select * from T0, T1, T2 where T0.a=T1.a and T2.a=T1.a,這個 SQL 中可能的執(zhí)行順序有下面兩種情況:
- T0 和 T1 先做 Join,然后再和 T2 做 Join
- T1 和 T2 先做 Join,然后再和 T0 做 Join
根據(jù) T0 和 T2 的數(shù)據(jù)量及數(shù)據(jù)分布,這兩種執(zhí)行順序會有不同的性能表現(xiàn)。針對這個情況,StarRocks 在優(yōu)化器中實現(xiàn)了基于 DP 和貪心的 Join Reorder 機(jī)制。目前針對 Hive的數(shù)據(jù)分析,已經(jīng)支持了 Join Reorder,其他的數(shù)據(jù)源的支持也正在開發(fā)中。下面是一個例子:
MySQL [hive_test]> explain select * from T0, T1, T2 where T2.str=T0.str and T1.str=T0.str; +----------------------------------------------+ | Explain String | +----------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS:1: str | 2: str | 3: str | | PARTITION: UNPARTITIONED | | RESULT SINK | | 8:EXCHANGE | | PLAN FRAGMENT 1 | | OUTPUT EXPRS: | | PARTITION: HASH_PARTITIONED: 2: str | | STREAM DATA SINK | | EXCHANGE ID: 08 | | UNPARTITIONED | | 7:HASH JOIN | | | join op: INNER JOIN (BUCKET_SHUFFLE(S)) | | | hash predicates: | | | colocate: false, reason: | | | equal join conjunct: 1: str = 3: str | | |----6:EXCHANGE | | 4:HASH JOIN | | | join op: INNER JOIN (PARTITIONED) | | | hash predicates: | | | colocate: false, reason: | | | equal join conjunct: 2: str = 1: str | | |----3:EXCHANGE | | 1:EXCHANGE | | PLAN FRAGMENT 2 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | STREAM DATA SINK | | EXCHANGE ID: 06 | | HASH_PARTITIONED: 3: str | | 5:HdfsScanNode | | TABLE: T2 | | partitions=1/1 | | cardinality=1 | | avgRowSize=16.0 | | numNodes=0 | | PLAN FRAGMENT 3 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | STREAM DATA SINK | | EXCHANGE ID: 03 | | HASH_PARTITIONED: 1: str | | 2:HdfsScanNode | | TABLE: T0 | | partitions=1/1 | | cardinality=1 | | avgRowSize=16.0 | | numNodes=0 | | PLAN FRAGMENT 4 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | STREAM DATA SINK | | EXCHANGE ID: 01 | | HASH_PARTITIONED: 2: str | | 0:HdfsScanNode | | TABLE: T1 | | partitions=1/1 | | cardinality=1 | | avgRowSize=16.0 | | numNodes=0 | +----------------------------------------------+謂詞下推
謂詞下推將查詢語句中的過濾表達(dá)式計算盡可能下推到距離數(shù)據(jù)源最近的地方,從而減少數(shù)據(jù)傳輸或計算的開銷。針對數(shù)據(jù)湖場景,我們實現(xiàn)了將 Min/Max 等過濾條件下推到 Parquet 中,在讀取 Parquet 文件的時候,能夠快速地過濾掉不用的 Row Group。
比如,對于下面的查詢,l_discount=1對應(yīng)條件會下推到 Parquet 側(cè)。
MySQL [hive_test]> explain select l_quantity from lineitem where l_discount=1; +----------------------------------------------------+ | Explain String | +----------------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS:5: l_quantity | | PARTITION: UNPARTITIONED | | | | RESULT SINK | | | | 2:EXCHANGE | | | | PLAN FRAGMENT 1 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | | | STREAM DATA SINK | | EXCHANGE ID: 02 | | UNPARTITIONED | | | | 1:Project | | | <slot 5> : 5: l_quantity | | | | | 0:HdfsScanNode | | TABLE: lineitem | | NON-PARTITION PREDICATES: 7: l_discount = 1.0 | | partitions=1/1 | | cardinality=63029965 | | avgRowSize=16.0 | | numNodes=0 | +----------------------------------------------------+其他策略
除了上面介紹的幾種策略,針對數(shù)據(jù)湖分析,我們還適配了如 Limit 下推、TopN 下推、子查詢優(yōu)化等策略。能夠進(jìn)一步地優(yōu)化查詢性能。
查詢執(zhí)行
前面介紹了,StarRocks 的執(zhí)行引擎是全向量化、MPP 架構(gòu)的,這些無疑都會給我們分析數(shù)據(jù)湖的數(shù)據(jù)帶來很大提升。接下來我們看一下 StarRocks 是如何調(diào)度和執(zhí)行數(shù)據(jù)湖分析查詢的。
查詢調(diào)度
數(shù)據(jù)湖的數(shù)據(jù)一般都存儲在如 HDFS、OSS 上,考慮到混部和非混部的情況。我們對 Fragment 的調(diào)度,實現(xiàn)了一套負(fù)載均衡的算法。
- 做完分區(qū)裁剪之后,得到要查詢的所有 HDFS 文件 block
- 對每個 block 構(gòu)造 THdfsScanRange,其中 hosts 包含 block 所有副本所在的 datanode 地址,最終得到 List
- Coordinator 維護(hù)一個所有 be 當(dāng)前已經(jīng)分配的 scan range 數(shù)目的 map,每個 datanode 上磁盤已分配的要讀取 block 的數(shù)目的 map>,及每個 be 平均分配的 scan range 數(shù)目 numScanRangePerBe
- 如果 block 副本所在的 datanode 有be(混部)
- 每個 scan range 優(yōu)先分配給副本所在的 be 中 scan range 數(shù)目最少的 be。如果 be 已經(jīng)分配的 scan range 數(shù)目大于 numScanRangePerBe,則從遠(yuǎn)程 be 中選擇 scan range 數(shù)目最小的
- 如果有多個 be 上 scan range 數(shù)目一樣小,則考慮 be 上磁盤的情況,選擇副本所在磁盤上已分配的要讀取 block 數(shù)目小的 be
- 如果 block 副本所在的 datanode 機(jī)器沒有 be(單獨(dú)部署或者可以遠(yuǎn)程讀)
- 選擇 scan range 數(shù)目最小的 be
查詢執(zhí)行
在調(diào)度到 BE 端進(jìn)行執(zhí)行之后,整個執(zhí)行過程都是向量化的。具體看下面 Iceberg 的例子,IcebergScanNode 對應(yīng)的 BE 端目前是 HdfsScanNode 的向量化實現(xiàn),其他算子也是類似,在 BE 端都是向量化的實現(xiàn)。
MySQL [external_db_snappy_yuzhou]> explain select c_customer_id customer_id -> ,c_first_name customer_first_name -> ,c_last_name customer_last_name -> ,c_preferred_cust_flag customer_preferred_cust_flag -> ,c_birth_country customer_birth_country -> ,c_login customer_login -> ,c_email_address customer_email_address -> ,d_year dyear -> ,'s' sale_type -> from customer, store_sales, date_dim -> where c_customer_sk = ss_customer_sk -> and ss_sold_date_sk = d_date_sk; +------------------------------------------------ | PLAN FRAGMENT 0 | OUTPUT EXPRS:2: c_customer_id | 9: c_first_name | 10: c_last_name | 11: c_preferred_cust_flag | 15: c_birth_country | 16: c_login | 17: c_email_address | 48: d_year | 70: expr | | PARTITION: UNPARTITIONED | RESULT SINK | 9:EXCHANGE | PLAN FRAGMENT 1 | OUTPUT EXPRS: | PARTITION: RANDOM | STREAM DATA SINK | EXCHANGE ID: 09 | UNPARTITIONED | 8:Project | | <slot 2> : 2: c_customer_id | | <slot 9> : 9: c_first_name | | <slot 10> : 10: c_last_name | | <slot 11> : 11: c_preferred_cust_flag | | <slot 15> : 15: c_birth_country | | <slot 16> : 16: c_login | | <slot 17> : 17: c_email_address | | <slot 48> : 48: d_year | | <slot 70> : 's' | 7:HASH JOIN | | join op: INNER JOIN (BROADCAST) | | hash predicates: | | colocate: false, reason: | | equal join conjunct: 21: ss_customer_sk = 1: c_customer_sk | 4:Project | | <slot 21> : 21: ss_customer_sk | | <slot 48> : 48: d_year | 3:HASH JOIN | | join op: INNER JOIN (BROADCAST) | | hash predicates: | | colocate: false, reason: | | equal join conjunct: 41: ss_sold_date_sk = 42: d_date_sk | 0:IcebergScanNode | TABLE: store_sales | cardinality=28800991 | avgRowSize=1.4884362 | numNodes=0 | PLAN FRAGMENT 2 | OUTPUT EXPRS: | PARTITION: RANDOM | STREAM DATA SINK | EXCHANGE ID: 06 | UNPARTITIONED | 5:IcebergScanNode | TABLE: customer | cardinality=500000 | avgRowSize=36.93911 | numNodes=0 | PLAN FRAGMENT 3 | OUTPUT EXPRS: | PARTITION: RANDOM | STREAM DATA SINK | EXCHANGE ID: 02 | UNPARTITIONED | 1:IcebergScanNode | TABLE: date_dim | cardinality=73049 | avgRowSize=4.026941 | numNodes=0三、基準(zhǔn)測試
TPC-H 是美國交易處理效能委員會TPC(Transaction Processing Performance Council)組織制定的用來模擬決策支持類應(yīng)用的測試集。 It consists of a suite of business oriented ad-hoc queries and concurrent data modifications.
TPC-H 根據(jù)真實的生產(chǎn)運(yùn)行環(huán)境來建模,模擬了一套銷售系統(tǒng)的數(shù)據(jù)倉庫。該測試共包含8張表,數(shù)據(jù)量可設(shè)定從1 GB~3 TB不等。其基準(zhǔn)測試共包含了22個查詢,主要評價指標(biāo)為各個查詢的響應(yīng)時間,即從提交查詢到結(jié)果返回所需時間。
測試結(jié)論
在 TPCH 100G規(guī)模的數(shù)據(jù)集上進(jìn)行對比測試,共22個查詢,結(jié)果如下:
StarRocks 使用本地存儲查詢和 Hive 外表查詢兩種方式進(jìn)行測試。其中,StarRocks On Hive 和 Trino On Hive 查詢的是同一份數(shù)據(jù),數(shù)據(jù)采用 ORC 格式存儲,采用 zlib 格式壓縮。測試環(huán)境使用阿里云 EMR 進(jìn)行構(gòu)建。
最終,StarRocks 本地存儲查詢總耗時為21s,StarRocks Hive 外表查詢總耗時92s。Trino 查詢總耗時307s。可以看到 StarRocks On Hive 在查詢性能方面遠(yuǎn)遠(yuǎn)超過 Trino,但是對比本地存儲查詢還有不小的距離,主要的原因是訪問遠(yuǎn)端存儲增加了網(wǎng)絡(luò)開銷,以及遠(yuǎn)端存儲的延時和 IOPS 通常都不如本地存儲,后面的計劃是通過 Cache 等機(jī)制彌補(bǔ)問題,進(jìn)一步縮短 StarRocks 本地表和 StarRocks On Hive 的差距。
具體測試過程請參考:StarRocks vs Trino TPCH 性能測試對比報告
四、未來規(guī)劃
得益于全面向量化執(zhí)行引擎,CBO 優(yōu)化器以及 MPP 執(zhí)行框架等核心技術(shù),目前 ?StarRocks 已經(jīng)實現(xiàn)了遠(yuǎn)超其他同類產(chǎn)品的極速數(shù)據(jù)湖分析能力。從長遠(yuǎn)來看, StarRocks 在數(shù)據(jù)湖分析方向的愿景是為用戶提供極其簡單、易用和高速的數(shù)據(jù)湖分析能力。為了能夠?qū)崿F(xiàn)這一目標(biāo),StarRocks 現(xiàn)在還有許多工作需要完成,其中包括:
- 集成 Pipeline 執(zhí)行引擎,通過 Push Based 的流水線執(zhí)行方式,進(jìn)一步降低查詢響應(yīng)速度
- 自動的冷熱數(shù)據(jù)分層存儲,用戶可以將頻繁更新的熱數(shù)據(jù)存儲在 StarRocks 本地表上,StarRocks 會定期自動將冷數(shù)據(jù)從本地表遷移到數(shù)據(jù)湖
- 去掉顯式建立外表的步驟,用戶只需要建立數(shù)據(jù)湖對應(yīng)的 resource 即可實現(xiàn)數(shù)據(jù)湖庫表全自動同步
- 進(jìn)一步完善 StarRocks 對于數(shù)據(jù)湖產(chǎn)品特性的支持,包括支持 Apache Hudi 的 MOR 表和 Apache Iceberg 的 v2 表;支持直接寫數(shù)據(jù)湖;支持 Time Travel 查詢,完善 Catalog 的支持度等
- 通過層級 Cache 來進(jìn)一步提升數(shù)據(jù)湖分析的性能
五、更多信息
參考鏈接
[1] StarRocks - 開源大數(shù)據(jù)平臺E-MapReduce - 阿里云
[2] https://github.com/StarRocks/starrocks/issues/1030
[3] https://docs.dorisdb.com/zh-cn/main/using_starrocks/External_table#hive%E5%A4%96%E8%A1%A8
[4] https://github.com/StarRocks/starrocks/issues/2772
[5] StarRocks vs Trino TPCH 性能測試對比報告
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。?
總結(jié)
以上是生活随笔為你收集整理的EMR StarRocks 极速数据湖分析原理解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 揭秘阿里云 RTS SDK 是如何实现直
- 下一篇: 运行cudasift