Dubbo 源码分析 - 服务导出
1.服務(wù)導(dǎo)出過程
本篇文章,我們來研究一下 Dubbo 導(dǎo)出服務(wù)的過程。Dubbo 服務(wù)導(dǎo)出過程始于 Spring 容器發(fā)布刷新事件,Dubbo 在接收到事件后,會立即執(zhí)行服務(wù)導(dǎo)出邏輯。整個邏輯大致可分為三個部分,第一是前置工作,主要用于檢查參數(shù),組裝 URL。第二是導(dǎo)出服務(wù),包含導(dǎo)出服務(wù)到本地 (JVM),和導(dǎo)出服務(wù)到遠(yuǎn)程兩個過程。第三是向注冊中心注冊服務(wù),用于服務(wù)發(fā)現(xiàn)。本篇文章將會對這三個部分代碼進行詳細(xì)的分析,在分析之前,我們先來了解一下服務(wù)的導(dǎo)出過程。
Dubbo 支持兩種服務(wù)導(dǎo)出方式,分別延遲導(dǎo)出和立即導(dǎo)出。延遲導(dǎo)出的入口是 ServiceBean 的 afterPropertiesSet 方法,立即導(dǎo)出的入口是 ServiceBean 的 onApplicationEvent 方法。本文打算分析服務(wù)延遲導(dǎo)出過程,因此不會分析 afterPropertiesSet 方法。下面從 onApplicationEvent 方法說起,該方法收到 Spring 容器的刷新事件后,會調(diào)用 export 方法執(zhí)行服務(wù)導(dǎo)出操作。服務(wù)導(dǎo)出之前,要進行對一系列的配置進行檢查,以及生成 URL。準(zhǔn)備工作做完,隨后開始導(dǎo)出服務(wù)。首先導(dǎo)出到本地,然后再導(dǎo)出到遠(yuǎn)程。導(dǎo)出到本地就是將服務(wù)導(dǎo)出到 JVM 中,此過程比較簡單。導(dǎo)出到遠(yuǎn)程的過程則要復(fù)雜的多,以 dubbo 協(xié)議為例,DubboProtocol 類的 export 方法將會被調(diào)用。該方法主要用于創(chuàng)建 Exporter 和 ExchangeServer。ExchangeServer 本身并不具備通信能力,需要借助更底層的 Server 實現(xiàn)通信功能。因此,在創(chuàng)建 ExchangeServer 實例時,需要先創(chuàng)建 NettyServer 或者 MinaServer 實例,并將實例作為參數(shù)傳給 ExchangeServer 實現(xiàn)類的構(gòu)造方法。ExchangeServer 實例創(chuàng)建完成后,導(dǎo)出服務(wù)到遠(yuǎn)程的過程也就接近尾聲了。服務(wù)導(dǎo)出結(jié)束后,服務(wù)消費者即可通過直聯(lián)的方式消費服務(wù)。當(dāng)然,一般我們不會使用直聯(lián)的方式消費服務(wù)。所以,在服務(wù)導(dǎo)出結(jié)束后,緊接著要做的事情是向注冊中心注冊服務(wù)。此時,客戶端即可從注冊中心發(fā)現(xiàn)服務(wù)。
以上就是 Dubbo 服務(wù)導(dǎo)出的過程,比較復(fù)雜。下面開始分析源碼,從源碼的角度展現(xiàn)整個過程。
?2.源碼分析
一場 Dubbo 源碼分析的馬拉松比賽即將開始,現(xiàn)在我們站在賽道的起點進行熱身準(zhǔn)備。本次比賽的起點位置位于 ServiceBean 的 onApplicationEvent 方法處。好了,發(fā)令槍響了,我將和一些朋友從 onApplicationEvent 方法處出發(fā),探索 Dubbo 服務(wù)導(dǎo)出的全過程。下面我們來看一下 onApplicationEvent 方法的源碼。
| 1 2 3 4 5 6 7 | public void onApplicationEvent(ContextRefreshedEvent event) {// 是否有延遲導(dǎo)出 && 是否已導(dǎo)出 && 是不是已被取消導(dǎo)出if (isDelay() && !isExported() && !isUnexported()) {// 導(dǎo)出服務(wù)export();} } |
onApplicationEvent 是一個事件響應(yīng)方法,該方法會在收到 Spring 上下文刷新事件后執(zhí)行。這個方法首先會根據(jù)條件決定是否導(dǎo)出服務(wù),比如有些服務(wù)設(shè)置了延時導(dǎo)出,那么此時就不應(yīng)該在此處導(dǎo)出。還有一些服務(wù)已經(jīng)被導(dǎo)出了,或者當(dāng)前服務(wù)被取消導(dǎo)出了,此時也不能再次導(dǎo)出相關(guān)服務(wù)。注意這里的 isDelay 方法,這個方法字面意思是“是否延遲導(dǎo)出服務(wù)”,返回 true 表示延遲導(dǎo)出,false 表示不延遲導(dǎo)出。但是該方法真實意思卻并非如此,當(dāng)方法返回 true 時,表示無需延遲導(dǎo)出。返回 false 時,表示需要延遲導(dǎo)出。與字面意思恰恰相反,讓人覺得很奇怪。下面我們來看一下這個方法的邏輯。
| 1 2 3 4 5 6 7 8 9 10 11 12 | // -☆- ServiceBean private boolean isDelay() {// 獲取 delayInteger delay = getDelay();ProviderConfig provider = getProvider();if (delay == null && provider != null) {// 如果前面獲取的 delay 為空,這里繼續(xù)獲取delay = provider.getDelay();}// 判斷 delay 是否為空,或者等于 -1return supportedApplicationListener && (delay == null || delay == -1); } |
暫時忽略 supportedApplicationListener 這個條件,當(dāng) delay 為空,或者等于-1時,該方法返回 true,而不是 false。這個方法的返回值讓人有點困惑,因此我重構(gòu)了該方法的代碼,并給 Dubbo 提了一個 Pull Request,最終這個 PR 被合到了 Dubbo 主分支中。詳細(xì)請參見?Dubbo #2686。
現(xiàn)在解釋一下 supportedApplicationListener 變量含義,該變量用于表示當(dāng)前的 Spring 容器是否支持 ApplicationListener,這個值初始為 false。在 Spring 容器將自己設(shè)置到 ServiceBean 中時,ServiceBean 的 setApplicationContext 方法會檢測 Spring 容器是否支持 ApplicationListener。若支持,則將 supportedApplicationListener 置為 true。代碼就不分析了,大家自行查閱了解。
ServiceBean 是 Dubbo 與 Spring 框架進行整合的關(guān)鍵,可以看做是兩個框架之間的橋梁。具有同樣作用的類還有 ReferenceBean。ServiceBean 實現(xiàn)了 Spring 的一些拓展接口,有 FactoryBean、ApplicationContextAware、ApplicationListener、DisposableBean 和 BeanNameAware。這些接口我在?Spring 源碼分析系列文章中介紹過,大家可以參考一下,這里就不贅述了。
現(xiàn)在我們知道了 Dubbo 服務(wù)導(dǎo)出過程的起點。那么接下來,我們快馬加鞭,繼續(xù)進行比賽。賽程預(yù)告,下一站是“服務(wù)導(dǎo)出的前置工作”。
?2.1 前置工作
前置工作主要包含兩個部分,分別是配置檢查,以及 URL 裝配。在導(dǎo)出服務(wù)之前,Dubbo 需要檢查用戶的配置是否合理,或者為用戶補充缺省配置。配置檢查完成后,接下來需要根據(jù)這些配置組裝 URL。在 Dubbo 中,URL 的作用十分重要。Dubbo 使用 URL 作為配置載體,所有的拓展點都是通過 URL 獲取配置。這一點,官方文檔中有所說明。
采用 URL 作為配置信息的統(tǒng)一格式,所有擴展點都通過傳遞 URL 攜帶配置信息。
接下來,我們先來分析配置檢查部分的源碼,隨后再來分析 URL 組裝部分的源碼。
?2.1.1 檢查配置
本節(jié)我們接著前面的源碼向下分析,前面說過 onApplicationEvent 方法在經(jīng)過一些判斷后,會決定是否調(diào)用 export 方法導(dǎo)出服務(wù)。那么下面我們從 export 方法開始進行分析,如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | public synchronized void export() {if (provider != null) {// 獲取 export 和 delay 配置if (export == null) {export = provider.getExport();}if (delay == null) {delay = provider.getDelay();}}// 如果 export 為 false,則不導(dǎo)出服務(wù)if (export != null && !export) {return;}if (delay != null && delay > 0) { // delay > 0,延時導(dǎo)出服務(wù)delayExportExecutor.schedule(new Runnable() {@Overridepublic void run() {doExport();}}, delay, TimeUnit.MILLISECONDS);} else { // 立即導(dǎo)出服務(wù)doExport();} } |
export 對兩個配置進行了檢查,并配置執(zhí)行相應(yīng)的動作。首先是 export,這個配置決定了是否導(dǎo)出服務(wù)。有時候我們只是想本地啟動服務(wù)進行一些調(diào)試工作,這個時候我們并不希望把本地啟動的服務(wù)暴露出去給別人調(diào)用。此時,我們就可以通過配置 export 禁止服務(wù)導(dǎo)出,比如:
| 1 | <dubbo:provider export="false" /> |
delay 見名知意了,用于延遲導(dǎo)出服務(wù)。下面,我們繼續(xù)分析源碼,這次要分析的是 doExport 方法。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 | protected synchronized void doExport() {if (unexported) {throw new IllegalStateException("Already unexported!");}if (exported) {return;}exported = true;// 檢測 interfaceName 是否合法if (interfaceName == null || interfaceName.length() == 0) {throw new IllegalStateException("interface not allow null!");}// 檢測 provider 是否為空,為空則新建一個,并通過系統(tǒng)變量為其初始化checkDefault();// 下面幾個 if 語句用于檢測 provider、application 等核心配置類對象是否為空,// 若為空,則嘗試從其他配置類對象中獲取相應(yīng)的實例。if (provider != null) {if (application == null) {application = provider.getApplication();}if (module == null) {module = provider.getModule();}if (registries == null) {...}if (monitor == null) {...}if (protocols == null) {...}}if (module != null) {if (registries == null) {registries = module.getRegistries();}if (monitor == null) {...}}if (application != null) {if (registries == null) {registries = application.getRegistries();}if (monitor == null) {...}}// 檢測 ref 是否泛化服務(wù)類型if (ref instanceof GenericService) {// 設(shè)置 interfaceClass 為 GenericService.classinterfaceClass = GenericService.class;if (StringUtils.isEmpty(generic)) {// 設(shè)置 generic = "true"generic = Boolean.TRUE.toString();}} else { // ref 非 GenericService 類型try {interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}// 對 interfaceClass,以及 <dubbo:method> 必要字段進行檢查checkInterfaceAndMethods(interfaceClass, methods);// 對 ref 合法性進行檢測checkRef();// 設(shè)置 generic = "false"generic = Boolean.FALSE.toString();}// local 屬性 Dubbo 官方文檔中沒有說明,不過 local 和 stub 在功能應(yīng)該是一致的,用于配置本地存根if (local != null) {if ("true".equals(local)) {local = interfaceName + "Local";}Class<?> localClass;try {// 獲取本地存根類localClass = ClassHelper.forNameWithThreadContextClassLoader(local);} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}// 檢測本地存根類是否可賦值給接口類,若不可賦值則會拋出異常,提醒使用者本地存根類類型不合法if (!interfaceClass.isAssignableFrom(localClass)) {throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);}}// stub 和 local 均用于配置本地存根if (stub != null) {// 此處的代碼和上一個 if 分支的代碼基本一致,這里省略了}// 檢測各種對象是否為空,為空則新建,或者拋出異常checkApplication();checkRegistry();checkProtocol();appendProperties(this);checkStubAndMock(interfaceClass);if (path == null || path.length() == 0) {path = interfaceName;}// 導(dǎo)出服務(wù)doExportUrls();// ProviderModel 表示服務(wù)提供者模型,此對象中存儲了和服務(wù)提供者相關(guān)的信息。// 比如服務(wù)的配置信息,服務(wù)實例等。每個被導(dǎo)出的服務(wù)對應(yīng)一個 ProviderModel。// ApplicationModel 持有所有的 ProviderModel。ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel); } |
以上就是配置檢查的相關(guān)分析,代碼比較多,需要大家耐心看一下。下面對配置檢查的邏輯進行簡單的總結(jié),如下:
配置檢查并非本文重點,因此我不打算對 doExport 方法所調(diào)用的方法進行分析(doExportUrls 方法除外)。在這些方法中,除了 appendProperties 方法稍微復(fù)雜一些,其他方法都還好。因此,大家可自行進行分析。好了,其他的就不多說了,繼續(xù)向下分析。
?2.1.2 多協(xié)議多注冊中心導(dǎo)出服務(wù)
Dubbo 允許我們使用不同的協(xié)議導(dǎo)出服務(wù),也允許我們向多個注冊中心注冊服務(wù)。Dubbo 在 doExportUrls 方法中對多協(xié)議,多注冊中心進行了支持。相關(guān)代碼如下:
| 1 2 3 4 5 6 7 8 | private void doExportUrls() {// 加載注冊中心鏈接List<URL> registryURLs = loadRegistries(true);// 遍歷 protocols,導(dǎo)出每個服務(wù)for (ProtocolConfig protocolConfig : protocols) {doExportUrlsFor1Protocol(protocolConfig, registryURLs);} } |
上面代碼比較簡單,首先是通過 loadRegistries 加載注冊中心鏈接,然后再遍歷 ProtocolConfig 集合導(dǎo)出每個服務(wù)。并在導(dǎo)出服務(wù)的過程中,將服務(wù)注冊到注冊中心處。下面,我們先來看一下 loadRegistries 方法的邏輯。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | protected List<URL> loadRegistries(boolean provider) {// 檢測是否存在注冊中心配置類,不存在則拋出異常checkRegistry();List<URL> registryList = new ArrayList<URL>();if (registries != null && !registries.isEmpty()) {for (RegistryConfig config : registries) {String address = config.getAddress();if (address == null || address.length() == 0) {// 若 address 為空,則將其設(shè)為 0.0.0.0address = Constants.ANYHOST_VALUE;}// 從系統(tǒng)屬性中加載注冊中心地址String sysaddress = System.getProperty("dubbo.registry.address");if (sysaddress != null && sysaddress.length() > 0) {address = sysaddress;}// 判斷 address 是否合法if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {Map<String, String> map = new HashMap<String, String>();// 添加 ApplicationConfig 中的字段信息到 map 中appendParameters(map, application);// 添加 RegistryConfig 字段信息到 map 中appendParameters(map, config);map.put("path", RegistryService.class.getName());map.put("dubbo", Version.getProtocolVersion());map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));if (ConfigUtils.getPid() > 0) {map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));}if (!map.containsKey("protocol")) {if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {map.put("protocol", "remote");} else {map.put("protocol", "dubbo");}}// 解析得到 URL 列表,address 可能包含多個注冊中心 ip,// 因此解析得到的是一個 URL 列表List<URL> urls = UrlUtils.parseURLs(address, map);for (URL url : urls) {url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());// 將 URL 協(xié)議頭設(shè)置為 registryurl = url.setProtocol(Constants.REGISTRY_PROTOCOL);// 通過判斷條件,決定是否添加 url 到 registryList 中,條件如下:// (服務(wù)提供者 && register = true 或 null) // || (非服務(wù)提供者 && subscribe = true 或 null)if ((provider && url.getParameter(Constants.REGISTER_KEY, true))|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {registryList.add(url);}}}}}return registryList; } |
上面代碼不是很復(fù)雜,包含如下邏輯:
關(guān)于多協(xié)議多注冊中心導(dǎo)出服務(wù)就先分析到這,代碼不是很多,就不過多敘述了。接下來分析 URL 組裝過程。
?2.1.3 組裝 URL
配置檢查完畢后,緊接著要做的事情是根據(jù)配置,以及其他一些信息組裝 URL。前面說過,URL 是 Dubbo 配置的載體,通過 URL 可讓 Dubbo 的各種配置在各個模塊之間傳遞。URL 之于 Dubbo,猶如水之于魚,非常重要。大家在閱讀 Dubbo 服務(wù)導(dǎo)出相關(guān)源碼的過程中,要注意 URL 內(nèi)容的變化。既然 URL 如此重要,那么下面我們來了解一下 URL 組裝的過程。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 | private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {String name = protocolConfig.getName();// 如果協(xié)議名為空,或空串,則將協(xié)議名變量設(shè)置為 dubboif (name == null || name.length() == 0) {name = "dubbo";}Map<String, String> map = new HashMap<String, String>();// 添加 side、版本、時間戳以及進程號等信息到 map 中map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));if (ConfigUtils.getPid() > 0) {map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));}// 通過反射將對象的字段信息到 map 中appendParameters(map, application);appendParameters(map, module);appendParameters(map, provider, Constants.DEFAULT_KEY);appendParameters(map, protocolConfig);appendParameters(map, this);// methods 為 MethodConfig 集合,MethodConfig 中存儲了 <dubbo:method> 標(biāo)簽的配置信息if (methods != null && !methods.isEmpty()) {// 這段代碼用于添加 Callback 配置到 map 中,代碼太長,待會單獨分析}// 檢測 generic 是否為 "true",并根據(jù)檢測結(jié)果向 map 中添加不同的信息if (ProtocolUtils.isGeneric(generic)) {map.put(Constants.GENERIC_KEY, generic);map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);} else {String revision = Version.getVersion(interfaceClass, version);if (revision != null && revision.length() > 0) {map.put("revision", revision);}// 為接口生成包裹類 Wrapper,Wrapper 中包含了接口的詳細(xì)信息,比如接口方法名數(shù)組,字段信息等String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();// 添加方法名到 map 中,如果包含多個方法名,則用逗號隔開,比如 method = init,destroyif (methods.length == 0) {logger.warn("NO method found in service interface ...");map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);} else {// 將逗號作為分隔符連接方法名,并將連接后的字符串放入 map 中map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));}}// 添加 token 到 map 中if (!ConfigUtils.isEmpty(token)) {if (ConfigUtils.isDefault(token)) {map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());} else {map.put(Constants.TOKEN_KEY, token);}}// 判斷協(xié)議名是否為 injvmif (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {protocolConfig.setRegister(false);map.put("notify", "false");}// 獲取上下文路徑String contextPath = protocolConfig.getContextpath();if ((contextPath == null || contextPath.length() == 0) && provider != null) {contextPath = provider.getContextpath();}// 獲取 host 和 portString host = this.findConfigedHosts(protocolConfig, registryURLs, map);Integer port = this.findConfigedPorts(protocolConfig, name, map);// 組裝 URLURL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);// 省略無關(guān)代碼 } |
上面的代碼首先是將一些信息,比如版本、時間戳、方法名以及各種配置對象的字段信息放入到 map 中,map 中的內(nèi)容將作為 URL 的查詢字符串。構(gòu)建好 map 后,緊接著是獲取上下文路徑、主機名以及端口號等信息。最后將 map 和主機名等數(shù)據(jù)傳給 URL 構(gòu)造方法創(chuàng)建 URL 對象。需要注意的是,這里出現(xiàn)的 URL 并非 java.net.URL,而是 com.alibaba.dubbo.common.URL。
上面省略了一段代碼,這里簡單分析一下。這段代碼用于檢測 <dubbo:argument> 標(biāo)簽中的配置信息,并將相關(guān)配置添加到 map 中。代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 | private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {// ...// methods 為 MethodConfig 集合,MethodConfig 中存儲了 <dubbo:method> 標(biāo)簽的配置信息if (methods != null && !methods.isEmpty()) {for (MethodConfig method : methods) {// 添加 MethodConfig 對象的字段信息到 map 中,鍵 = 方法名.屬性名。// 比如存儲 <dubbo:method name="sayHello" retries="2"> 對應(yīng)的 MethodConfig,// 鍵 = sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"}appendParameters(map, method, method.getName());String retryKey = method.getName() + ".retry";if (map.containsKey(retryKey)) {String retryValue = map.remove(retryKey);// 檢測 MethodConfig retry 是否為 false,若是,則設(shè)置重試次數(shù)為0if ("false".equals(retryValue)) {map.put(method.getName() + ".retries", "0");}}// 獲取 ArgumentConfig 列表List<ArgumentConfig> arguments = method.getArguments();if (arguments != null && !arguments.isEmpty()) {for (ArgumentConfig argument : arguments) {// 檢測 type 屬性是否為空,或者空串(分支1 ??)if (argument.getType() != null && argument.getType().length() > 0) {Method[] methods = interfaceClass.getMethods();if (methods != null && methods.length > 0) {for (int i = 0; i < methods.length; i++) {String methodName = methods[i].getName();// 比對方法名,查找目標(biāo)方法if (methodName.equals(method.getName())) {Class<?>[] argtypes = methods[i].getParameterTypes();if (argument.getIndex() != -1) {// 檢測 ArgumentConfig 中的 type 屬性與方法參數(shù)列表// 中的參數(shù)名稱是否一致,不一致則拋出異常(分支2 ??)if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {// 添加 ArgumentConfig 字段信息到 map 中,// 鍵前綴 = 方法名.index,比如:// map = {"sayHello.3": true}appendParameters(map, argument, method.getName() + "." + argument.getIndex());} else {throw new IllegalArgumentException("argument config error: ...");}} else { // 分支3 ??for (int j = 0; j < argtypes.length; j++) {Class<?> argclazz = argtypes[j];// 從參數(shù)類型列表中查找類型名稱為 argument.type 的參數(shù)if (argclazz.getName().equals(argument.getType())) {appendParameters(map, argument, method.getName() + "." + j);if (argument.getIndex() != -1 && argument.getIndex() != j) {throw new IllegalArgumentException("argument config error: ...");}}}}}}}// 用戶未配置 type 屬性,但配置了 index 屬性,且 index != -1} else if (argument.getIndex() != -1) { // 分支4 ??// 添加 ArgumentConfig 字段信息到 map 中appendParameters(map, argument, method.getName() + "." + argument.getIndex());} else {throw new IllegalArgumentException("argument config must set index or type");}}}}}// ... } |
上面這段代碼 for 循環(huán)和 if else 分支嵌套太多,導(dǎo)致層次太深,不利于閱讀,需要耐心看一下。大家在看這段代碼時,注意把幾個重要的條件分支找出來。只要理解了這幾個分支的意圖,就可以弄懂這段代碼。我在上面代碼中用??符號標(biāo)識出了4個重要的分支,下面用偽代碼解釋一下這幾個分支的含義。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | // 獲取 ArgumentConfig 列表 for (遍歷 ArgumentConfig 列表) {if (type 不為 null,也不為空串) { // 分支11. 通過反射獲取 interfaceClass 的方法列表for (遍歷方法列表) {1. 比對方法名,查找目標(biāo)方法2. 通過反射獲取目標(biāo)方法的參數(shù)類型數(shù)組 argtypesif (index != -1) { // 分支21. 從 argtypes 數(shù)組中獲取下標(biāo) index 處的元素 argType2. 檢測 argType 的名稱與 ArgumentConfig 中的 type 屬性是否一致3. 添加 ArgumentConfig 字段信息到 map 中,或拋出異常} else { // 分支31. 遍歷參數(shù)類型數(shù)組 argtypes,查找 argument.type 類型的參數(shù)2. 添加 ArgumentConfig 字段信息到 map 中}}} else if (index != -1) { // 分支41. 添加 ArgumentConfig 字段信息到 map 中} } |
在本節(jié)分析的源碼中,appendParameters 這個方法出現(xiàn)的次數(shù)比較多,該方法用于將對象字段信息添加到 map 中。實現(xiàn)上則是通過反射獲取目標(biāo)對象的 getter 方法,并調(diào)用該方法獲取屬性值。然后再通過 getter 方法名解析出屬性名,比如從方法名 getName 中可解析出屬性 name。如果用戶傳入了屬性名前綴,此時需要將屬性名加入前綴內(nèi)容。最后將 <屬性名,屬性值> 鍵值對存入到 map 中就行了。限于篇幅原因,這里就不分析 appendParameters 方法的源碼了,大家請自行分析。
?2.2 導(dǎo)出 Dubbo 服務(wù)
前置工作做完,接下來就可以進行服務(wù)導(dǎo)出工作。服務(wù)導(dǎo)出,分為導(dǎo)出到本地 (JVM),和導(dǎo)出到遠(yuǎn)程。在深入分析服務(wù)導(dǎo)出源碼前,我們先來從宏觀層面上看一下服務(wù)導(dǎo)出邏輯。如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 | private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {// 省略無關(guān)代碼if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {// 加載 ConfiguratorFactory,并生成 Configurator 配置 urlurl = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);}String scope = url.getParameter(Constants.SCOPE_KEY);// 如果 scope = none,則什么都不做if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {// scope != remote,導(dǎo)出到本地if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {exportLocal(url);}// scope != local,導(dǎo)出到遠(yuǎn)程if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {if (registryURLs != null && !registryURLs.isEmpty()) {for (URL registryURL : registryURLs) {url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));// 加載監(jiān)視器鏈接URL monitorUrl = loadMonitor(registryURL);if (monitorUrl != null) {// 將監(jiān)視器鏈接作為參數(shù)添加到 url 中url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());}String proxy = url.getParameter(Constants.PROXY_KEY);if (StringUtils.isNotEmpty(proxy)) {registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);}// 為服務(wù)提供類(ref)生成 InvokerInvoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));// DelegateProviderMetaDataInvoker 僅用于持有 Invoker 和 ServiceConfigDelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);// 導(dǎo)出服務(wù),并生成 ExporterExporter<?> exporter = protocol.export(wrapperInvoker);exporters.add(exporter);}} else { // 不存在注冊中心,僅導(dǎo)出服務(wù)Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);Exporter<?> exporter = protocol.export(wrapperInvoker);exporters.add(exporter);}}}this.urls.add(url); } |
上面代碼根據(jù) url 中的 scope 參數(shù)決定服務(wù)導(dǎo)出方式,分別如下:
- scope = none,不導(dǎo)出服務(wù)
- scope != remote,導(dǎo)出到本地
- scope != local,導(dǎo)出到遠(yuǎn)程
不管是導(dǎo)出到本地,還是遠(yuǎn)程。進行服務(wù)導(dǎo)出之前,均需要先創(chuàng)建 Invoker。這是一個很重要的步驟,因此接下來我會先分析 Invoker 的創(chuàng)建過程。
?2.2.1 Invoker 創(chuàng)建過程
在 Dubbo 中,Invoker 是一個非常重要的模型。在服務(wù)提供端,以及服務(wù)引用端均會出現(xiàn) Invoker。Dubbo 官方文檔中對 Invoker 進行了說明,這里引用一下。
Invoker 是實體域,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉(zhuǎn)換成它,它代表一個可執(zhí)行體,可向它發(fā)起 invoke 調(diào)用,它有可能是一個本地的實現(xiàn),也可能是一個遠(yuǎn)程的實現(xiàn),也可能一個集群實現(xiàn)。
既然 Invoker 如此重要,那么我們很有必要搞清楚 Invoker 的用途。Invoker 是由 ProxyFactory 創(chuàng)建而來,Dubbo 默認(rèn)的 ProxyFactory 實現(xiàn)類是 JavassistProxyFactory。下面我們到 JavassistProxyFactory 代碼中,探索 Invoker 的創(chuàng)建過程。如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | -- JavassistProxyFactory public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {// 為目標(biāo)類創(chuàng)建 Wrapperfinal Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);// 創(chuàng)建匿名 Invoker 類對象,并實現(xiàn) doInvoke 方法。return new AbstractProxyInvoker<T>(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable {// 調(diào)用 Wrapper 的 invokeMethod 方法,invokeMethod 最終會調(diào)用目標(biāo)方法return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);}}; } |
如上,JavassistProxyFactory 創(chuàng)建了一個繼承自 AbstractProxyInvoker 類的匿名對象,并覆寫了抽象方法 doInvoke。覆寫后的 doInvoke 邏輯比較簡單,僅是將調(diào)用請求轉(zhuǎn)發(fā)給了 Wrapper 類的 invokeMethod 方法。Wrapper 用于“包裹”目標(biāo)類,Wrapper 是一個抽象類,僅可通過 getWrapper(Class) 方法創(chuàng)建子類。在創(chuàng)建 Wrapper 子類的過程中,子類代碼生成邏輯會對 getWrapper 方法傳入的 Class 對象進行解析,拿到諸如類方法,類成員變量等信息。以及生成 invokeMethod 方法代碼,和其他一些方法代碼。代碼生成完畢后,通過 Javassist 生成 Class 對象,最后再通過反射創(chuàng)建 Wrapper 實例。相關(guān)的代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | public static Wrapper getWrapper(Class<?> c) {while (ClassGenerator.isDynamicClass(c))c = c.getSuperclass();if (c == Object.class)return OBJECT_WRAPPER;// 訪存Wrapper ret = WRAPPER_MAP.get(c);if (ret == null) {// 緩存未命中,創(chuàng)建 Wrapperret = makeWrapper(c);// 寫入緩存WRAPPER_MAP.put(c, ret);}return ret; } |
getWrapper 方法只是包含了一些緩存操作邏輯,非重點。下面我們重點關(guān)注 makeWrapper 方法。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 | private static Wrapper makeWrapper(Class<?> c) {// 檢測 c 是否為私有類型,若是則拋出異常if (c.isPrimitive())throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);String name = c.getName();ClassLoader cl = ClassHelper.getClassLoader(c);// c1 用于存儲 setPropertyValue 方法代碼StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");// c2 用于存儲 getPropertyValue 方法代碼StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");// c3 用于存儲 invokeMethod 方法代碼StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");// 生成類型轉(zhuǎn)換代碼及異常捕捉代碼,比如:// DemoService w; try { w = ((DemoServcie) $1); }}catch(Throwable e){ throw new IllegalArgumentException(e); }c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");// pts 用于存儲成員變量名和類型Map<String, Class<?>> pts = new HashMap<String, Class<?>>();// ms 用于存儲方法描述信息(可理解為方法簽名)及 Method 實例Map<String, Method> ms = new LinkedHashMap<String, Method>();// mns 為方法名列表List<String> mns = new ArrayList<String>();// dmns 用于存儲定義在當(dāng)前類中的方法的名稱List<String> dmns = new ArrayList<String>();// --------------------------------? 分割線1 ?-------------------------------------// 獲取 public 訪問級別的字段,并為所有字段生成條件判斷語句for (Field f : c.getFields()) {String fn = f.getName();Class<?> ft = f.getType();if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers()))// 忽略關(guān)鍵字 static 或 transient 修飾的變量continue;// 生成條件判斷及賦值語句,比如:// if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}// if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;}c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");// 生成條件判斷及返回語句,比如:// if( $2.equals("name") ) { return ($w)w.name; }c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");// 存儲 <字段名, 字段類型> 鍵值對到 pts 中pts.put(fn, ft);}// --------------------------------? 分割線2 ?-------------------------------------Method[] methods = c.getMethods();// 檢測 c 中是否包含在當(dāng)前類中聲明的方法boolean hasMethod = hasMethods(methods);if (hasMethod) {c3.append(" try{");}for (Method m : methods) {if (m.getDeclaringClass() == Object.class)// 忽略 Object 中定義的方法continue;String mn = m.getName();// 生成方法名判斷語句,示例如下:// if ( "sayHello".equals( $2 )c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");int len = m.getParameterTypes().length;// 生成運行時傳入?yún)?shù)的數(shù)量與方法的參數(shù)列表長度判斷語句,示例如下:// && $3.length == 2c3.append(" && ").append(" $3.length == ").append(len);boolean override = false;for (Method m2 : methods) {// 檢測方法是否存在重載情況,條件為:方法對象不同 && 方法名相同if (m != m2 && m.getName().equals(m2.getName())) {override = true;break;}}// 對重載方法進行處理,考慮下面的方法:// 1. void sayHello(Integer, String)// 2. void sayHello(Integer, Integer)// 方法名相同,參數(shù)列表長度也相同,因此不能僅通過這兩項判斷兩個方法是否相等。// 需要進一步判斷方法的參數(shù)類型if (override) {if (len > 0) {for (int l = 0; l < len; l++) {// && $3[0].getName().equals("java.lang.Integer") // && $3[1].getName().equals("java.lang.String")c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"").append(m.getParameterTypes()[l].getName()).append("\")");}}}// 添加 ) {,完成方法判斷語句,此時生成的方法可能如下(已格式化):// if ("sayHello".equals($2) // && $3.length == 2// && $3[0].getName().equals("java.lang.Integer") // && $3[1].getName().equals("java.lang.String")) {c3.append(" ) { ");// 根據(jù)返回值類型生成目標(biāo)方法調(diào)用語句if (m.getReturnType() == Void.TYPE)// w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return null;c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");else// return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");// 添加 }, 當(dāng)前”方法判斷條件“代碼生成完畢,示例代碼如下(已格式化):// if ("sayHello".equals($2) // && $3.length == 2// && $3[0].getName().equals("java.lang.Integer") // && $3[1].getName().equals("java.lang.String")) {//// w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); // return null;// }c3.append(" }");// 添加方法名到 mns 集合中mns.add(mn);// 檢測當(dāng)前方法是否在 c 中被聲明的if (m.getDeclaringClass() == c)// 若是,則將當(dāng)前方法名添加到 dmns 中dmns.add(mn);ms.put(ReflectUtils.getDesc(m), m);}if (hasMethod) {// 添加異常捕捉語句c3.append(" } catch(Throwable e) { ");c3.append(" throw new java.lang.reflect.InvocationTargetException(e); ");c3.append(" }");}// 添加 NoSuchMethodException 異常拋出代碼c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");// --------------------------------? 分割線3 ?-------------------------------------Matcher matcher;// 處理 get/set 方法for (Map.Entry<String, Method> entry : ms.entrySet()) {String md = entry.getKey();Method method = (Method) entry.getValue();// 匹配以 get 開頭的方法if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {// 獲取屬性名String pn = propertyName(matcher.group(1));// 生成屬性判斷以及返回語句,示例如下:// if( $2.equals("name") ) { return ($w).w.getName(); }c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");pts.put(pn, method.getReturnType());// 匹配以 is/has/can 開頭的方法} else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {String pn = propertyName(matcher.group(1));// 生成屬性判斷以及返回語句,示例如下:// if( $2.equals("dream") ) { return ($w).w.hasDream(); }c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");pts.put(pn, method.getReturnType());// 匹配以 set 開頭的方法} else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {Class<?> pt = method.getParameterTypes()[0];String pn = propertyName(matcher.group(1));// 生成屬性判斷以及 setter 調(diào)用語句,示例如下:// if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }");pts.put(pn, pt);}}// 添加 NoSuchPropertyException 異常拋出代碼c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }");c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }");// --------------------------------? 分割線4 ?-------------------------------------long id = WRAPPER_CLASS_COUNTER.getAndIncrement();// 創(chuàng)建類生成器ClassGenerator cc = ClassGenerator.newInstance(cl);// 設(shè)置類名及超類cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);cc.setSuperClass(Wrapper.class);// 添加默認(rèn)構(gòu)造方法cc.addDefaultConstructor();// 添加字段cc.addField("public static String[] pns;");cc.addField("public static " + Map.class.getName() + " pts;");cc.addField("public static String[] mns;");cc.addField("public static String[] dmns;");for (int i = 0, len = ms.size(); i < len; i++)cc.addField("public static Class[] mts" + i + ";");// 添加方法代碼cc.addMethod("public String[] getPropertyNames(){ return pns; }");cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");cc.addMethod("public String[] getMethodNames(){ return mns; }");cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");cc.addMethod(c1.toString());cc.addMethod(c2.toString());cc.addMethod(c3.toString());try {// 生成類Class<?> wc = cc.toClass();// 設(shè)置字段值wc.getField("pts").set(null, pts);wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));wc.getField("mns").set(null, mns.toArray(new String[0]));wc.getField("dmns").set(null, dmns.toArray(new String[0]));int ix = 0;for (Method m : ms.values())wc.getField("mts" + ix++).set(null, m.getParameterTypes());// 創(chuàng)建 Wrapper 實例return (Wrapper) wc.newInstance();} catch (RuntimeException e) {throw e;} catch (Throwable e) {throw new RuntimeException(e.getMessage(), e);} finally {cc.release();ms.clear();mns.clear();dmns.clear();} } |
上面代碼很長,大家耐心看一下。我在上面代碼中做了大量的注釋,并按功能對代碼進行了分塊,以幫助大家理解代碼邏輯。下面對這段代碼進行講解。首先我們把目光移到分割線1之上的代碼,這段代碼主要用于進行一些初始化操作。比如創(chuàng)建 c1、c2、c3 以及 pts、ms、mns 等變量,以及向 c1、c2、c3 中添加方法定義和類型類型轉(zhuǎn)換代碼。接下來是分割線1到分割線2之間的代碼,這段代碼用于為 public 級別的字段生成條件判斷取值與賦值代碼。這段代碼不是很難看懂,就不多說了。繼續(xù)向下看,分割線2和分隔線3之間的代碼用于為定義在當(dāng)前類中的方法生成判斷語句,和方法調(diào)用語句。因為需要對方法重載進行校驗,因此到這這段代碼看起來有點復(fù)雜。不過耐心開一下,也不是很難理解。接下來是分割線3和分隔線4之間的代碼,這段代碼用于處理 getter、setter 以及以 is/has/can 開頭的方法。處理方式是通過正則表達(dá)式獲取方法類型(get/set/is/…),以及屬性名。之后為屬性名生成判斷語句,然后為方法生成調(diào)用語句。最后我們再來看一下分隔線4以下的代碼,這段代碼通過 ClassGenerator 為剛剛生成的代碼構(gòu)建 Class 類,并通過反射創(chuàng)建對象。ClassGenerator 是 Dubbo 自己封裝的,該類的核心是 toClass() 的重載方法 toClass(ClassLoader, ProtectionDomain),該方法通過 javassist 構(gòu)建 Class。這里就不分析 toClass 方法了,大家請自行分析。
閱讀 Wrapper 類代碼需要對 javassist 框架有所了解。關(guān)于 javassist,大家如果不熟悉,請自行查閱資料,本節(jié)不打算介紹 javassist 相關(guān)內(nèi)容。
好了,關(guān)于 Wrapper 類生成過程就分析到這。如果大家看的不是很明白,可以單獨為 Wrapper 創(chuàng)建單元測試,然后單步調(diào)試。并將生成的代碼拷貝出來,格式化后再進行觀察和理解。好了,本節(jié)先到這。
?2.2.2 導(dǎo)出服務(wù)到本地
本節(jié)我們來看一下服務(wù)導(dǎo)出相關(guān)的代碼,按照代碼執(zhí)行順序,本節(jié)先來分析導(dǎo)出服務(wù)到本地的過程。相關(guān)代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | private void exportLocal(URL url) {// 如果 URL 的協(xié)議頭等于 injvm,說明已經(jīng)導(dǎo)出到本地了,無需再次導(dǎo)出if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {URL local = URL.valueOf(url.toFullString()).setProtocol(Constants.LOCAL_PROTOCOL) // 設(shè)置協(xié)議頭為 injvm.setHost(LOCALHOST).setPort(0);ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));// 創(chuàng)建 Invoker,并導(dǎo)出服務(wù),這里的 protocol 會在運行時調(diào)用 InjvmProtocol 的 export 方法Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));exporters.add(exporter);} } |
exportLocal 方法比較簡單,首先根據(jù) URL 協(xié)議頭決定是否導(dǎo)出服務(wù)。若需導(dǎo)出,則創(chuàng)建一個新的 URL 并將協(xié)議頭、主機名以及端口設(shè)置成新的值。然后創(chuàng)建 Invoker,并調(diào)用 InjvmProtocol 的 export 方法導(dǎo)出服務(wù)。下面我們來看一下 InjvmProtocol 的 export 方法都做了哪些事情。
| 1 2 3 4 | public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {// 創(chuàng)建 InjvmExporterreturn new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); } |
如上,InjvmProtocol 的 export 方法僅創(chuàng)建了一個 InjvmExporter,無其他邏輯。到此導(dǎo)出服務(wù)到本地就分析完了,接下來,我們繼續(xù)分析導(dǎo)出服務(wù)到遠(yuǎn)程的過程。
?2.2.3 導(dǎo)出服務(wù)到遠(yuǎn)程
與導(dǎo)出服務(wù)到本地相比,導(dǎo)出服務(wù)到遠(yuǎn)程的過程要復(fù)雜不少,其包含了服務(wù)導(dǎo)出與服務(wù)注冊兩個過程。這兩個過程涉及到了大量的調(diào)用,因此比較復(fù)雜。不過不管再難,我們都要看一下,萬一看懂了呢。按照代碼執(zhí)行順序,本節(jié)先來分析服務(wù)導(dǎo)出邏輯,服務(wù)注冊邏輯將在下一節(jié)進行分析。下面開始分析,我們把目光移動到 RegistryProtocol 的 export 方法上。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {// 導(dǎo)出服務(wù)final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);// 獲取注冊中心 URL,以 zookeeper 注冊中心為例,得到的示例 URL 如下:// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-providerURL registryUrl = getRegistryUrl(originInvoker);// 根據(jù) URL 加載 Registry 實現(xiàn)類,比如 ZookeeperRegistryfinal Registry registry = getRegistry(originInvoker);// 獲取已注冊的服務(wù)提供者 URL,比如:// dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHellofinal URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);// 獲取 register 參數(shù)boolean register = registeredProviderUrl.getParameter("register", true);// 向服務(wù)提供者與消費者注冊表中注冊服務(wù)提供者ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);// 根據(jù) register 的值決定是否注冊服務(wù)if (register) {// 向注冊中心注冊服務(wù)register(registryUrl, registeredProviderUrl);ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);}// 獲取訂閱 URL,比如:// provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHellofinal URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);// 創(chuàng)建監(jiān)聽器final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);// 向注冊中心進行訂閱 override 數(shù)據(jù)registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);// 創(chuàng)建并返回 DestroyableExporterreturn new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); } |
上面代碼看起來比較復(fù)雜,主要做如下一些操作:
在以上操作中,除了創(chuàng)建并返回 DestroyableExporter 沒啥難度外,其他幾步操作都不是很簡單。這其中,導(dǎo)出服務(wù)和注冊服務(wù)是本章要重點分析的邏輯。 訂閱 override 數(shù)據(jù)這個是非重點內(nèi)容,后面會簡單介紹一下。下面開始本節(jié)的分析,先來分析 doLocalExport 方法的邏輯,如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {String key = getCacheKey(originInvoker);// 訪問緩存ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);if (exporter == null) {synchronized (bounds) {exporter = (ExporterChangeableWrapper<T>) bounds.get(key);if (exporter == null) {// 創(chuàng)建 Invoker 為委托類對象final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));// 調(diào)用 protocol 的 export 方法導(dǎo)出服務(wù)exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);// 寫緩存bounds.put(key, exporter);}}}return exporter; } |
上面的代碼是典型的雙重檢查,這個大家應(yīng)該都知道。接下來,我們把重點放在 Protocol 的 export 方法上。假設(shè)運行時協(xié)議為 dubbo,此處的 protocol 會在運行時加載 DubboProtocol,并調(diào)用 DubboProtocol 的 export 方法。我們目光轉(zhuǎn)移到 DubboProtocol 的 export 方法上,相關(guān)分析如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {URL url = invoker.getUrl();// 獲取服務(wù)標(biāo)識,理解成服務(wù)坐標(biāo)也行。由服務(wù)組名,服務(wù)名,服務(wù)版本號以及端口組成。比如:// demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880String key = serviceKey(url);// 創(chuàng)建 DubboExporterDubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);// 將 <key, exporter> 鍵值對放入緩存中exporterMap.put(key, exporter);// 以下代碼應(yīng)該和本地存根有關(guān),代碼不難看懂,但具體用途暫時不清楚,先忽略Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);if (isStubSupportEvent && !isCallbackservice) {String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {// 省略日志打印代碼} else {stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);}}// 啟動服務(wù)器openServer(url);// 優(yōu)化序列化optimizeSerialization(url);return exporter; } |
如上,我們重點關(guān)注 DubboExporter 的創(chuàng)建以及 openServer 方法,其他邏輯看不懂也沒關(guān)系,不影響理解服務(wù)導(dǎo)出過程。另外,DubboExporter 的代碼比較簡單,就不分析了。下面分析 openServer 方法。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | private void openServer(URL url) {// 獲取 host:port,并將其作為服務(wù)器實例的 key,用于標(biāo)識當(dāng)前的服務(wù)器實例String key = url.getAddress();boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);if (isServer) {// 訪問緩存ExchangeServer server = serverMap.get(key);if (server == null) {// 創(chuàng)建服務(wù)器實例serverMap.put(key, createServer(url));} else {// 服務(wù)器已創(chuàng)建,則根據(jù) url 中的配置重置服務(wù)器server.reset(url);}} } |
如上,在同一臺機器上(單網(wǎng)卡),同一個端口上僅允許啟動一個服務(wù)器實例。若某個端口上已有服務(wù)器實例,此時則調(diào)用 reset 方法重置服務(wù)器的一些配置。考慮到篇幅問題,關(guān)于服務(wù)器實例重置的代碼就不分析了。接下來分析服務(wù)器實例的創(chuàng)建過程。如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | private ExchangeServer createServer(URL url) {url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,// 添加心跳檢測配置到 url 中url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));// 獲取 server 參數(shù),默認(rèn)為 nettyString str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);// 通過 SPI 檢測是否存在 server 參數(shù)所代表的 Transporter 拓展,不存在則拋出異常if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))throw new RpcException("Unsupported server type: " + str + ", url: " + url);// 添加編碼解碼器參數(shù)url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);ExchangeServer server;try {// 創(chuàng)建 ExchangeServerserver = Exchangers.bind(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to start server...");}// 獲取 client 參數(shù),可指定 netty,minastr = url.getParameter(Constants.CLIENT_KEY);if (str != null && str.length() > 0) {// 獲取所有的 Transporter 實現(xiàn)類名稱集合,比如 supportedTypes = [netty, mina]Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();// 檢測當(dāng)前 Dubbo 所支持的 Transporter 實現(xiàn)類名稱列表中,// 是否包含 client 所表示的 Transporter,若不包含,則拋出異常if (!supportedTypes.contains(str)) {throw new RpcException("Unsupported client type...");}}return server; } |
如上,createServer 包含三個核心的操作。第一是檢測是否存在 server 參數(shù)所代表的 Transporter 拓展,不存在則拋出異常。第二是創(chuàng)建服務(wù)器實例。第三是檢測是否支持 client 參數(shù)所表示的 Transporter 拓展,不存在也是拋出異常。兩次檢測操作所對應(yīng)的代碼比較直白了,無需多說。但創(chuàng)建服務(wù)器的操作目前還不是很清晰,我們繼續(xù)往下看。
| 1 2 3 4 5 6 7 8 9 10 11 12 | public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");// 獲取 Exchanger,默認(rèn)為 HeaderExchanger。// 緊接著調(diào)用 HeaderExchanger 的 bind 方法創(chuàng)建 ExchangeServer 實例return getExchanger(url).bind(url, handler); } |
上面代碼比較簡單,就不多說了。下面看一下 HeaderExchanger 的 bind 方法。
| 1 2 3 4 5 6 7 | public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {// 創(chuàng)建 HeaderExchangeServer 實例,該方法包含了多步操作,本別如下:// 1. new HeaderExchangeHandler(handler)// 2. new DecodeHandler(new HeaderExchangeHandler(handler))// 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } |
HeaderExchanger 的 bind 方法包含的邏輯比較多,但目前我們僅需關(guān)心 Transporters 的 bind 方法邏輯即可。該方法的代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handlers == null || handlers.length == 0) {throw new IllegalArgumentException("handlers == null");}ChannelHandler handler;if (handlers.length == 1) {handler = handlers[0];} else {// 如果 handlers 元素數(shù)量大于1,則創(chuàng)建 ChannelHandler 分發(fā)器handler = new ChannelHandlerDispatcher(handlers);}// 獲取自適應(yīng) Transporter 實例,并調(diào)用實例方法return getTransporter().bind(url, handler); } |
如上,getTransporter() 方法獲取的 Transporter 是在運行時動態(tài)創(chuàng)建的,類名為 TransporterAdaptive,也就是自適應(yīng)拓展類。我在[上一篇文章](http://www.tianxiaobo.com/2018/10/13/Dubbo-%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90-%E8%87%AA%E9%80%82%E5%BA%94%E6%8B%93%E5%B1%95%E5%8E%9F%E7%90%86/)中詳細(xì)分析了自適應(yīng)拓展類的生成過程,對自適應(yīng)拓展類不了解的同學(xué)可以參考我之前的文章,這里不再贅述。TransporterAdaptive 會在運行時根據(jù)傳入的 URL 參數(shù)決定加載什么類型的 Transporter,默認(rèn)為 NettyTransporter。下面我們繼續(xù)跟下去,這次分析的是 NettyTransporter 的 bind 方法。
| 1 2 3 4 | public Server bind(URL url, ChannelHandler listener) throws RemotingException {// 創(chuàng)建 NettyServerreturn new NettyServer(url, listener); } |
這里僅有一句創(chuàng)建 NettyServer 的代碼,沒啥好講的,我們繼續(xù)向下看。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | public class NettyServer extends AbstractServer implements Server {public NettyServer(URL url, ChannelHandler handler) throws RemotingException {// 調(diào)用父類構(gòu)造方法super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));} }public abstract class AbstractServer extends AbstractEndpoint implements Server {public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {// 調(diào)用父類構(gòu)造方法,這里就不用跟進去了,沒什么復(fù)雜邏輯super(url, handler);localAddress = getUrl().toInetSocketAddress();// 獲取 ip 和端口String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {// 設(shè)置 ip 為 0.0.0.0bindIp = NetUtils.ANYHOST;}bindAddress = new InetSocketAddress(bindIp, bindPort);// 獲取最大可接受連接數(shù)this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);try {// 調(diào)用模板方法 doOpen 啟動服務(wù)器doOpen();} catch (Throwable t) {throw new RemotingException("Failed to bind ");}DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));}protected abstract void doOpen() throws Throwable;protected abstract void doClose() throws Throwable; } |
上面多數(shù)代碼為賦值代碼,不需要多講。我們重點關(guān)注 doOpen 抽象方法,該方法需要子類實現(xiàn)。下面回到 NettyServer 中。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | protected void doOpen() throws Throwable {NettyHelper.setNettyLoggerFactory();// 創(chuàng)建 boss 和 worker 線程池ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));// 創(chuàng)建 ServerBootstrapbootstrap = new ServerBootstrap(channelFactory);final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);channels = nettyHandler.getChannels();bootstrap.setOption("child.tcpNoDelay", true);// 設(shè)置 PipelineFactorybootstrap.setPipelineFactory(new ChannelPipelineFactory() {@Overridepublic ChannelPipeline getPipeline() {NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);ChannelPipeline pipeline = Channels.pipeline();pipeline.addLast("decoder", adapter.getDecoder());pipeline.addLast("encoder", adapter.getEncoder());pipeline.addLast("handler", nettyHandler);return pipeline;}});// 綁定到指定的 ip 和端口上channel = bootstrap.bind(getBindAddress()); } |
以上就是 NettyServer 創(chuàng)建的過程,dubbo 默認(rèn)使用的 NettyServer 是基于 netty 3.x 版本實現(xiàn)的,比較老了。因此 Dubbo 中另外提供了 netty 4.x 版本的 NettyServer,大家可在使用 Dubbo 的過程中按需進行配置。
到此,關(guān)于服務(wù)導(dǎo)出的過程就分析完了。整個過程比較復(fù)雜,大家在分析的過程中耐心一些。并且多寫 Demo 進行進行調(diào)試,以便能夠更好的理解代碼邏輯。好了,本節(jié)內(nèi)容先到這里,接下來分析服務(wù)導(dǎo)出的另一塊邏輯 – 服務(wù)注冊。
?2.2.4 服務(wù)注冊
本節(jié)我們來分析服務(wù)注冊過程,服務(wù)注冊操作對于 Dubbo 來說不是必需的,通過服務(wù)直連的方式就可以繞過注冊中心。但通常我們不會這么做,直連方式不利于服務(wù)治理,僅推薦在測試環(huán)境測試服務(wù)時使用。對于 Dubbo 來說,注冊中心雖不是必需,但卻是必要的。因此,關(guān)于注冊中心以及服務(wù)注冊相關(guān)邏輯,我們也需要搞懂。
本節(jié)內(nèi)容以 Zookeeper 注冊中心作為分析目標(biāo),其他類型注冊中心大家可自行分析。下面從服務(wù)注冊的入口方法開始分析,我們把目光再次移到 RegistryProtocol 的 export 方法上。如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {// ${導(dǎo)出服務(wù)}// 省略其他代碼boolean register = registeredProviderUrl.getParameter("register", true);if (register) {// 注冊服務(wù)register(registryUrl, registeredProviderUrl);ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);}final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);// 訂閱 override 數(shù)據(jù)registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);// 省略部分代碼 } |
RegistryProtocol 的 export 方法包含了服務(wù)導(dǎo)出,注冊,以及數(shù)據(jù)訂閱等邏輯。其中服務(wù)導(dǎo)出邏輯上一節(jié)已經(jīng)分析過了,本節(jié)將分析服務(wù)注冊邏輯,數(shù)據(jù)訂閱邏輯將在下一節(jié)進行分析。下面開始本節(jié)的分析,相關(guān)代碼如下:
| 1 2 3 4 5 6 | public void register(URL registryUrl, URL registedProviderUrl) {// 獲取 RegistryRegistry registry = registryFactory.getRegistry(registryUrl);// 注冊服務(wù)registry.register(registedProviderUrl); } |
register 方法包含兩步操作,第一步是獲取注冊中心實例,第二步是向注冊中心注冊服務(wù)。接下來,我分兩節(jié)內(nèi)容對這兩步操作進行分析。按照順序,先來分析獲取注冊中心的邏輯。
?2.2.4.1 創(chuàng)建注冊中心
本節(jié)內(nèi)容以 Zookeeper 注冊中心為例進行分析。下面先來看一下 getRegistry 方法的源碼,這個方法由 AbstractRegistryFactory 實現(xiàn)。如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | public Registry getRegistry(URL url) {url = url.setPath(RegistryService.class.getName()).addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()).removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);String key = url.toServiceString();LOCK.lock();try {// 訪問緩存Registry registry = REGISTRIES.get(key);if (registry != null) {return registry;}// 緩存未命中,創(chuàng)建 Registry 實例registry = createRegistry(url);if (registry == null) {throw new IllegalStateException("Can not create registry...");}// 寫入緩存REGISTRIES.put(key, registry);return registry;} finally {LOCK.unlock();} }protected abstract Registry createRegistry(URL url); |
如上,getRegistry 方法先訪問緩存,緩存未命中則調(diào)用 createRegistry 創(chuàng)建 Registry,然后寫入緩存。這里的 createRegistry 是一個模板方法,由具體的子類實現(xiàn)。因此,下面我們到 ZookeeperRegistryFactory 中探究一番。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | public class ZookeeperRegistryFactory extends AbstractRegistryFactory {// zookeeperTransporter 由 SPI 在運行時注入,類型為 ZookeeperTransporter$Adaptiveprivate ZookeeperTransporter zookeeperTransporter;public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {this.zookeeperTransporter = zookeeperTransporter;}@Overridepublic Registry createRegistry(URL url) {// 創(chuàng)建 ZookeeperRegistryreturn new ZookeeperRegistry(url, zookeeperTransporter);} } |
ZookeeperRegistryFactory 的 createRegistry 方法僅包含一句代碼,無需解釋,繼續(xù)跟下去。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {super(url);if (url.isAnyHost()) {throw new IllegalStateException("registry address == null");}// 獲取組名,默認(rèn)為 dubboString group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);if (!group.startsWith(Constants.PATH_SEPARATOR)) {// group = "/" + groupgroup = Constants.PATH_SEPARATOR + group;}this.root = group;// 創(chuàng)建 Zookeeper 客戶端,默認(rèn)為 CuratorZookeeperTransporterzkClient = zookeeperTransporter.connect(url);// 添加狀態(tài)監(jiān)聽器zkClient.addStateListener(new StateListener() {@Overridepublic void stateChanged(int state) {if (state == RECONNECTED) {try {recover();} catch (Exception e) {logger.error(e.getMessage(), e);}}}}); } |
在上面的代碼代碼中,我們重點關(guān)注 ZookeeperTransporter 的 connect 方法調(diào)用,這個方法用于創(chuàng)建 Zookeeper 客戶端。創(chuàng)建好 Zookeeper 客戶端,意味著注冊中心的創(chuàng)建過程就結(jié)束了。不過,顯然我們不能就此停止,難道大家沒有興趣了解一下 Zookeeper 客戶端的創(chuàng)建過程嗎?如果有,那么繼續(xù)向下看。沒有的話,直接跳到下一節(jié)。那我接著分析了。
前面說過,這里的 zookeeperTransporter 類型為自適應(yīng)拓展類,因此 connect 方法會在被調(diào)用時決定加載什么類型的 ZookeeperTransporter 拓展,默認(rèn)為 CuratorZookeeperTransporter。下面我們到 CuratorZookeeperTransporter 中看一看。
| 1 2 3 4 | public ZookeeperClient connect(URL url) {// 創(chuàng)建 CuratorZookeeperClientreturn new CuratorZookeeperClient(url); } |
上面方法僅用于創(chuàng)建 CuratorZookeeperClient 實例,沒什么好說的,繼續(xù)往下看。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {private final CuratorFramework client;public CuratorZookeeperClient(URL url) {super(url);try {// 創(chuàng)建 CuratorFramework 構(gòu)造器CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().connectString(url.getBackupAddress()).retryPolicy(new RetryNTimes(1, 1000)).connectionTimeoutMs(5000);String authority = url.getAuthority();if (authority != null && authority.length() > 0) {builder = builder.authorization("digest", authority.getBytes());}// 構(gòu)建 CuratorFramework 實例client = builder.build();// 添加監(jiān)聽器client.getConnectionStateListenable().addListener(new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState state) {if (state == ConnectionState.LOST) {CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);} else if (state == ConnectionState.CONNECTED) {CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);} else if (state == ConnectionState.RECONNECTED) {CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);}}});// 啟動客戶端client.start();} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}} } |
CuratorZookeeperClient 構(gòu)造方法主要用于創(chuàng)建和啟動 CuratorFramework 實例。以上基本上都是 Curator 框架的代碼,大家如果對 Curator 框架不是很了解,可以參考 Curator 官方文檔,并寫點 Demo 跑跑。
本節(jié)分析了 ZookeeperRegistry 實例的創(chuàng)建過程,整個過程并不是很復(fù)雜。大家在看完分析后,可以自行調(diào)試,以加深印象。現(xiàn)在注冊中心實例創(chuàng)建好了,接下來要做的事情是向注冊中心注冊服務(wù),我們繼續(xù)往下看。
?2.2.4.2 節(jié)點創(chuàng)建
以 Zookeeper 為例,所謂的服務(wù)注冊,本質(zhì)上是將服務(wù)配置數(shù)據(jù)寫入到 Zookeeper 的某個路徑的節(jié)點下。為了驗證這個說法,下面我們將 Dobbo 官方提供提供的實例跑起來,然后通過 Zookeeper 可視化客戶端?ZooInspector?查看節(jié)點數(shù)據(jù)。如下:
從上圖中可以看到 com.alibaba.dubbo.demo.DemoService 這個服務(wù)對應(yīng)的配置信息(存儲在 URL 中)最終被注冊到了 /dubbo/com.alibaba.dubbo.demo.DemoService/providers/ 節(jié)點下。搞懂了服務(wù)注冊的本質(zhì),那么接下來我們就可以去閱讀服務(wù)注冊的代碼了。服務(wù)注冊的接口為 register(URL),這個方法定義在 FailbackRegistry 抽象類中。方法代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | public void register(URL url) {super.register(url);failedRegistered.remove(url);failedUnregistered.remove(url);try {// 模板方法,由子類實現(xiàn)doRegister(url);} catch (Exception e) {Throwable t = e;// 獲取 check 參數(shù),若 check = true 將會直接拋出異常boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)&& url.getParameter(Constants.CHECK_KEY, true)&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());boolean skipFailback = t instanceof SkipFailbackWrapperException;if (check || skipFailback) {if (skipFailback) {t = t.getCause();}throw new IllegalStateException("Failed to register");} else {logger.error("Failed to register");}// 記錄注冊失敗的鏈接failedRegistered.add(url);} }protected abstract void doRegister(URL url); |
如上,我們重點關(guān)注 doRegister 方法調(diào)用即可,其他的代碼先忽略。doRegister 方法是一個模板方法,因此我們到 FailbackRegistry 子類 ZookeeperRegistry 中進行分析。如下:
| 1 2 3 4 5 6 7 8 9 10 11 | protected void doRegister(URL url) {try {// 通過 Zookeeper 客戶端創(chuàng)建節(jié)點,節(jié)點路徑由 toUrlPath 方法生成,路徑格式如下:// /${group}/${serviceInterface}/providers/${url}// 比如// /dubbo/com.tianxiaobo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));} catch (Throwable e) {throw new RpcException("Failed to register...");} } |
如上,ZookeeperRegistry 在 doRegister 中調(diào)用了 Zookeeper 客戶端創(chuàng)建服務(wù)節(jié)點。節(jié)點路徑由 toUrlPath 方法生成,該方法邏輯不難理解,就不分析了。接下來分析 create 方法,如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | public void create(String path, boolean ephemeral) {if (!ephemeral) {// 如果要創(chuàng)建的節(jié)點類型非臨時節(jié)點,那么這里要檢測節(jié)點是否存在if (checkExists(path)) {return;}}int i = path.lastIndexOf('/');if (i > 0) {create(path.substring(0, i), false); // 遞歸創(chuàng)建上一級路徑}// 根據(jù) ephemeral 的值創(chuàng)建臨時或持久節(jié)點if (ephemeral) {createEphemeral(path);} else {createPersistent(path);} } |
上面方法先是通過遞歸創(chuàng)建當(dāng)前節(jié)點的上一級路徑,然后再根據(jù) ephemeral 的值決定創(chuàng)建臨時還是持久節(jié)點。createEphemeral 和 createPersistent 這兩個方法都比較簡單,這里簡單分析其中的一個。如下:
| 1 2 3 4 5 6 7 8 9 | public void createEphemeral(String path) {try {// 通過 Curator 框架創(chuàng)建節(jié)點client.create().withMode(CreateMode.EPHEMERAL).forPath(path);} catch (NodeExistsException e) {} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);} } |
好了,到此關(guān)于服務(wù)注冊的過程就分析完了。整個過程可簡單總結(jié)為:先創(chuàng)建注冊中心實例,之后再通過注冊中心實例注冊服務(wù)。本節(jié)先到這,接下來分析數(shù)據(jù)訂閱過程。
?2.2.5 訂閱 override 數(shù)據(jù)
訂閱 override 數(shù)據(jù)對應(yīng)的代碼我粗略看了一遍,這部分代碼的主要目的是為了在服務(wù)配置發(fā)生變化時,重新導(dǎo)出服務(wù)。具體的使用場景應(yīng)該當(dāng)我們通過 Dubbo 管理后臺修改了服務(wù)配置后,Dubbo 得到服務(wù)配置被修改的通知,然后重新導(dǎo)出服務(wù)。這個使用場景只是猜測,我并未進行過驗證。如果大家有興趣可以自行驗證。
override 數(shù)據(jù)訂閱相關(guān)代碼也不是很少,考慮到文章篇幅問題以及重要性,遂決定不對此邏輯進行詳細(xì)的分析。如果大家有興趣,可自行分析。
?3.總結(jié)
本篇文章詳細(xì)分析了 Dubbo 服務(wù)導(dǎo)出過程,包括配置檢測,URL 組裝,Invoker 創(chuàng)建過程、導(dǎo)出服務(wù)以及注冊服務(wù)等等。篇幅比較大,需要大家耐心閱讀。對于這篇文章,我建議大家當(dāng)成一個工具書使用。需要的時候跳到指定章節(jié)看一下,通讀可能會有點累。由于文章篇幅比較大,因此可能會隱藏一些我沒意識到的錯誤。若大家在閱讀的過程中發(fā)現(xiàn)了錯誤,還請指出。如果能夠不吝賜教,那就更好了,先在這里說聲謝謝。
好了,本篇文章就到這了。謝謝閱讀。
- 本文鏈接:?https://www.tianxiaobo.com/2018/10/31/Dubbo-源碼分析-服務(wù)導(dǎo)出/
http://www.tianxiaobo.com/2018/10/31/Dubbo-%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90-%E6%9C%8D%E5%8A%A1%E5%AF%BC%E5%87%BA/?
總結(jié)
以上是生活随笔為你收集整理的Dubbo 源码分析 - 服务导出的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Dubbo 源码分析 - 自适应拓展原理
- 下一篇: Dubbo 源码分析 - 服务引用