F1 Query: Declarative Querying at Scale
距離 Google 的上一篇 F1 論文,也就是 F1: A Distributed SQL Database That Scales 已經(jīng) 5 年過去了,Google 在今年的 VLDB 上終于發(fā)布了 F1 的新版本 F1 Query: Declarative Querying at Scale,我們今天就來看一下這篇論文。
2013 年的 F1 是基于 Spanner,主要提供 OLTP 服務(wù),而新的 F1 則定位則是大一統(tǒng):旨在處理 OLTP/OLAP/ETL 等多種不同的 workload。但是這篇新的 F1 論文對 OLTP 的討論則是少之又少,據(jù)八卦是 Spanner 開始原生支持之前 F1 的部分功能,導(dǎo)致 F1 對 OLTP 的領(lǐng)地被吞并了。下面看一下論文的具體內(nèi)容,疏漏之處歡迎指正。
0. 摘要
F1 Query 是一個大一統(tǒng)的 SQL 查詢處理平臺,可以處理存儲在 Google 內(nèi)部不同存儲介質(zhì)(Bigtable, Spanner, Google Spreadsheet)上面的不同格式文件。簡單來說,F1 Query 可以同時支持如下功能:OLTP 查詢,低延遲 OLAP 查詢,ETL 工作流。F1 Query 的特性包括:
- 為不同數(shù)據(jù)源的數(shù)據(jù)提供統(tǒng)一視圖
- 利用數(shù)據(jù)中心的資源提供高吞吐和低延遲的查詢
- High Scalability
- Extensibility
1. 背景
在 Google 內(nèi)部的數(shù)據(jù)處理和分析的 use case 非常復(fù)雜,對很多方面都有不同的要求,比如數(shù)據(jù)大小、延遲、數(shù)據(jù)源以及業(yè)務(wù)邏輯支持。結(jié)果導(dǎo)致了許多數(shù)據(jù)處理系統(tǒng)都只 focus 在一個點上,比如事務(wù)式查詢、OLAP 查詢、ETL 工作流。這些不同的系統(tǒng)往往具有不同的特性,不管是使用還是開發(fā)上都會有極大的不便利。
F1 Query 就在這個背景下誕生了,用論文中的話說就是
we present F1 Query, an SQL query engine that is unique not because of its focus on doing one thing well, but instead because it aims to cover all corners of the requirements space for enterprise data processing and analysis.
F1 Query 旨在覆蓋數(shù)據(jù)處理和分析的所有方面。F1 Query 在內(nèi)部已經(jīng)應(yīng)用到了多個產(chǎn)品線,比如 Advertising, Shopping, Analytics 和 Payment。
在 F1 Query 的系統(tǒng)設(shè)計過程中,下面幾點考量具有非常關(guān)鍵的作用。
- Data Fragmentation: Google 內(nèi)部的數(shù)據(jù)由于本身的特性不同,會被存儲到不同的存儲系統(tǒng)中。這樣會導(dǎo)致一個應(yīng)用程序依賴的數(shù)據(jù)可能橫跨多個數(shù)據(jù)存儲系統(tǒng)中,甚至以不同的文件格式。對于這個問題,F1 Query 對于這些數(shù)據(jù)提供一個統(tǒng)一的數(shù)據(jù)視圖。
- Datacenter Architecture: F1 Query 的目標是多數(shù)據(jù)中心,這個和傳統(tǒng)的 shared nothing 架構(gòu)的數(shù)據(jù)處理系統(tǒng)不同相同。傳統(tǒng)的模式為了降低延遲,往往需要考慮 locality,也就是數(shù)據(jù)和計算越近越好。由于 Google 內(nèi)部的網(wǎng)絡(luò)環(huán)境優(yōu)勢,locality 的優(yōu)勢顯得不是那么重要。所以 F1 Query 更強調(diào)計算和存儲分離,這樣計算節(jié)點和存儲節(jié)點的擴展性(scalability)都會更好。畢竟 Google 內(nèi)部的系統(tǒng),scalability 才是第一法則。還有一點值得一提的是,由于使用了 GFS 的更強版本: Colossue File System,磁盤不會成為瓶頸。
- Scalability: 在 F1 Query 中,short query 會在單個節(jié)點上執(zhí)行,larger query 會以分布式的模式執(zhí)行,largest query 以批處理 MapReduce 模式執(zhí)行。對于這些模式,F1 Query 可以通過增加運算的并行度來優(yōu)化。
- Extensibility: 對于那些無法用 SQL 語義來表達的查詢需求,F1 通過提供 user-defined functions (UDF)、user-defined aggregate functions (UDAs) 和 table-valued functions (TVF) 來支持。
2. 架構(gòu)
F1 的架構(gòu)圖如下所示:
下面的方框里面是每個 Datacenter 一套。關(guān)于各個組件的介紹如下:
- 用戶通過 client libary 和 F1 Server 交互
- F1 Master 負責 query 的狀態(tài)的運行時監(jiān)控和其他組件的管理
- F1 Server 收到用戶請求,對于 short query 直接單機執(zhí)行查詢;對于 larger query 轉(zhuǎn)發(fā)到多臺 worker 上并行執(zhí)行查詢。最后再匯總結(jié)果返回給 client。
- F1 Worker 負責具體查詢執(zhí)行
- F1 Server 和 Worker 都是無狀態(tài)的,方便擴展
2.1 query 執(zhí)行
用戶通過 client libary 提交 query 到 F1 Server 上,F1 Server 首先解析和分析 SQL,然后將涉及到的數(shù)據(jù)源提取出來,如果某些數(shù)據(jù)源在當前 datacenter 不存在,則直接將 query 返回給 client 并告知哪些 F1 Server 距離哪些數(shù)據(jù)源更近。這里直接將請求返回給業(yè)務(wù)層,由業(yè)務(wù)層去 retry,設(shè)計的也是非常的簡單。盡管前面說到要將存儲和計算分離,但是這個地方的設(shè)計還是考慮到了 locality,datacenter 級別的 locality,畢竟 locality 對查詢延遲的影響還是巨大的。
F1 Server 將 query 解析并優(yōu)化成 DAG,然后由執(zhí)行層來執(zhí)行,具體執(zhí)行模式(interactive 還是 batch)由用戶指定。原文是: Based on a client- specified execution mode preference, F1 Query executes queries on F1 servers and workers in an interactive mode or in a batch mode.
對于交互式查詢模式(interactive mode)有單節(jié)點集中執(zhí)行模式和多節(jié)點分布式執(zhí)行模式,query 優(yōu)化會根據(jù)啟發(fā)式的算法來決定采用哪種模式。集中式下,F1 Server 解析分析 query,然后在當前節(jié)點上直接執(zhí)行并接收查詢結(jié)果。分布式下,接收 query 的 F1 Server 充當一個 query coordinator 的角色,將 query 拆解并下發(fā)給 worker。交互式查詢在數(shù)據(jù)量不太大的情況下往往具有不錯的性能和高效的資源利用率。
除了交互式查詢還有一種模式是批處理模式(batch mode)。批處理模式使用 MapReduce 框架異步提交執(zhí)行執(zhí)行,相比交互式這種 long-running 方式,批處理模式的可靠性(reliabitly)更高。
2.2 數(shù)據(jù)源
數(shù)據(jù)查詢支持跨 datacenter。存儲計算分離模式使得多數(shù)據(jù)源的支持更加簡單,比如 Spanner, Bigtable, CSV, columnar file 等。為了支持多數(shù)據(jù)源,F1 Query 在他們之上抽象出了一層,讓數(shù)據(jù)看起來都是存儲在關(guān)系型表里面。而各個數(shù)據(jù)源的元數(shù)據(jù)就存儲在 catalog service 里面。
對于沒有存儲到 catalog service 里面的表數(shù)據(jù),只要提供一個DEFINE TABLE即可查詢。
DEFINE TABLE People(format = ‘csv’,path = ‘/path/to/peoplefile’,columns = ‘name:STRING,DateOfBirth:DATE’);SELECT Name, DateOfBirth FROM PeopleWHERE Name = ‘John Doe’;論文中沒有提到的是單看這個 DEFINE TABLE 可以表現(xiàn)力不夠,所說這些信息并不足以表現(xiàn)出數(shù)據(jù)的行為:
- 是否支持 partition?
- 是否支持 邏輯下推?
- 是否支持索引?
- 是否支持多種 掃描模式?
- 對于新數(shù)據(jù)源的支持可以通過 Table-Valued Function (TVF) 的方式來支持。
2.3 Data Sink
query 的結(jié)果可以直接返回給 client,也可以插入到另外一個表里面。
2.4 SQL
SQL 2011。之所以是 2011 是因為其他老的系統(tǒng)使用的是 2011。
3. 交互式查詢
交互式查詢模式是默認的查詢模式。如前所述,交互式查詢有集中式和分布式,具體使用哪種由優(yōu)化器分析 client 的 query 然后決定。
3.1 Single Threaded Execution Kernel
集中式的查詢?nèi)缦聢D所示,是一種 pull-based 的單線程執(zhí)行方式。
3.2 Distributed Execution
如前所述,由優(yōu)化器分析完 query 決定是否采用分布式模式。在分布式這種模式下接收到 query 的 F1 Server 充當一個 coordinator 的角色,將執(zhí)行 plan 推給 worker。worker 是多線程的,可以并發(fā)執(zhí)行單個 query 的無依賴的 fragment。Fragment 是執(zhí)行計劃切分出來的執(zhí)行計劃片段,非常像 MR 或者 Spark 中的 stage。Fragment 之間通過 Exchange Operator (數(shù)據(jù)重分布) 連接。
Fragment 的切分過程如下:優(yōu)化器使用一種基于數(shù)據(jù)分布依賴的 bottom-up 策略。具體來說每個算子對于輸入數(shù)據(jù)的分布都有要求,比如 hash 或者依賴其他字段的分布。典型的例子有 group by key 和 hash join。如果當前的數(shù)據(jù)分布滿足前后兩個算子的要求,則兩個算子就被放到一個 Fragment 里面,否則就被分到兩個 Fragment 里面,然后通過 Exchange Operator 來連接。
下一步就是計算每個 Fragment 的并行度,Fragment 之間并行度互相獨立。葉子節(jié)點的 Fragment 的底層 table scan 決定最初的并行度,然后上層通過 width calculator 逐步計算。比如 hash-join 的底層兩個 Fragment 分別是 100-worker 和 50-worker,則 hash-join 這個 Fragment 會使用 100-worker 的并行度。下面是一個具體的例子。
SELECT Clicks.Region, COUNT(*) ClickCountFROM Ads JOIN Clicks USING (AdId)WHERE Ads.StartDate > ‘2018-05-14’ ANDClicks.OS = ‘Chrome OS’GROUP BY Clicks.RegionORDER BY ClickCount DESC;上面 SQL 對應(yīng)的 Fragment 和一種可能 worker 并行度如下圖所示:
3.3 Partitioning Strategy
數(shù)據(jù)重分布也就是 Fragment 之間的 Exchange Operator,對于每條數(shù)據(jù),數(shù)據(jù)發(fā)送者通過分區(qū)函數(shù)來計算數(shù)據(jù)的目的分區(qū)數(shù),每個分區(qū)數(shù)對應(yīng)一個 worker。Exchange Operator 通過 RPC 調(diào)用,擴展可以支持到每個 Fragment 千級的 partion 并發(fā)。要求再高就需要使用 batch mode。
查詢優(yōu)化器將 scan 操作作為執(zhí)行計劃的葉子節(jié)點和 N 個 worker 節(jié)點并發(fā)。為了并發(fā)執(zhí)行 scan 操作,數(shù)據(jù)必須要被并發(fā)分布,然后由所有 worker 一起產(chǎn)生輸出結(jié)果。有時候數(shù)據(jù)的 partition 會超過 N,而 scan 并發(fā)度為 N,多余的 partition 就交由空閑的 worker 去處理,這樣可以避免數(shù)據(jù)傾斜。
3.4 Performance Considerations
F1 Query 的主要性能問題在于數(shù)據(jù)傾斜和訪問模式不佳。Hash join 對于 hot key 尤為敏感。當 hot key 被 worker 載入到內(nèi)存的時候可能會因為數(shù)據(jù)量太大而寫入磁盤,從而導(dǎo)致性能下降。
論文中舉了一個 lookup join 的例子,這里不打算詳述了。
對于這種數(shù)據(jù)傾斜的問題,F1 Query 的解決方案是 Dynamic Key Range,但是論文中對其描述還是不夠詳細。
F1 Query 對于交互式查詢采用存內(nèi)存計算,而且沒有 check point。因為是內(nèi)存計算,所以速度非常的快,但是由于沒有 checkpoint 等 failover 的機制,只能依賴于業(yè)務(wù)層的重試。
4. 批處理
像 ETL,都是通過 Batch Mode 來處理的。Google 以前都是通過 MapReduce 或者 FlumeJava 來開發(fā)的,開發(fā)成本一般比較高。相比 SQL 這種方式,不能有效復(fù)用 SQL 優(yōu)化,所以 F1 Query 選擇使用 SQL 來做。
如前所述,交互式查詢不適合處理 worker failure,而 batch mode,也就是批處理這種模式特別適合處理 failover(每一個 stage 結(jié)果落盤)。批處理模式復(fù)用交互式 SQL query 的一些特性,比如 query 優(yōu)化,執(zhí)行計劃生成。交互式模式和批處理模式的核心區(qū)別在于調(diào)度方式不同:交互式模式是同步的,而批處理模式是異步的。
4.1 Batch Execution Framework
批處理使用的框架是 MapReduce,Fragment 被抽象成 MapReduce 中的 stage,stage 的輸出結(jié)果被存儲到 Colossus file system (GFS 二代)。
在 Fragment 映射有一點值得注意的是嚴格來說,Fragment 的 DAG 映射到 mr 是 map-reduce-reduce,對這種模式做一個簡單的變通變成:map-reduce-map<identity>-reduce,如下圖:
關(guān)于 MapReduce 的更詳細信息可以參考 Google 03 年那篇論文。
4.2 Batch Service Framework
Framework 會對 batch mode query 的執(zhí)行進行編排。具體包括:query 注冊,query 分發(fā),調(diào)度已經(jīng)監(jiān)控 mr 作業(yè)的執(zhí)行。當 F1 Server 接收到一個 batch mode query,它會先生成執(zhí)行計劃并將 query 注冊到 Query Registry,全局唯一的 Spanner db,用來 track batch mode query。Query Distributor 然后將 query 分發(fā)給 datacenter。Query Scheduler 會定期從 Registry 拿到 query,然后生成執(zhí)行計劃并交給 Query Executor 來處理。
Service Framework 的健壯性非常好:Query Distributor 是選主(master-elect)模式;Query Scheduler 在每個 datacenter 有多個。query 的所有執(zhí)行狀態(tài)都是保存在 Query Registry,這就保證其他的組件是無狀態(tài)的。容錯處理:MapReduce 的 stage 會被重試,如果 datacenter 出問題,query 會被分配到新的 datacenter 上重新執(zhí)行。
5. 優(yōu)化器
SQL 優(yōu)化器類似 Spark Catalyst,架構(gòu)如下圖,不細說了。
6. EXTENSIBILITY
對于很多復(fù)雜業(yè)務(wù)邏輯無法用 SQL 來描述,F1 針對這種情況提供了一種用戶自定義函數(shù)的方法,包括 UDF (user-define functions),UDA (aggrega- tion functions) 和 TVF (table-valued functions)。對于簡單的UDF需求,通常直接以SQL或者LUA的形式作為query的一部分;對于更復(fù)雜或者性能要求高的UDF需求,則可以用其它高級語言以UDF Server的形式實現(xiàn)。
UDF Server 和 F1 Query 是 RPC 調(diào)用關(guān)系,有 client 單獨部署在同一個 datacenter。udf server 完全有 client 來控制,無狀態(tài),基本可以無限擴展。
6.1 Scalar Functions
UDF 并不是新的概念,UDF Server 這種部署方式看上去還算新穎一點。但是 UDF Server 這種單獨部署模式一個可能的問題是延遲問題,這里通過批量流水線的方式來減少延遲。下面是 UDF 的一個例子。
local function string2unixtime(value)local y,m,d = match("(%d+)%-(%d+)%-(%d+)")return os.time({year=y, month=m, day=d}) end6.2 Aggregate Functions
UDA 是對多行輸入產(chǎn)生一個單一的輸出,要實現(xiàn) UDA,用戶需要實現(xiàn)算子 Initialize, Accumulate, and Finalize。另外如要要對多個 UDA 的子聚合結(jié)果進行再聚合,用戶可以實現(xiàn) Reaccumulate。
6.3 Table-Valued Functions
TVF 的輸入是一個 table,輸出是另外一個 table。這種在機器學習的模型訓(xùn)練場景下比較有用。下面是論文中的具體的一個例子:EventsFromPastDays 就是一個 TVF。
SELECT * FROM EventsFromPastDays(3, TABLE Clicks);當然 TVF 也支持用 SQL 來描述,如下。
CREATE TABLE FUNCTION EventsFromPastDays(num_days INT64, events ANY TABLE) ASSELECT * FROM eventsWHERE date >= DATE_SUB(CURRENT_DATE(),INTERVAL num_days DAY);7. Production Metric
下面是 F1 Query 在 Production 環(huán)境下的幾個 metrics。
8. 總結(jié)
回過頭來看 F1 Query 最新的這篇論文給人最大的啟發(fā)就是大一統(tǒng)的思想,這個很有可能是行業(yè)發(fā)展趨勢。回想一下 MapReduce 論文由 Google 于 2003 年發(fā)表,開源實現(xiàn) Hadoop 于 2005 問世。不妨期待了一下未來的 3 到 5 年的 F1 Query 的開源產(chǎn)品。
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的F1 Query: Declarative Querying at Scale的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 阿里十年DBA经验产品经理:真的不要再有
- 下一篇: Flink 与 Hive 的磨合期