ClickHouse源码阅读(0000 0110) —— 使用ReplicatedMergeTree引擎时的副本选择问题
在使用ReplicatedMergeTree引擎和Distributed引擎的時候,對于同一張表,服務器上存在多個副本,在查詢數據的時候,是如何在這些副本之間進行選擇的呢?結合源碼來試著分析一下...
對于一條SELECT SQL,從以下方法開始:
pipeline.streams = storage->read(required_columns, query_info, context, processing_stage, max_block_size, max_streams);如圖:
進入到這個方法中,主要步驟包括修改AST(修改表名)、物化header、構建select_stream_factory等,最后執行ClusterProxy::executeQuery()方法,簡化代碼如下:
BlockInputStreams StorageDistributed::read(const Names & /*column_names*/,const SelectQueryInfo &query_info,const Context &context,QueryProcessingStage::Enum processed_stage,const size_t /*max_block_size*/,const unsigned /*num_streams*/) {auto cluster = getCluster();const Settings &settings = context.getSettingsRef();//修改AST, 修改表名const auto &modified_query_ast = rewriteSelectQuery(......);//header, 應該是列名那行, 即Structure of query resultBlock header = materializeBlock(......);ClusterProxy::SelectStreamFactory select_stream_factory = ......;......//重點方法return ClusterProxy::executeQuery(select_stream_factory, cluster, modified_query_ast, context, settings);}進入ClusterProxy::executeQuery()方法, 會設置網絡帶寬限制,然后遍歷數據的所有分片,對每個分片執行createForShard()方法,簡化代碼如下:
BlockInputStreams executeQuery(IStreamFactory & stream_factory, const ClusterPtr & cluster,const ASTPtr & query_ast, const Context & context, const Settings & settings) {BlockInputStreams res;const std::string query = queryToString(query_ast);....../// Network bandwidth limit, if needed. 網絡帶寬限制ThrottlerPtr throttler;if (settings.max_network_bandwidth || settings.max_network_bytes){......}//遍歷數據的所有的分片,針對每個分片for (const auto & shard_info : cluster->getShardsInfo())stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, res);return res; }進入到createForShard()方法,代碼邏輯還是比較清晰的,先定義了emplace_local_stream和emplace_remote_stream,然后根據prefer_localhost_replica和shard_info.isLocal()這兩個條件判斷使用local_stream還是remotr_stream,簡化代碼如下:
//遍歷數據的所有的分片,針對每個分片void SelectStreamFactory::createForShard(const Cluster::ShardInfo &shard_info,const String &query, const ASTPtr &query_ast,const Context &context, const ThrottlerPtr &throttler,BlockInputStreams &res) {auto emplace_local_stream = [&]()//將 數據流 放在本地{res.emplace_back(createLocalStream(query_ast, context, processed_stage));};auto emplace_remote_stream = [&]()//將 數據流 發送給遠程服務器{//構建RemoteBlockInputStreamauto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, header, context, nullptr,throttler, external_tables, processed_stage);stream->setPoolMode(PoolMode::GET_MANY);if (!table_func_ptr)stream->setMainTable(main_table);res.emplace_back(std::move(stream));};const auto &settings = context.getSettingsRef();//prefer_localhost_replica = 1 且 本地服務器上存在這個分片(shard_info.isLocal() = true), 就使用本地分片數據.// 如果本地服務器上沒有這個分片, 則只能連接遠程獲取該分片的數據emplace_remote_stream//prefer_localhost_replica = 0 則 連接遠程獲取該分片的數據emplace_remote_streamif (settings.prefer_localhost_replica && shard_info.isLocal()) {......} elseemplace_remote_stream();}注意:可以具體看下shard_info.isLocal()方法的具體實現。
?
下面分開分析,先分析滿足settings.prefer_localhost_replica && shard_info.isLocal()條件的情況,主要流程包括:
1-判斷本地服務器的這個分片上有沒有這個表;
2-判斷這個表是不是用的復制表引擎;
3-如果本地分片存在這個表,且這個表是復制表,那么就該考慮副本的延遲問題了。
4-獲取本地副本的延遲和配置的可允許的最大延遲時間,兩者比較。如果本地副本的延遲時間小于max_allowed_delay, 說明本地副本是可以使用的,否則認為本地副本已經過期了。
5-如果本地副本已經過期了,則看fallback_to_stale_replicas_for_distributed_queries這個配置參數,是不是允許使用過期的副本。
6-如果不允許使用過期的副本,即設置了fallback_to_stale_replicas_for_distributed_queries=0,則看當前分片是不是有遠程副本分片,如果有則使用遠程分片;如果沒有則拋出異常;
7-如果允許使用過期的副本,即設置了fallback_to_stale_replicas_for_distributed_queries=1,且當前分片沒有遠程副本分片,則使用本地分片過期的數據;
8-如果允許使用過期的副本,且當前分片也有遠程副本分片,就先嘗試使用遠程副本, 但如果它們也過期了, 則退回使用本地副本 。懶洋洋地做這件事以避免在主線程中連接(惰性創建連接)(后面有部分代碼還沒有仔細看)。
基本判斷邏輯就是這樣了。帶注釋的代碼如下:
//運行到這里表示本地服務器上存在這個分片StoragePtr main_table_storage;//根據庫名表名在本地服務器的分片上的找到需要查詢的這個表if (table_func_ptr) {const auto *table_function = table_func_ptr->as<ASTFunction>();main_table_storage = TableFunctionFactory::instance().get(table_function->name, context)->execute(table_func_ptr, context);} elsemain_table_storage = context.tryGetTable(main_table.database, main_table.table);//本地服務器的這個分片上沒有這個表if (!main_table_storage) /// Table is absent on a local server.{ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);//shard_info.hasRemoteConnections() 本次查詢是否是遠程發過來的(是否有遠程副本), 如果是則需要emplace_remote_streamif (shard_info.hasRemoteConnections()) {LOG_WARNING(&Logger::get("ClusterProxy::SelectStreamFactory"),"There is no table " << main_table.database << "." << main_table.table<< " on local replica of shard " << shard_info.shard_num<< ", will try remote replicas.");emplace_remote_stream();} elseemplace_local_stream(); /// Let it fail the usual way.return;}//運行到這里, 表示這個表在本地服務器的這個分片上//通過一個動態轉換來判斷這個表有沒有副本const auto *replicated_storage = dynamic_cast<const StorageReplicatedMergeTree *>(main_table_storage.get());//該分片在遠程沒有副本, 只能使用本地分片數據if (!replicated_storage) {/// Table is not replicated, use local server.emplace_local_stream();return;}// 代碼運行到這里, 說明對于當前分片, 本地服務器上有這個表, 且該分片也有遠程副本.// 那應該怎么選呢?// 到了這一步就需要考慮應該選擇本地的還是遠程的了// 如果設置了max_replica_delay_for_distributed_queries(分布式查詢的最大副本延遲)這個參數, 則復制表的分布式查詢將選擇復制延遲時間(秒)小于指定值(不包括該指定值)的服務器.// 零意味著不考慮延遲。UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;//沒有設置max_allowed_delay這個參數, 則不考慮延遲, 優先使用本地分片數據if (!max_allowed_delay) {emplace_local_stream();return;}//設置了max_allowed_delay這個參數, 就先獲取副本的絕對延遲 (這里獲取的應該是本地副本的延遲時間)UInt32 local_delay = replicated_storage->getAbsoluteDelay();//如果本地副本的延遲時間小于max_allowed_delay, 說明本地副本是可以使用的if (local_delay < max_allowed_delay) {emplace_local_stream();return;}/// If we reached this point, local replica is stale.// 如果代碼執行到這里, 表示本地副本已經過期了(復制延遲時間 >= 指定值300s)ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);LOG_WARNING(&Logger::get("ClusterProxy::SelectStreamFactory"),"Local replica of shard " << shard_info.shard_num << " is stale (delay: " << local_delay<< "s.)");// 代碼運行到這里表示設置了max_replica_delay_for_distributed_queries(分布式查詢的最大副本延遲)這個參數, 并且本地的副本已過期了.// 如果設置了fallback_to_stale_replicas_for_distributed_queries=0, 表示不允許使用過期的副本,// 則將查看當前分片是不是有遠程副本, 如果由則使用遠程副本, 如果沒有則報錯if (!settings.fallback_to_stale_replicas_for_distributed_queries) {if (shard_info.hasRemoteConnections()) {/// If we cannot fallback, then we cannot use local replica. Try our luck with remote replicas.emplace_remote_stream();return;} elsethrow Exception("Local replica of shard " + toString(shard_info.shard_num)+ " is stale (delay: " + toString(local_delay) + "s.), but no other replica configured",ErrorCodes::ALL_REPLICAS_ARE_STALE);}// 如果設置了fallback_to_stale_replicas_for_distributed_queries=1, 表示允許使用過期的副本,// 再判斷當前分片是不是有遠程副本, 如果沒有遠程副本. 則只能使用本地過期的副本if (!shard_info.hasRemoteConnections()) {/// There are no remote replicas but we are allowed to fall back to stale local replica.emplace_local_stream();return;}//代碼運行到這里表示允許使用過期的副本, 且遠程也有副本/// Try our luck with remote replicas, but if they are stale too, then fallback to local replica./// Do it lazily to avoid connecting in the main thread.//于是就先嘗試使用遠程副本, 但如果它們也過期了, 則退回使用本地副本//懶洋洋地做這件事以避免在主線程中連接(惰性創建連接)//惰性創建數據流(類比spark中, 一次行動操作觸發一次計算), 這里也是, 先捋清底層數據都有哪些, 再創建streamauto lazily_create_stream = [pool = shard_info.pool, shard_num = shard_info.shard_num, query, header = header, query_ast, context, throttler,main_table = main_table, table_func_ptr = table_func_ptr, external_tables = external_tables, stage = processed_stage,local_delay]()-> BlockInputStreamPtr {std::vector<ConnectionPoolWithFailover::TryResult> try_results;try {if (table_func_ptr)try_results = pool->getManyForTableFunction(&context.getSettingsRef(), PoolMode::GET_MANY);elsetry_results = pool->getManyChecked(&context.getSettingsRef(), PoolMode::GET_MANY,main_table);}catch (const Exception &ex) {if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)LOG_WARNING(&Logger::get("ClusterProxy::SelectStreamFactory"),"Connections to remote replicas of local shard " << shard_num<< " failed, will use stale local replica");elsethrow;}double max_remote_delay = 0.0;for (const auto &try_result : try_results) {if (!try_result.is_up_to_date)max_remote_delay = std::max(try_result.staleness, max_remote_delay);}if (try_results.empty() || local_delay < max_remote_delay)return createLocalStream(query_ast, context, stage);else {std::vector<IConnectionPool::Entry> connections;connections.reserve(try_results.size());for (auto &try_result : try_results)connections.emplace_back(std::move(try_result.entry));return std::make_shared<RemoteBlockInputStream>(std::move(connections), query, header, context, nullptr, throttler, external_tables,stage);}};res.emplace_back(std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header,lazily_create_stream));對于使用遠程副本的情況,先看下定義的emplace_remote_stream,代碼如下:
auto emplace_remote_stream = [&]()//將 數據流 發送給遠程服務器{//構建RemoteBlockInputStreamauto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, header, context, nullptr,throttler, external_tables, processed_stage);stream->setPoolMode(PoolMode::GET_MANY);if (!table_func_ptr)stream->setMainTable(main_table);res.emplace_back(std::move(stream));};關鍵在于構建RemoteBlockInputStream。注意shard_info.pool的類型是ConnectionPoolWithFailoverPtr(具有容錯功能的連接池)。進一步找到構建RemoteBlockInputStream的方法:
RemoteBlockInputStream::RemoteBlockInputStream(const ConnectionPoolWithFailoverPtr &pool,const String &query_, const Block &header_, const Context &context_, const Settings *settings,const ThrottlerPtr &throttler, const Tables &external_tables_, QueryProcessingStage::Enum stage_): header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_) {if (settings)context.setSettings(*settings);//創建多路連接create_multiplexed_connections = [this, pool, throttler]() {const Settings ¤t_settings = context.getSettingsRef();std::vector<IConnectionPool::Entry> connections;if (main_table) {//如果限定了表名(沒有使用remote表函數的情況)auto try_results = pool->getManyChecked(¤t_settings, pool_mode, *main_table);connections.reserve(try_results.size());for (auto &try_result : try_results)connections.emplace_back(std::move(try_result.entry));} else//對于使用了remote表函數的情況connections = pool->getMany(¤t_settings, pool_mode);return std::make_unique<MultiplexedConnections>(std::move(connections), current_settings, throttler);};}其中,對于沒有使用表函數的情況,pool->getManyChecked()這個方法是重點。
?
好了,這篇文章已經很長了,就先到這兒,剩余的內容到下篇文章中吧。
總結
以上是生活随笔為你收集整理的ClickHouse源码阅读(0000 0110) —— 使用ReplicatedMergeTree引擎时的副本选择问题的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 绝地求生刺激战场亚服务器要维护多久,绝地
- 下一篇: js的基本语法