日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

深度 | 一条查询SQL的前世今生 —— ClickHouse 源码阅读

發布時間:2025/3/21 数据库 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 深度 | 一条查询SQL的前世今生 —— ClickHouse 源码阅读 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

作者:逍凱,阿里云數據庫實習開發工程師

注:以下分析基于開源 v19.15.2.2-stable 版本進行,社區最新版本代碼改動較大,但是總體思路是不變的。

01?用戶提交一條查詢SQL背后發生了什么

在傳統關系型數據庫中,SQL處理器的組件主要包括以下幾種:

? Query Parsing

負責進行詞法和語法分析,把程序從人類高可讀的格式(即SQL)轉化成機器高可讀的格式(AST,抽象語法樹)。

詞法分析指的是把SQL中的字符序列分解成一個個獨立的詞法單元——Token(<類型,值>)。

語法分析指的是從詞法分析器輸出的token中識別各類短語,并構造出一顆抽象語法樹。而按照構造抽象語法樹的方向,又可以把語法分析分成自頂向下和自底向上分析兩種。而ClickHouse采用的則是手寫一個遞歸下降的語法分析器。

? Query Rewrite

即通常我們說的"Logical Optimizer"或基于規則的優化器(Rule-Based Optimizer,即RBO)。

其負責應用一些啟發式規則,負責簡化和標準化查詢,無需改變查詢的語義。

常見操作有:謂詞和算子下推,視圖展開,簡化常量運算表達式,謂詞邏輯的重寫,語義的優化等。

? Query Optimizer

即通常我們所說的"Physical Optimizer",負責把內部查詢表達轉化成一個高效的查詢計劃,指導DBMS如何去取表,如何進行排序,如何Join。如下圖所示,一個查詢計劃可以被認為是一個數據流圖,在這個數據流圖中,表數據會像在管道中傳輸一樣,從一個查詢操作符(operator)傳遞到另一個查詢操作符。

一個查詢計劃

? Query Executor
查詢執行器,負責執行具體的查詢計劃,從存儲引擎中獲取數據并且對數據應用查詢計劃得到結果。

執行引擎也分為很多種,如經典的火山模型(Volcano Model),還有ClickHouse采用的向量化執行模型(Vectorization Model)。

(圖來自經典論文 Architecture Of Database System)

但不管是傳統的關系型數據庫,還是非關系型數據庫,SQL的解析和生成執行計劃過程都是大同小異的,而縱覽ClickHouse的源代碼,可以把用戶提交一條查詢SQL背后的過程總結如下:

1.服務端接收客戶端發來的SQL請求,具體形式是一個網絡包,Server的協議層需要拆包把SQL解析出來

2.Server負責初始化上下文與Network Handler,然后?Parser?對Query做詞法和語法分析,解析成AST

3.Interpreter的?SyntaxAnalyzer?會應用一些啟發式規則對AST進行優化重寫

4.Interpreter的?ExpressionAnalyzer?根據上下文信息以及優化重寫后的AST生成物理執行計劃

5.物理執行計劃分發到本地或者分布式的executor,各自從存儲引擎中獲取數據,應用執行計劃

6.Server把執行后的結果以Block流的形式輸出到Socket緩沖區,Client從Socket中讀取即可得到結果

01?接收客戶端請求


我們要以服務端的視角來出發,首先來看server.cpp大概做什么事情:

下面只挑選重要的邏輯:

? 初始化上下文

? 初始化Zookeeper(ClickHouse的副本復制機制需要依賴ZooKeeper)

? 常規配置初始化

? 綁定服務端的端口,根據網絡協議初始化Handler,對客戶端提供服務

int Server::main() { // 初始化上下文 global_context = std::make_unique<Context>(Context::createGlobal()); global_context->setApplicationType(Context::ApplicationType::SERVER);// zk初始化 zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });//其他config的初始化 //...//綁定端口,對外提供服務 auto address = make_socket_address(host, port); socket.bind(address, /* reuseAddress = */ true);//根據網絡協議建立不同的server類型 //現在支持的server類型有:HTTP,HTTPS,TCP,Interserver,mysql //以TCP版本為例: create_server("tcp_port", [&](UInt16 port) { Poco::Net::ServerSocket socket; auto address = socket_bind_listen(socket, listen_host, port); servers.emplace_back(std::make_unique<Poco::Net::TCPServer>( new TCPHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams)); });//啟動server for (auto & server : servers) server->start();}

客戶端發來的請求是由各自網絡協議所對應的?Handler?來進行的,server在啟動的時候 Handler 會被初始化并綁定在指定端口中。我們以TCPHandler為例,看看服務端是如何處理客戶端發來的請求的,重點關注?TCPHandler::runImpl?的函數實現:

? 初始化輸入和輸出流的緩沖區

? 接受請求報文,拆包

? 執行Query(包括整個詞法語法分析,Query重寫,物理計劃生成和生成結果)

? 把Query結果保存到輸出流,然后發送到Socket的緩沖區,等待發送回客戶端

void TCPHandler::runImpl() { //實例化套接字對應的輸入和輸出流緩沖區 in = std::make_shared<ReadBufferFromPocoSocket>(socket()); out = std::make_shared<WriteBufferFromPocoSocket>(socket());while (1){ // 接收請求報文 receivePacket();// 執行Query state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);//根據Query種類來處理不同的Query //處理insert Query processInsertQuery(); //并發處理普通Query processOrdinaryQueryWithProcessors(); //單線程處理普通Query processOrdinaryQuery(); }}

那CK處理客戶端發送過來的Query的具體邏輯是怎樣的呢?

我們可以在

dbms/src/Interpreters/executeQuery.cpp?中一探究竟:

具體邏輯在?executeQueryImpl?函數中,挑選核心的邏輯進行講解:

static std::tuple<ASTPtr, BlockIO> executeQueryImpl() { //構造Parser ParserQuery parser(end, settings.enable_debug_queries); ASTPtr ast;//把Query轉化為抽象語法樹 ast = parseQuery(parser, begin, end, "", max_query_size);//生成interpreter實例 auto interpreter = InterpreterFactory::get(ast, context, stage);// interpreter解析AST,結果是BlockIO res = interpreter->execute();//返回結果是抽象語法樹和解析后的結果組成的二元組 return std::make_tuple(ast, res); }

該函數所做的事情:

? 構建Parser,把Query解析成AST(抽象語法樹)

? InterpreterFactory根據AST生成對應的Interpreter實例

? AST是由Interpreter來解析的,執行結果是一個BlockIO,BlockIO是對?BlockInputStream?和?BlockOutputStream?的一個封裝。

總結:

? 服務端調用?executeQuery?來處理client發送的Query,執行后的結果保存在state這個結構體的io成員中。

每一條Query都會對應一個state結構體,記錄了這條Query的id,處理狀態,壓縮算法,Query的文本和Query所處理數據對應的IO流等元信息。

? 然后服務端調用?processOrdinaryQuery?等方法把輸出流結果封裝成異步的IO流,發送到回client。

02?解析請求(Parser)


CK選擇采用手寫一個遞歸下降的Parser來對SQL進行解析,生成的結果是這個SQL對應的抽象語法樹(AST),抽象語法樹由表示各個操作的節點(IAST)表示。而本節主要介紹Parser背后的核心邏輯:

詞法分析和語法分析的核心邏輯可以在parseQuery.cpp的?tryParseQuery?中一覽無余。

該函數利用lexer將掃描Query字符流,將其分割為一個個的Token,?token_iterator?即一個Token流迭代器,然后parser再對Token流進行解析生成AST抽象語法樹。

ASTPtr tryParseQuery() { //Token為lexer詞法分析后的基本單位,詞法分析后生成的是Token流 Tokens tokens(pos, end, max_query_size); IParser::Pos token_iterator(tokens); ASTPtr res; //Token流經過語法分析生成AST抽象語法樹 bool parse_res = parser.parse(token_iterator, res, expected); return res;}

我們可以看到,語法分析的核心就在于parser執行的parse方法。parse 方法具體的實現在?ParserQuery.cpp?的?parseImpl?中。

bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { ParserQueryWithOutput query_with_output_p(enable_explain); ParserInsertQuery insert_p(end); ParserUseQuery use_p; ParserSetQuery set_p; ParserSystemQuery system_p;bool res = query_with_output_p.parse(pos, node, expected) || insert_p.parse(pos, node, expected) || use_p.parse(pos, node, expected) || set_p.parse(pos, node, expected) || system_p.parse(pos, node, expected);return res; }

我們可以看到,這個方法粗略地把Query分為了五種,但是本質上可以歸納為兩種(第一種為有結果輸出,對應show,select,create等語句;第二種為無結果輸出,對應insert,use,set和與系統相關的語句(如exit))

? QueryWithOutput
? InsertQuery
? UseQuery
? SetQuery
? SystemQuery

每一種Query都自定義了其專屬的Parser,所以代碼邏輯是當接收到一個Query輸入的時候,會嘗試各種Query的Parser,直到成功為止。

我們可以select語句對應的parser進行分析:?

核心邏輯可以總結為:?

1.先給出select語句中可能出現的關鍵詞

2.在詞法分析生成的Token流中爬取這些關鍵詞

3.如果成功爬取,則?setExpression?函數會組裝該關鍵字對應的AST節點

每一種SQL語句(如select,drop,insert,create)都有對應的AST類,并且分別包含了這些語句中特有的關鍵字。

bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { //創建AST樹節點 auto select_query = std::make_shared<ASTSelectQuery>(); node = select_query;//select語句中會出現的關鍵詞 ParserKeyword s_select("SELECT"); ParserKeyword s_distinct("DISTINCT"); ParserKeyword s_from("FROM"); ParserKeyword s_prewhere("PREWHERE"); ParserKeyword s_where("WHERE"); ParserKeyword s_group_by("GROUP BY"); ParserKeyword s_with("WITH"); ParserKeyword s_totals("TOTALS"); ParserKeyword s_having("HAVING"); ParserKeyword s_order_by("ORDER BY"); ParserKeyword s_limit("LIMIT"); ParserKeyword s_settings("SETTINGS"); ParserKeyword s_by("BY"); ParserKeyword s_rollup("ROLLUP"); ParserKeyword s_cube("CUBE"); ParserKeyword s_top("TOP"); ParserKeyword s_with_ties("WITH TIES"); ParserKeyword s_offset("OFFSET");//... //依次對Token流爬取上述關鍵字 ParserTablesInSelectQuery().parse(pos, tables, expected)//根據語法分析結果設置AST的Expression屬性,可以理解為如果SQL存在該關鍵字,這個關鍵字都會轉化為AST上的一個節點 select_query->setExpression(ASTSelectQuery::Expression::WITH, std::move(with_expression_list)); select_query->setExpression(ASTSelectQuery::Expression::SELECT, std::move(select_expression_list)); select_query->setExpression(ASTSelectQuery::Expression::TABLES, std::move(tables)); select_query->setExpression(ASTSelectQuery::Expression::PREWHERE, std::move(prewhere_expression)); select_query->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression)); select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, std::move(group_expression_list)); select_query->setExpression(ASTSelectQuery::Expression::HAVING, std::move(having_expression)); select_query->setExpression(ASTSelectQuery::Expression::ORDER_BY, std::move(order_expression_list)); select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_OFFSET, std::move(limit_by_offset)); select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY_LENGTH, std::move(limit_by_length)); select_query->setExpression(ASTSelectQuery::Expression::LIMIT_BY, std::move(limit_by_expression_list)); select_query->setExpression(ASTSelectQuery::Expression::LIMIT_OFFSET, std::move(limit_offset)); select_query->setExpression(ASTSelectQuery::Expression::LIMIT_LENGTH, std::move(limit_length)); select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, std::move(settings));}

整個Parser的流程圖:

03?執行請求(Interpreter)


解釋器(Interpreter)負責從抽象語法樹中創建查詢執行的流水線,整條流水線以?BlockInputStream?和?BlockOutputStream?進行組織。比方說"select"是基于"from"的Block輸出流來進行選擇的,選擇后的結果也會以Block輸出流的形式輸出到結果。首先我們來看:

dbms/src/Interpreters/InterpreterFactory.cpp

每一種Query都會有對應的Interpreter,這個工廠方法就是根據AST的種類來實例化其對應的Interpreter,由其來具體執行對應AST的執行計劃:

std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context & context, QueryProcessingStage::Enum stage) { //舉個例子,如果該AST是由select語句轉化過來, if (query->as<ASTSelectQuery>()) { /// This is internal part of ASTSelectWithUnionQuery. /// Even if there is SELECT without union, it is represented by ASTSelectWithUnionQuery with single ASTSelectQuery as a child. return std::make_unique<InterpreterSelectQuery>(query, context, SelectQueryOptions(stage)); } }

我們就以?InterpreterSelectQuery?為例,了解其實例化的核心邏輯:

InterpreterSelectQuery::InterpreterSelectQuery() { //獲取AST auto & query = getSelectQuery();//對AST做進一步語法分析,對語法樹做優化重寫 syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze( query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList());//每一種Query都會對應一個特有的表達式分析器,用于爬取AST生成執行計劃(操作鏈) query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>( query_ptr, syntax_analyzer_result, context, NameSet(required_result_column_names.begin(), required_result_column_names.end()), options.subquery_depth, !options.only_analyze); }

語法分析直接生成的AST轉化成執行計劃可能性能上并不是最優的,因此需要SyntaxAnalyzer?對其進行優化重寫,在其源碼中可以看到其涉及到非常多?基規則優化(rule based optimization)?的trick。

SyntaxAnalyzer?會逐個針對這些規則對查詢進行檢查,確定其是否滿足轉換規則,一旦滿足就會對其進行轉換。

SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze() { // 剔除冗余列 removeDuplicateColumns(result.source_columns);// 根據settings中enable_optimize_predicate_expression配置判斷是否進行謂詞下移 replaceJoinedTable(node);// 根據settings中distributed_product_mode配置重寫IN 與 JOIN 表達式 InJoinSubqueriesPreprocessor(context).visit(query);// 優化Query內部的布爾表達式 LogicalExpressionsOptimizer().perform();// 創建一個從別名到AST節點的映射字典 QueryAliasesVisitor(query_aliases_data, log.stream()).visit(query);// 公共子表達式的消除 QueryNormalizer(normalizer_data).visit(query);// 消除select從句后的冗余列 removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates);// 執行標量子查詢,并且用常量替代標量子查詢結果 executeScalarSubqueries(query, context, subquery_depth);// 如果是select語句還會做下列優化:// 謂詞下移優化 PredicateExpressionsOptimizer(select_query, settings, context).optimize();/// GROUP BY 從句的優化 optimizeGroupBy(select_query, source_columns_set, context);/// ORDER BY 從句的冗余項剔除 optimizeOrderBy(select_query);/// LIMIT BY 從句的冗余列剔除 optimizeLimitBy(select_query);/// USING語句的冗余列剔除 optimizeUsing(select_query);}

這里挑選幾個簡單介紹一下:

? 公共子表達式消除(Common Subexpression Elimination)

如果表達式?x op y?先前被計算過,并且從先前的計算到現在其計算表達式對應的值沒有改變,那么 x op y 就稱為公共子表達式。公共子表達式消除會搜索所有相同計算表達式的實例,并分析是否值得用保存計算值的單個變量來替換它們,以減少計算的開銷。

? 標量子查詢(Scala Subquery)的常量替換

標量子查詢就是返回單一值的子查詢,和公共子表達式消除相似,可以用常量來替換SQL中所有的標量子查詢結果以減少計算開銷。

? 謂詞下移(Predicate Pushdown)

把外層查詢塊中的WHERE子句的謂詞下移到較低層查詢塊如視圖,以盡可能把過濾數據的操作移動到靠近數據源的位置。提前進行數據過濾能夠大幅減少網絡傳輸或者內存讀取訪問的數據量,以提高查詢效率。

而?query_analyzer?的作用可以理解為解析優化重寫后的AST,然后對所要進行的操作組成一條操作鏈,即物理執行計劃,如:

ExpressionActionsChain chain; analyzer.appendWhere(chain); chain.addStep(); analyzer.appendSelect(chain); analyzer.appendOrderBy(chain); chain.finalize();

上述代碼把where,select,orderby操作都加入到操作鏈中,接下來就可以從Storage層讀取Block,對Block數據應用上述操作鏈的操作。而執行的核心邏輯,就在對應Interpreter的?executeImpl?方法實現中,這里以select語句的Interpreter來了解下讀取Block數據并且對block數據進行相應操作的流程。

void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input) { // 對應Query的AST auto & query = getSelectQuery();AnalysisResult expressions; // 物理計劃,判斷表達式是否有where,aggregate,having,order_by,litmit_by等字段 expressions = analyzeExpressions( getSelectQuery(), *query_analyzer, QueryProcessingStage::FetchColumns, options.to_stage, context, storage, true, filter_info);// 從Storage讀取數據 executeFetchColumns(from_stage, pipeline, sorting_info, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere);// eg:根據SQL的關鍵字在BlockStream流水線中執行相應的操作, 如where,aggregate,distinct都分別由一個函數負責執行 executeWhere(pipeline, expressions.before_where, expressions.remove_where_filter);executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final);executeDistinct(pipeline, true, expressions.selected_columns);}

既然我們知道了執行計劃AnalysisResult(即物理執行計劃),接下來就需要從storage層中讀取數據來執行對應的操作,核心邏輯在?executeFetchColumns?中: 核心操作就是從storage層讀取所要處理列的Block,并組織成BlockStream。

void InterpreterSelectQuery::executeFetchColumns( QueryProcessingStage::Enum processing_stage, TPipeline & pipeline, const SortingInfoPtr & sorting_info, const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere) { // 實例化Block Stream auto streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams) // 讀取列對應的Block,并且組織成Block Stream streams = {std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns))}; streams.back() = std::make_shared<ExpressionBlockInputStream>(streams.back(), query_info.prewhere_info->remove_columns_actions); }

讀取完Block Stream之后就是對其執行各種execute操作如?executeAggregation?,?executeWhere?操作,詳見?InterpreterSelectQuery::executeImpl?的代碼。

因此Interpreter的處理過程可以總結為:

? 對AST進行優化重寫
? 解析重寫后的AST并生成操作鏈(執行計劃)
? 從存儲引擎中讀取要處理的Block數據
? 對讀取的Block數據應用操作鏈上的操作

那我們讀取Block Stream并進行處理后,生成的結果如何寫回到storage層呢? 我們這里以insert語句的Interpreter來了解下:

BlockIO InterpreterInsertQuery::execute() { // table為存儲引擎接口 StoragePtr table = getTable(query); BlockOutputStreamPtr out;// 從存儲引擎讀取Block Stream auto query_sample_block = getSampleBlock(query, table); out = std::make_shared<AddingDefaultBlockOutputStream>( out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);//執行結果封裝成BlockIO BlockIO res; res.out = std::move(out); }

上面代碼中的StoragePtr實際上就是IStorage這個存儲引擎的接口

???????using StoragePtr = std::shared_ptr<IStorage>;

無論是寫入還是讀取操作都是依靠底層存儲引擎(如MergeTree)的write和read接口來實現的,關于存儲引擎的細節實現這里暫時不贅述,這里我們只需要知道我們從存儲引擎接口中以流方式讀取Block數據,而結果組織成BlockIO流輸出。Interpreter的流程總結如下:

04?返回請求結果


?

TCPHandler::runImpl?中,執行完?executeQuery?之后需要調用各種processQuery的方法來給client返回執行SQL后的結果。
我們以?TCPHandler::processOrdinaryQuery?為例做簡單分析:

void TCPHandler::processOrdinaryQuery() { //把BlockStream封裝成異步的Stream,那么從流中讀取數據將會是異步操作 AsynchronousBlockInputStream async_in(state.io.in);while(true){ Block block; //從IO流讀取block數據 block = async_in.read(); //發送block數據 sendData(block); } }

Server負責在?sendData?函數中把輸出結果寫入到套接字輸出緩沖區中,client只要從這個輸出緩沖區讀取就能夠得到結果。

void TCPHandler::sendData(const Block & block) { //初始化OutputStream的參數 initBlockOutput(block);// 調用BlockOutputStream的write函數,把Block寫到輸出流 state.block_out->write(block); state.maybe_compressed_out->next(); out->next(); }

?

02?結語

了解ClickHouse背后SQL的查詢整個流程,不僅能讓數據庫使用者更清晰地認識到如何編寫最優化的SQL,也能夠讓數據庫內核開發者加深對數據庫體系結構的理解,提高開發效率。

本文并沒有涉及到太深入的技術細節,諸如向量化執行引擎,SIMD,基于llvm的動態代碼生成,類MergeTree存儲引擎等CK的技術細節也沒有提及,只是從宏觀角度給讀者介紹了執行SQL背后內核到底發生了什么。后續我們會推出更多內核源碼解讀文章,敬請關注。

總結

以上是生活随笔為你收集整理的深度 | 一条查询SQL的前世今生 —— ClickHouse 源码阅读的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。