Dubbo笔记 ⑭ :Dubbo集群组件 之 Directory
文章目錄
- 一、前言
- 1. Directory 概念
- 二、Directory 的種類
- 1. AbstractDirectory
- 2. StaticDirectory
- 3. RegistryDirectory
- 3.1. Invoker 的列舉邏輯
- 3.2 接收服務配置變更的邏輯
- 3.3 Invoker 列表的刷新邏輯
- 3.3.1 RegistryDirectory#toInvokers
- 3.3.2 DubboProtocol#refer
- 五、Directory 的調(diào)用過程
- 1. Directory 的創(chuàng)建
- 1.1 RegistryProtocol的創(chuàng)建
- 1.2 StaticDirectory 的創(chuàng)建
- 2. Diectory 的回調(diào)
一、前言
本系列為個人Dubbo學習筆記,內(nèi)容基于《深度剖析Apache Dubbo 核心技術(shù)內(nèi)幕》, 過程參考官方源碼分析文章,僅用于個人筆記記錄。本文分析基于Dubbo2.7.0版本,由于個人理解的局限性,若文中不免出現(xiàn)錯誤,感謝指正。
集群組件相關(guān)文章:
在 Dubbo 中 存在 SPI 接口 org.apache.dubbo.rpc.cluster.Directory。即服務目錄,用于存放服務提供列表。本文部分內(nèi)容參考官方文章:
1. Directory 概念
關(guān)于 Directory 官方的描述如下:
Directory 即服務目錄, 服務目錄中存儲了一些和服務提供者有關(guān)的信息,通過服務目錄,服務消費者可獲取到服務提供者的信息,比如 ip、端口、服務協(xié)議等。通過這些信息,服務消費者就可通過 Netty 等客戶端進行遠程調(diào)用。在一個服務集群中,服務提供者數(shù)量并不是一成不變的,如果集群中新增了一臺機器,相應地在服務目錄中就要新增一條服務提供者記錄。或者,如果服務提供者的配置修改了,服務目錄中的記錄也要做相應的更新。如果這樣說,服務目錄和注冊中心的功能不就雷同了嗎?確實如此,這里這么說是為了方便大家理解。實際上服務目錄在獲取注冊中心的服務配置信息后,會為每條配置信息生成一個 Invoker 對象,并把這個 Invoker 對象存儲起來,這個 Invoker 才是服務目錄最終持有的對象。Invoker 有什么用呢?看名字就知道了,這是一個具有遠程調(diào)用功能的對象。講到這大家應該知道了什么是服務目錄了,它可以看做是 Invoker 集合,且這個集合中的元素會隨注冊中心的變化而進行動態(tài)調(diào)整。
簡單來說 :Directory 中保存了當前可以提供服務的服務提供者列表集合。當消費者進行服務調(diào)用時,會從 Directory 中按照某些規(guī)則挑選出一個服務提供者來提供服務。
二、Directory 的種類
服務目錄目前內(nèi)置的實現(xiàn)有兩個,分別為 StaticDirectory 和 RegistryDirectory,它們均是 AbstractDirectory 的子類。AbstractDirectory 實現(xiàn)了 Directory 接口,這個接口包含了一個重要的方法定義,即 list(Invocation),用于列舉 Invoker。下面我們來看一下他們的繼承體系圖。
如上,Directory 繼承自 Node 接口,Node 這個接口繼承者比較多,像 Registry、Monitor、Invoker 等均繼承了這個接口。這個接口包含了一個獲取配置信息的方法 Node#getUrl,實現(xiàn)該接口的類可以向外提供配置信息。除此之外, RegistryDirectory 實現(xiàn)了 NotifyListener 接口,當注冊中心節(jié)點信息發(fā)生變化后,RegistryDirectory 可以通過此接口方法得到變更信息,并根據(jù)變更信息動態(tài)調(diào)整內(nèi)部 Invoker 列表。詳細實現(xiàn)我們下面會進行分析。
1. AbstractDirectory
StaticDirectory 和 RegistryDirectory,它們均是 AbstractDirectory 的子類。所以我們這里先來看看 AbstractDirectory 中的公共邏輯。
AbstractDirectory 封裝了 Invoker 列舉流程,具體的列舉邏輯則由子類實現(xiàn),這是典型的模板模式。AbstractDirectory 的整個實現(xiàn)很簡單 :
public abstract class AbstractDirectory<T> implements Directory<T> {// loggerprivate static final Logger logger = LoggerFactory.getLogger(AbstractDirectory.class);// 當前 注冊中心URL 或者 直連URLprivate final URL url;private volatile boolean destroyed = false;// 消費者URLprivate volatile URL consumerUrl;protected RouterChain<T> routerChain;public AbstractDirectory(URL url) {this(url, null);}public AbstractDirectory(URL url, RouterChain<T> routerChain) {this(url, url, routerChain);}public AbstractDirectory(URL url, URL consumerUrl, RouterChain<T> routerChain) {if (url == null) {throw new IllegalArgumentException("url == null");}// 如果是注冊中心的協(xié)議,則進行進一步解析if (url.getProtocol().equals(Constants.REGISTRY_PROTOCOL)) {Map<String, String> queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));this.url = url.addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);} else {this.url = url;}this.consumerUrl = consumerUrl;setRouterChain(routerChain);}// 根據(jù)調(diào)用信息獲取到服務提供者列表@Overridepublic List<Invoker<T>> list(Invocation invocation) throws RpcException {if (destroyed) {throw new RpcException("Directory already destroyed .url: " + getUrl());}// 這里直接交由子類實現(xiàn),也就是 StaticDirectory 和 RegistryDirectory 的實現(xiàn)return doList(invocation);}@Overridepublic URL getUrl() {return url;}public RouterChain<T> getRouterChain() {return routerChain;}public void setRouterChain(RouterChain<T> routerChain) {this.routerChain = routerChain;}protected void addRouters(List<Router> routers) {routers = routers == null ? Collections.emptyList() : routers;routerChain.addRouters(routers);}public URL getConsumerUrl() {return consumerUrl;}public void setConsumerUrl(URL consumerUrl) {this.consumerUrl = consumerUrl;}public boolean isDestroyed() {return destroyed;}@Overridepublic void destroy() {destroyed = true;}protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;}這里需要關(guān)注的就是 AbstractDirectory#list 方法。當消費者進行服務調(diào)用時,Dubbo 會將調(diào)用相關(guān)信息封裝成Invocation進行調(diào)用。在調(diào)用過程中會通過 AbstractDirectory#doList 來獲取服務提供者列表。而 AbstractDirectory#doList的具體實現(xiàn)則交由子類 StaticDirectory 和 RegistryDirectory 來實現(xiàn)。
2. StaticDirectory
StaticDirectory 即靜態(tài)服務目錄,顧名思義,它內(nèi)部存放的 Invoker 是不會變動的。所以,理論上它和不可變 List 的功能很相似。下面我們來看一下這個類的實現(xiàn)。
public class StaticDirectory<T> extends AbstractDirectory<T> {private static final Logger logger = LoggerFactory.getLogger(StaticDirectory.class);// Invoker 列表,在 StaticDirectory 構(gòu)造時就已經(jīng)初始化private final List<Invoker<T>> invokers;public StaticDirectory(List<Invoker<T>> invokers) {this(null, invokers, null);}public StaticDirectory(List<Invoker<T>> invokers, RouterChain<T> routerChain) {this(null, invokers, routerChain);}public StaticDirectory(URL url, List<Invoker<T>> invokers) {this(url, invokers, null);}public StaticDirectory(URL url, List<Invoker<T>> invokers, RouterChain<T> routerChain) {super(url == null && invokers != null && !invokers.isEmpty() ? invokers.get(0).getUrl() : url, routerChain);if (invokers == null || invokers.isEmpty())throw new IllegalArgumentException("invokers == null");this.invokers = invokers;}// 服務提供者的接口@Overridepublic Class<T> getInterface() {return invokers.get(0).getInterface();}// 檢測服務目錄是否可用@Overridepublic boolean isAvailable() {if (isDestroyed()) {return false;}// 只要有一個 Invoker 是可用的,就認為當前目錄是可用的for (Invoker<T> invoker : invokers) {if (invoker.isAvailable()) {return true;}}return false;}@Overridepublic void destroy() {if (isDestroyed()) {return;}// 調(diào)用父類銷毀邏輯super.destroy();// 遍歷 Invoker 列表,并執(zhí)行相應的銷毀邏輯for (Invoker<T> invoker : invokers) {invoker.destroy();}invokers.clear();}// 構(gòu)建路由鏈public void buildRouterChain() {RouterChain<T> routerChain = RouterChain.buildChain(getUrl());routerChain.setInvokers(invokers);this.setRouterChain(routerChain);}// 通過路由鏈路來進行路由,獲取最終的 Invoker 列表@Overrideprotected List<Invoker<T>> doList(Invocation invocation) throws RpcException {List<Invoker<T>> finalInvokers = invokers;if (routerChain != null) {try {// 進行服務路由,篩選出滿足路由規(guī)則的服務列表。finalInvokers = routerChain.route(getConsumerUrl(), invocation);} catch (Throwable t) {logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);}}return finalInvokers == null ? Collections.emptyList() : finalInvokers;}}StaticDirectory 的實現(xiàn)并不復雜,因為其是不可變的服務列表,相較于RegistryDirectory 較少了動態(tài)監(jiān)聽的功能。
3. RegistryDirectory
RegistryDirectory 是一種動態(tài)服務目錄,實現(xiàn)了 NotifyListener 接口,也是我們最常使用的Directory。當注冊中心服務配置發(fā)生變化后,RegistryDirectory 可收到與當前服務相關(guān)的變化。收到變更通知后,RegistryDirectory 可根據(jù)配置變更信息刷新 Invoker 列表。
RegistryDirectory 中有幾個比較重要的邏輯:
下面我們來看上面三個功能的實現(xiàn):
3.1. Invoker 的列舉邏輯
RegistryDirectory#doList 的實現(xiàn)如下:
@Overridepublic List<Invoker<T>> doList(Invocation invocation) {// 校測是否可用:沒有服務提供者 || 提供者被禁用時會拋出異常if (forbidden) {// ... 拋出異常}// 如果group設置的是可以匹配多個組,則直接返回當前的 Invoker 集合if (multiGroup) {return this.invokers == null ? Collections.emptyList() : this.invokers;}// 否則通過 路由鏈路進行路由List<Invoker<T>> invokers = null;try {// Get invokers from cache, only runtime routers will be executed.invokers = routerChain.route(getConsumerUrl(), invocation);} catch (Throwable t) {logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);}return invokers == null ? Collections.emptyList() : invokers;}其中 RouterChain#route 的實現(xiàn)如下:
public List<Invoker<T>> route(URL url, Invocation invocation) {List<Invoker<T>> finalInvokers = invokers;for (Router router : routers) {// 這里將 finalInvokers 交由路由按照規(guī)則進行過濾,返回最終過濾后的 finalInvokers。finalInvokers = router.route(finalInvokers, url, invocation);}return finalInvokers;}這里可以看到整個服務列舉的過程很簡單:將本地緩存的服務列表交由服務路由 router 進行篩選,將篩選后的服務列表返回。
我們這里需要注意的是,當消費者調(diào)用多分組的服務時,路由規(guī)則會失效。
關(guān)于路由規(guī)則,詳參:Dubbo筆記? :Dubbo集群組件 之 Router
3.2 接收服務配置變更的邏輯
RegistryDirectory 是一個動態(tài)服務目錄,會隨注冊中心配置的變化進行動態(tài)調(diào)整。因此 RegistryDirectory 實現(xiàn)了 NotifyListener 接口,通過這個接口獲取注冊中心變更通知。
即服務消費者在啟動時,會訂閱 注冊中心 providers、configurators、routers 節(jié)點,并設置回調(diào)函數(shù)為 RegistryDirectory#notify,當節(jié)點更新時,會調(diào)用 RegistryDirectory#notify 方法來進行本地的更新(在啟動時會立刻調(diào)用一次該回調(diào)方法,用于同步當前節(jié)點配置)。
// 在調(diào)用該方法之前,會調(diào)用 AbstractRegistry#notify 中將URL 按照類別劃分,再分別調(diào)用 RegistryDirectory#notify 方法。@Overridepublic synchronized void notify(List<URL> urls) {// 對 URLs 進行合法性過濾List<URL> categoryUrls = urls.stream()// 合法性組別校驗,默認 providers.filter(this::isValidCategory).filter(this::isNotCompatibleFor26x).collect(Collectors.toList());/*** TODO Try to refactor the processing of these three type of urls using Collectors.groupBy()?*/// 篩選出配置信息URL 并轉(zhuǎn)換成 configurators this.configurators = Configurator.toConfigurators(classifyUrls(categoryUrls, UrlUtils::isConfigurator)).orElse(configurators);// 篩選出路由URL 并轉(zhuǎn)換成Router 添加到 AbstractDirectory#routerChain 中// RouterChain保存了服務提供者的URL列表轉(zhuǎn)換為invoker列表和可用服務提供者對應的invokers列表和路由規(guī)則信息toRouters(classifyUrls(categoryUrls, UrlUtils::isRoute)).ifPresent(this::addRouters);// providers// 篩選出 提供者URL 并進行服務提供者的更新refreshOverrideAndInvoker(classifyUrls(categoryUrls, UrlUtils::isProvider));}簡述邏輯:RegistryDirectory 在接收到服務配置變化后,會按照類型進行劃分(configurators、routers,providers) ,并分別進行處理。
3.3 Invoker 列表的刷新邏輯
在上面 RegistryDirectory#notify 中,對URL進行了分組后分別進行處理,對于providers 節(jié)點的刷新,即服務提供者的列表刷新是比較重要的部分,其實現(xiàn)在RegistryDirectory#refreshOverrideAndInvoker中 :
private void refreshOverrideAndInvoker(List<URL> urls) {// mock zookeeper://xxx?mock=return null// 重寫URL(也就是把mock=return null等信息拼接到URL中)并保存到overrideDirectoryUrl中overrideDirectoryUrl();// 刷新 服務提供者 URL,根據(jù)URL 生成InvokerrefreshInvoker(urls);}// 刷新 服務列表private void refreshInvoker(List<URL> invokerUrls) {Assert.notNull(invokerUrls, "invokerUrls should not be null");// 如果只有一個 協(xié)議為 empty 的url,則表明需要銷毀所有協(xié)議,因為empty 協(xié)議為空協(xié)議,個人理解就是為了防止空url存在而生成的無意義的urlif (invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {// 設置禁止訪問this.forbidden = true; // Forbid to access// invoker設置為空集合this.invokers = Collections.emptyList();routerChain.setInvokers(this.invokers);destroyAllInvokers(); // Close all invokers} else {// 設置允許訪問this.forbidden = false; // Allow to access// urlInvokerMap 需要使用本地引用,因為 urlInvokerMap自身可能隨時變化,可能指向 nullMap<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local referenceif (invokerUrls == Collections.<URL>emptyList()) {invokerUrls = new ArrayList<>();}// 這里防止并發(fā)更新,cachedInvokerUrls 被 volatile 修飾if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {invokerUrls.addAll(this.cachedInvokerUrls);} else {this.cachedInvokerUrls = new HashSet<>();this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison}if (invokerUrls.isEmpty()) {return;}// 將 url 轉(zhuǎn)換成 Invoker,key為 url.toFullString(), value 為 InvokerMap<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map// state change// If the calculation is wrong, it is not processed.if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));return;}// 轉(zhuǎn)化為不可修改 list,防止并發(fā)修改List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));// pre-route and build cache, notice that route cache should build on original Invoker list.// toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.// 保存服務提供者 InvokerrouterChain.setInvokers(newInvokers);// 如果匹配多個 group,則進行合并this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;this.urlInvokerMap = newUrlInvokerMap;try {// 銷毀無用的 InvokerdestroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker} catch (Exception e) {logger.warn("destroyUnusedInvokers error. ", e);}}}上面可以看到整個過程還是比較簡單:
3.3.1 RegistryDirectory#toInvokers
可以看到上面的關(guān)鍵的邏輯 在于 URL 轉(zhuǎn)換為 Invoker ,即如下,
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);toInvokers 實現(xiàn)如下:
// org.apache.dubbo.registry.integration.RegistryDirectory#toInvokersprivate Map<String, Invoker<T>> toInvokers(List<URL> urls) {Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();if (urls == null || urls.isEmpty()) {return newUrlInvokerMap;}Set<String> keys = new HashSet<String>();String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);for (URL providerUrl : urls) {// If protocol is configured at the reference side, only the matching protocol is selected // 檢測服務提供者協(xié)議是否被服務消費者所支持// 如果消費者指定了了調(diào)用服務的協(xié)議,則只使用指定的協(xié)議,即如果消費者指定提供者協(xié)議類型為 dubbo,則只會需要協(xié)議類型為dubbo的提供者。if (queryProtocols != null && queryProtocols.length() > 0) {boolean accept = false;String[] acceptProtocols = queryProtocols.split(",");for (String acceptProtocol : acceptProtocols) {if (providerUrl.getProtocol().equals(acceptProtocol)) {accept = true;break;}}if (!accept) {// 若服務提供者協(xié)議頭不被消費者所支持,則忽略當前 providerUrlcontinue;}}// 跳過 空協(xié)議if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {continue;}// 沒有能夠處理該協(xié)議的 Protocol實現(xiàn)類,則跳過if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {// .... 日志打印continue;}// 合并提供者url 順序是 : override > -D >Consumer > ProviderURL url = mergeUrl(providerUrl);String key = url.toFullString(); // The parameter urls are sorted// 避免重復解析if (keys.contains(key)) { // Repeated urlcontinue;}keys.add(key);// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again// 從緩存中獲取 InvokerMap<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference// 獲取與 url 對應的 InvokerInvoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);// 如果緩存中不存在當前URL對應的invoker,則進行遠程引用if (invoker == null) { // Not in the cache, refer againtry {boolean enabled = true;// 對 disabled 和 enabled 參數(shù)的校驗,即服務是否啟用if (url.hasParameter(Constants.DISABLED_KEY)) {enabled = !url.getParameter(Constants.DISABLED_KEY, false);} else {enabled = url.getParameter(Constants.ENABLED_KEY, true);}if (enabled) {// 調(diào)用 生成invoker委托類invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);}} catch (Throwable t) {logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);}// 將 Invoker 保存到 緩存 newUrlInvokerMap 中if (invoker != null) { // Put new invoker in cachenewUrlInvokerMap.put(key, invoker);}} else {newUrlInvokerMap.put(key, invoker);}}keys.clear();// 返回 url 轉(zhuǎn)換成的 Invoker Map return newUrlInvokerMap;}可以看到整個邏輯
如果消費者指定了服務提供者的協(xié)議類型,則按照指定協(xié)議來獲取URL
如果第一步獲取成功,或者沒有指定協(xié)議類型,則會對URL進行合并、緩存校驗等過程
如果緩存中不存在當前URL的Invoker ,則會通過下面這一句來創(chuàng)建Invoker。
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);這里我們可以看到關(guān)鍵的 邏輯在 protocol.refer(serviceType, url),由于當前服務的協(xié)議是Dubbo,所以這里的調(diào)用順序應為:
refprotocol.refer =》 XxxProtocolWrapper#refer => DubboProtocol#refer3.3.2 DubboProtocol#refer
DubboProtocol#refer 根據(jù)URL 創(chuàng)建了 Invoker,在這里會建立與服務提供者的網(wǎng)絡連接。其具體實現(xiàn)如下:
@Overridepublic <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {// 序列化優(yōu)化optimizeSerialization(url);// create rpc invoker.// 創(chuàng)建DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);invokers.add(invoker);return invoker;}...private ExchangeClient[] getClients(URL url) {// whether to share connection// 是否共享連接boolean service_share_connect = false;// 獲取連接數(shù),默認為0,表示未配置int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);// if not configured, connection is shared, otherwise, one connection for one service// 如果未配置 connections,則共享連接if (connections == 0) {service_share_connect = true;connections = 1;}ExchangeClient[] clients = new ExchangeClient[connections];for (int i = 0; i < clients.length; i++) {if (service_share_connect) {// 獲取共享客戶端 : getSharedClient 中會從緩存中獲取,如果沒有命中,則會調(diào)用 initClient 方法創(chuàng)建客戶端clients[i] = getSharedClient(url);} else {// 初始化新的客戶端clients[i] = initClient(url);}}return clients;}這里需要注意的是在創(chuàng)建連接的過程中, 由于一臺機器可以提供多個服務,那么消費者在引用這些服務時會考慮是與這些服務建立一個共享連接,還是與每一個服務單獨建立一個連接。這里可以通過 connections 設置數(shù)量來決定創(chuàng)建多少客戶端連接,默認是共享同一個客戶端。
下面我們看一下 DubboProtocol#getSharedClient 方法的實現(xiàn):
/*** Get shared connection* 獲取共享客戶端*/private ExchangeClient getSharedClient(URL url) {String key = url.getAddress();// 獲取帶有“引用計數(shù)”功能的 ExchangeClientReferenceCountExchangeClient client = referenceClientMap.get(key);if (client != null) {if (!client.isClosed()) {// 增加引用計數(shù)client.incrementAndGetCount();return client;} else {referenceClientMap.remove(key);}}locks.putIfAbsent(key, new Object());synchronized (locks.get(key)) {if (referenceClientMap.containsKey(key)) {return referenceClientMap.get(key);}// 如果緩存沒命中,則創(chuàng)建 ExchangeClient 客戶端ExchangeClient exchangeClient = initClient(url);// 將 ExchangeClient 實例傳給 ReferenceCountExchangeClient,這里使用了裝飾模式client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);referenceClientMap.put(key, client);ghostClientMap.remove(key);locks.remove(key);return client;}}/*** Create new connection* 創(chuàng)建一個新的連接*/private ExchangeClient initClient(URL url) {// client type setting.// 從url獲取客戶端類型,默認為 nettyString str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));// 添加編解碼和心跳包參數(shù)到 url 中url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);// enable heartbeat by defaulturl = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));// BIO is not allowed since it has severe performance issue.// 檢測客戶端類型是否存在,不存在則拋出異常if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {throw new RpcException("Unsupported client type: " + str + "," +" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));}ExchangeClient client;try {// connection should be lazy// 獲取 lazy 配置,并根據(jù)配置值決定創(chuàng)建的客戶端類型if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {// 創(chuàng)建懶加載 ExchangeClient 實例client = new LazyConnectExchangeClient(url, requestHandler);} else {// 創(chuàng)建普通 ExchangeClient 實例client = Exchangers.connect(url, requestHandler);}} catch (RemotingException e) {throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);}return client;}這里我們可以發(fā)現(xiàn),無論是共享連接還是非共享連接,都會調(diào)用 DubboProtocol#initClient 方法來創(chuàng)建客戶端。這里需要注意在創(chuàng)建客戶端的時候,可以根據(jù) LAZY_CONNECT_KEY (默認為 lazy) 配置來決定是否懶加載客戶端,如果懶加載客戶端,則會在第一次調(diào)用服務時才會創(chuàng)建與服務端的連接(也是調(diào)用 Exchangers.connect 方法),如果不是懶加載,則在服務啟動的時候便會與提供者建立連接。默認是立即加載,即消費者在啟動時就會與提供者建立連接。
同時我們可以注意到 在非懶加載的情況下,當URl 轉(zhuǎn)換成 Invoker 時,消費者便已經(jīng)和提供者建立了鏈接(通過 Exchangers.connect(url, requestHandler) ),也即是說,默認情況下消費者在啟動的時候?qū)⑺刑峁┱遀RL轉(zhuǎn)化為Invoker,即代表消費者啟動時便已經(jīng)和所有的提供者建立了連接。
關(guān)于 Exchangers.connect 的過程,并非本文重點,其目的即建立了與服務提供者的Netty連接。詳參:Dubbo筆記⑩ : 消費者啟動流程 - DubboProtocol#refer
五、Directory 的調(diào)用過程
經(jīng)過上面的介紹,我們對 Directory 的功能了解的差不多,下面我們看看 Directory 的調(diào)用過程。根據(jù) Directory 的功能我們就可以推測出其調(diào)用是在消費端。
1. Directory 的創(chuàng)建
當消費者服務啟動時會通過 ReferenceConfig#createProxy 創(chuàng)建提供者的代理類。而在 ReferenceConfig#createProxy 方法中,完成了 Directory 的創(chuàng)建過程。
ReferenceConfig#createProxy 中會根據(jù)注冊中心的數(shù)量或直連URL的數(shù)量,如果為單一URL,則創(chuàng)建 RegistryDirectory ,否則創(chuàng)建 StaticDirectory(也會創(chuàng)建 RegistryDirectory )。 如下圖:
1.1 RegistryProtocol的創(chuàng)建
上面講了,無論是多URL還是單URL都會執(zhí)行 RegistryProtocol#refer,而在 RegistryProtocol#refer方法中,創(chuàng)建了 RegistryDirectory。即是說,無論是單URL 還是多 URL 都會調(diào)用 RegistryProtocol#refer 創(chuàng)建RegistryDirectory。關(guān)于RegistryProtocol#refer 的分析詳參 Dubbo筆記⑨ : 消費者啟動流程 - RegistryProtocol#refer
1.2 StaticDirectory 的創(chuàng)建
在多注冊中心或者多服務提供者時會創(chuàng)建StaticDirectory 作為服務目錄。創(chuàng)建過程在ReferenceConfig#createProxy中,部分代碼如下,其中 refprotocol.refer(interfaceClass, url) 即1.1 RegistryProtocol#refer 中的 RegistryProtocol#refer 方法
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();URL registryURL = null;for (URL url : urls) {invokers.add(refprotocol.refer(interfaceClass, url));if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {registryURL = url; // use last registry url}}if (registryURL != null) { // registry url is available// use RegistryAwareCluster only when register's cluster is availableURL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invokerinvoker = cluster.join(new StaticDirectory(u, invokers));} else { // not a registry url, must be direct invoke.invoker = cluster.join(new StaticDirectory(invokers));}另外當消費端引用多個分組的服務時,dubbo會對每個分組創(chuàng)建一個對應的StaticDirectory對象。這一部分是在RegistryDirectory#toMergeInvokerList 中完成。
2. Diectory 的回調(diào)
Diectory 接收到服務列表變化會通過回調(diào)方法通知自己。而由于StaticDirectory是不可變列表,所以不存在回調(diào)邏輯。而 RegistryProtocol 的回調(diào)邏輯即是上面 3.2 接收服務配置變更的邏輯 章節(jié)講述的內(nèi)容。
本文只介紹了 Diectory 相關(guān)內(nèi)容,未對整個消費者流程進行系統(tǒng)梳理,故流程上并非完全連貫,如果需要了解整個過程詳參:
Dubbo筆記⑧ : 消費者啟動流程 - ReferenceConfig#get
Dubbo筆記⑨ : 消費者啟動流程 - RegistryProtocol#refer
以上:內(nèi)容部分參考
《深度剖析Apache Dubbo 核心技術(shù)內(nèi)幕》
https://dubbo.apache.org/zh/docs/v2.7/dev/source/
https://dubbo.apache.org/zh/docs/v2.7/dev/source/directory/
https://blog.csdn.net/weixin_38308374/article/details/106736721
如有侵擾,聯(lián)系刪除。 內(nèi)容僅用于自我記錄學習使用。如有錯誤,歡迎指正
總結(jié)
以上是生活随笔為你收集整理的Dubbo笔记 ⑭ :Dubbo集群组件 之 Directory的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Flutter的菜鸟教程
- 下一篇: 摄像头poe供电原理_带你简单了解一下什