日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

dubbo源码分析系列——dubbo-cluster模块源码分析

發布時間:2025/1/21 编程问答 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 dubbo源码分析系列——dubbo-cluster模块源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2019獨角獸企業重金招聘Python工程師標準>>>

模塊功能介紹

該模塊的使用介紹請參考dubbo官方用戶手冊如下章節內容。

  • 集群容錯
  • 負載均衡
  • 路由規則
  • 配置規則
  • 注冊中心參考手冊

其中注冊中心其實是對于目錄服務的一種實現方式,本文不會對注冊中心進行詳細講解。

?

核心類圖

核心源碼分析

核心接口概念及關系

各節點關系:

  • 這里的Invoker是Provider的一個可調用Service的抽象,Invoker封裝了Provider地址及Service接口信息。
  • Directory代表多個Invoker,可以把它看成List<Invoker>,但與List不同的是,它的值可能是動態變化的,比如注冊中心推送變更。
  • Cluster將Directory中的多個Invoker偽裝成一個Invoker,對上層透明,偽裝過程包含了容錯邏輯,調用失敗后,重試另一個。
  • Router負責從多個Invoker中按路由規則選出子集,比如讀寫分離,應用隔離等。
  • LoadBalance負責從多個Invoker中選出具體的一個用于本次調用,選的過程包含了負載均衡算法,調用失敗后,需要重選。

由于每種接口都有多種實現類,篇幅和時間有限,我們選擇其中最為典型的一種來進行源碼分析。

Cluster

擴展接口介紹

集群的源碼如下。

package com.alibaba.dubbo.rpc.cluster;import com.alibaba.dubbo.common.extension.Adaptive; import com.alibaba.dubbo.common.extension.SPI; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.cluster.support.FailoverCluster;/*** Cluster. (SPI, Singleton, ThreadSafe)* * <a href="http://en.wikipedia.org/wiki/Computer_cluster">Cluster</a>* <a href="http://en.wikipedia.org/wiki/Fault-tolerant_system">Fault-Tolerant</a>* * @author william.liangf*/ @SPI(FailoverCluster.NAME) public interface Cluster {/*** Merge the directory invokers to a virtual invoker.* * @param <T>* @param directory* @return cluster invoker* @throws RpcException*/@Adaptive<T> Invoker<T> join(Directory<T> directory) throws RpcException;}

該接口只有一個方法,就是將directory對象中的多個invoker的集合整合成一個invoker對象。該方法被ReferenceConfig類的createProxy方法調用,調用它的代碼如下。

// 對有注冊中心的Cluster 只用 AvailableClusterURL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers));

Cluster內置有9個擴展實現類,都實現了不同的集群容錯策略,我們只分析默認的自動故障轉移的擴展實現FailoverCluster。

FailoverCluster

源碼如下,只是構造了一個類型為FailoverClusterInvoker的invoker對象。

public class FailoverCluster implements Cluster {public final static String NAME = "failover";public <T> Invoker<T> join(Directory<T> directory) throws RpcException {return new FailoverClusterInvoker<T>(directory);}}

我們進入看看FailoverClusterInvoker的源碼。
?

/*** 失敗轉移,當出現失敗,重試其它服務器,通常用于讀操作,但重試會帶來更長延遲。* * <a href="http://en.wikipedia.org/wiki/Failover">Failover</a>* * @author william.liangf* @author chao.liuc*/ public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);public FailoverClusterInvoker(Directory<T> directory) {super(directory);}@SuppressWarnings({ "unchecked", "rawtypes" })public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {List<Invoker<T>> copyinvokers = invokers;checkInvokers(copyinvokers, invocation);int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;if (len <= 0) {len = 1;}// retry loop.RpcException le = null; // last exception.List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.Set<String> providers = new HashSet<String>(len);for (int i = 0; i < len; i++) {//重試時,進行重新選擇,避免重試時invoker列表已發生變化.//注意:如果列表發生了變化,那么invoked判斷會失效,因為invoker示例已經改變if (i > 0) {checkWheatherDestoried();copyinvokers = list(invocation);//重新檢查一下checkInvokers(copyinvokers, invocation);}Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);invoked.add(invoker);RpcContext.getContext().setInvokers((List)invoked);try {Result result = invoker.invoke(invocation);if (le != null && logger.isWarnEnabled()) {logger.warn("Although retry the method " + invocation.getMethodName()+ " in the service " + getInterface().getName()+ " was successful by the provider " + invoker.getUrl().getAddress()+ ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size()+ ") from the registry " + directory.getUrl().getAddress()+ " on the consumer " + NetUtils.getLocalHost()+ " using the dubbo version " + Version.getVersion() + ". Last error is: "+ le.getMessage(), le);}return result;} catch (RpcException e) {if (e.isBiz()) { // biz exception.throw e;}le = e;} catch (Throwable e) {le = new RpcException(e.getMessage(), e);} finally {providers.add(invoker.getUrl().getAddress());}}throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "+ invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress()+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "+ Version.getVersion() + ". Last error is: "+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);}}

該類又繼承自抽象實現類AbstractClusterInvoker,使用該類的一些方法,因此也要結合該類的源碼一起看。

/** Copyright 1999-2011 Alibaba Group.* * Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at* * http://www.apache.org/licenses/LICENSE-2.0* * Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package com.alibaba.dubbo.rpc.cluster.support;import java.util.ArrayList; import java.util.List;import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.Version; import com.alibaba.dubbo.common.extension.ExtensionLoader; import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; import com.alibaba.dubbo.common.utils.NetUtils; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.Result; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.cluster.Directory; import com.alibaba.dubbo.rpc.cluster.LoadBalance; import com.alibaba.dubbo.rpc.support.RpcUtils;/*** AbstractClusterInvoker* * @author william.liangf* @author chao.liuc*/ public abstract class AbstractClusterInvoker<T> implements Invoker<T> {private static final Logger logger = LoggerFactory.getLogger(AbstractClusterInvoker.class);protected final Directory<T> directory;protected final boolean availablecheck;private volatile boolean destroyed = false;private volatile Invoker<T> stickyInvoker = null;public AbstractClusterInvoker(Directory<T> directory) {this(directory, directory.getUrl());}public AbstractClusterInvoker(Directory<T> directory, URL url) {if (directory == null)throw new IllegalArgumentException("service directory == null");this.directory = directory ;//sticky 需要檢測 avaliablecheck this.availablecheck = url.getParameter(Constants.CLUSTER_AVAILABLE_CHECK_KEY, Constants.DEFAULT_CLUSTER_AVAILABLE_CHECK) ;}public Class<T> getInterface() {return directory.getInterface();}public URL getUrl() {return directory.getUrl();}public boolean isAvailable() {Invoker<T> invoker = stickyInvoker;if (invoker != null) {return invoker.isAvailable();}return directory.isAvailable();}public void destroy() {directory.destroy();destroyed = true;}/*** 使用loadbalance選擇invoker.</br>* a)先lb選擇,如果在selected列表中 或者 不可用且做檢驗時,進入下一步(重選),否則直接返回</br>* b)重選驗證規則:selected > available .保證重選出的結果盡量不在select中,并且是可用的 * * @param availablecheck 如果設置true,在選擇的時候先選invoker.available == true* @param selected 已選過的invoker.注意:輸入保證不重復* */protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {if (invokers == null || invokers.size() == 0)return null;String methodName = invocation == null ? "" : invocation.getMethodName();boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY) ;{//ignore overloaded methodif ( stickyInvoker != null && !invokers.contains(stickyInvoker) ){stickyInvoker = null;}//ignore cucurrent problemif (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))){if (availablecheck && stickyInvoker.isAvailable()){return stickyInvoker;}}}Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);if (sticky){stickyInvoker = invoker;}return invoker;}private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {if (invokers == null || invokers.size() == 0)return null;if (invokers.size() == 1)return invokers.get(0);// 如果只有兩個invoker,退化成輪循if (invokers.size() == 2 && selected != null && selected.size() > 0) {return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);}Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);//如果 selected中包含(優先判斷) 或者 不可用&&availablecheck=true 則重試.if( (selected != null && selected.contains(invoker))||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){try{Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);if(rinvoker != null){invoker = rinvoker;}else{//看下第一次選的位置,如果不是最后,選+1位置.int index = invokers.indexOf(invoker);try{//最后在避免碰撞invoker = index <invokers.size()-1?invokers.get(index+1) :invoker;}catch (Exception e) {logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e);}}}catch (Throwable t){logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t);}}return invoker;} /*** 重選,先從非selected的列表中選擇,沒有在從selected列表中選擇.* @param loadbalance* @param invocation* @param invokers* @param selected* @return* @throws RpcException*/private Invoker<T> reselect(LoadBalance loadbalance,Invocation invocation,List<Invoker<T>> invokers, List<Invoker<T>> selected ,boolean availablecheck)throws RpcException {//預先分配一個,這個列表是一定會用到的.List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size()>1?(invokers.size()-1):invokers.size());//先從非select中選if( availablecheck ){ //選isAvailable 的非selectfor(Invoker<T> invoker : invokers){if(invoker.isAvailable()){if(selected ==null || !selected.contains(invoker)){reselectInvokers.add(invoker);}}}if(reselectInvokers.size()>0){return loadbalance.select(reselectInvokers, getUrl(), invocation);}}else{ //選全部非selectfor(Invoker<T> invoker : invokers){if(selected ==null || !selected.contains(invoker)){reselectInvokers.add(invoker);}}if(reselectInvokers.size()>0){return loadbalance.select(reselectInvokers, getUrl(), invocation);}}//最后從select中選可用的. {if(selected != null){for(Invoker<T> invoker : selected){if((invoker.isAvailable()) //優先選available && !reselectInvokers.contains(invoker)){reselectInvokers.add(invoker);}}}if(reselectInvokers.size()>0){return loadbalance.select(reselectInvokers, getUrl(), invocation);}}return null;}public Result invoke(final Invocation invocation) throws RpcException {checkWheatherDestoried();LoadBalance loadbalance;List<Invoker<T>> invokers = list(invocation);if (invokers != null && invokers.size() > 0) {loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl().getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));} else {loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);}RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);return doInvoke(invocation, invokers, loadbalance);}protected void checkWheatherDestoried() {if(destroyed){throw new RpcException("Rpc cluster invoker for " + getInterface() + " on consumer " + NetUtils.getLocalHost()+ " use dubbo version " + Version.getVersion()+ " is now destroyed! Can not invoke any more.");}}@Overridepublic String toString() {return getInterface() + " -> " + getUrl().toString();}protected void checkInvokers(List<Invoker<T>> invokers, Invocation invocation) {if (invokers == null || invokers.size() == 0) {throw new RpcException("Failed to invoke the method "+ invocation.getMethodName() + " in the service " + getInterface().getName() + ". No provider available for the service " + directory.getUrl().getServiceKey()+ " from registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost()+ " using the dubbo version " + Version.getVersion()+ ". Please check if the providers have been started and registered.");}}protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,LoadBalance loadbalance) throws RpcException;protected List<Invoker<T>> list(Invocation invocation) throws RpcException {List<Invoker<T>> invokers = directory.list(invocation);return invokers;} }

源碼實現分析。

  • AbstractClusterInvoker的invoke方法提供了一個骨架實現。邏輯是檢查對象是否銷毀狀態,從directory獲得invoker列表,獲得loadbalance擴展實現對象,然后調用抽象方法doInvoke去執行真正的邏輯,交由具體子類實現。
  • AbstractClusterInvoker實現了一個公共的protected的方法select,該方法實現了使用loadbalance選擇合適的invoker對象。在選擇方法的實現中支持??粘滯連接?特性。作為一個公共特性,所有的子類都支持。最后再調用private方法doselect實現進一步選擇邏輯。
  • AbstractClusterInvoker的doselect方法實現了真正的選擇invoker邏輯。首先檢查可選invoker,若沒有則返回null;如果有2個可選invoker則退化為輪詢;否則繼續調用loadbalance的select方法選擇一個invoker;然后在檢查選中的invoker是否已經使用過或者不可用,如果不可用則會調用reselect重新選擇,若重新選擇成功則使用它,否則則使用invoker列表中當前index+1的invoker,如果已經是最后一個則直接使用當前的invoker。
  • AbstractClusterInvoker的reselect方法的實現邏輯是:如果availablecheck標志為true,則只將未被selected的可用狀態的invoker交給loadbalance進行選擇,否則將所有的未被selected的invoker交給loadbalance選擇,若可重新選擇的invoker為空,則將selected的invoker列表交給loadbalance進行選擇。
  • FailoverClusterInvoker的doInvoke方法的實現邏輯為:檢查invoker列表狀態;獲得參數中重試次數,默認次數是2;從directory獲得invoker列表;調用select方法選擇一個invoker;將選擇的invoker加入到invoked集合,表示已經選擇和使用的;調用invoker.invoke()方法,若成功則返回result,若拋出的是業務異常則拋出,否則繼續重試選擇并調用下一個invoker;
  • LoadBalance

    負載均衡器

    擴展接口定義

    @SPI(RandomLoadBalance.NAME) public interface LoadBalance {/*** select one invoker in list.* * @param invokers invokers.* @param url refer url* @param invocation invocation.* @return selected invoker.*/@Adaptive("loadbalance")<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;}

    上述源碼所示,負載均衡只定義了一個方法,就是在候選的invokers中選擇一個invoker對象出來。默認的擴展實現是random。那我么就分析RandomLoadBalance的源碼。

    RandomLoadBalance

    /** Copyright 1999-2011 Alibaba Group.* * Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at* * http://www.apache.org/licenses/LICENSE-2.0* * Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package com.alibaba.dubbo.rpc.cluster.loadbalance;import java.util.List; import java.util.Random;import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.Invoker;/*** random load balance.** @author qianlei* @author william.liangf*/ public class RandomLoadBalance extends AbstractLoadBalance {public static final String NAME = "random";private final Random random = new Random();protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {int length = invokers.size(); // 總個數int totalWeight = 0; // 總權重boolean sameWeight = true; // 權重是否都一樣for (int i = 0; i < length; i++) {int weight = getWeight(invokers.get(i), invocation);totalWeight += weight; // 累計總權重if (sameWeight && i > 0&& weight != getWeight(invokers.get(i - 1), invocation)) {sameWeight = false; // 計算所有權重是否一樣}}if (totalWeight > 0 && ! sameWeight) {// 如果權重不相同且權重大于0則按總權重數隨機int offset = random.nextInt(totalWeight);// 并確定隨機值落在哪個片斷上for (int i = 0; i < length; i++) {offset -= getWeight(invokers.get(i), invocation);if (offset < 0) {return invokers.get(i);}}}// 如果權重相同或權重為0則均等隨機return invokers.get(random.nextInt(length));}}

    該類繼承了抽象類AbstractLoadBalance,因此我們也要結合該類一起分析。

    /** Copyright 1999-2011 Alibaba Group.* * Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at* * http://www.apache.org/licenses/LICENSE-2.0* * Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package com.alibaba.dubbo.rpc.cluster.loadbalance;import java.util.List;import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.cluster.LoadBalance;/*** AbstractLoadBalance* * @author william.liangf*/ public abstract class AbstractLoadBalance implements LoadBalance {public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {if (invokers == null || invokers.size() == 0)return null;if (invokers.size() == 1)return invokers.get(0);return doSelect(invokers, url, invocation);}protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);protected int getWeight(Invoker<?> invoker, Invocation invocation) {int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);if (weight > 0) {long timestamp = invoker.getUrl().getParameter(Constants.TIMESTAMP_KEY, 0L);if (timestamp > 0L) {int uptime = (int) (System.currentTimeMillis() - timestamp);int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);if (uptime > 0 && uptime < warmup) {weight = calculateWarmupWeight(uptime, warmup, weight);}}}return weight;}static int calculateWarmupWeight(int uptime, int warmup, int weight) {int ww = (int) ( (float) uptime / ( (float) warmup / (float) weight ) );return ww < 1 ? 1 : (ww > weight ? weight : ww);}}

    源碼分析如下:

  • AbstractLoadBalance的select方法實現,只是做了參數校驗,invoker列表若0個則返回null,1個元素則直接返回;否則調用抽象方法doSelect交給子類實現。
  • AbstractLoadBalance定義了公共方法getWeight。該方法是獲取invoker的權重的方法,公式是:(int) ( (float) uptime / ( (float) warmup / (float) weight ) );
  • 如果未設置權重或者權重值都一樣,則直接調用random.nextInt()隨機獲得一個invoker;若設置了權重并且不一樣,則在總權重中隨機,分布在哪個invoker的分片上,則選擇該invoker對象,實現了按照權重隨機。
  • Router

    接口定義

    public interface Router extends Comparable<Router> {/*** get the router url.* * @return url*/URL getUrl();/*** route.* * @param invokers* @param url refer url* @param invocation* @return routed invokers* @throws RpcException*/<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;}

    路由器就定義了上述2個方法,核心方法是route,從大的invoker列表結合中根據規則過濾出一個子集合。我們這里只分析實現類ConditionRouter的源碼。

    ConditionRouter

    /** Copyright 1999-2012 Alibaba Group.* * Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at* * http://www.apache.org/licenses/LICENSE-2.0* * Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package com.alibaba.dubbo.rpc.cluster.router.condition;import java.text.ParseException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern;import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; import com.alibaba.dubbo.common.utils.NetUtils; import com.alibaba.dubbo.common.utils.StringUtils; import com.alibaba.dubbo.common.utils.UrlUtils; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.cluster.Router;/*** ConditionRouter* * @author william.liangf*/ public class ConditionRouter implements Router, Comparable<Router> {private static final Logger logger = LoggerFactory.getLogger(ConditionRouter.class);private final URL url;private final int priority;private final boolean force;private final Map<String, MatchPair> whenCondition;private final Map<String, MatchPair> thenCondition;public ConditionRouter(URL url) {this.url = url;this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);this.force = url.getParameter(Constants.FORCE_KEY, false);try {String rule = url.getParameterAndDecoded(Constants.RULE_KEY);if (rule == null || rule.trim().length() == 0) {throw new IllegalArgumentException("Illegal route rule!");}rule = rule.replace("consumer.", "").replace("provider.", "");int i = rule.indexOf("=>");String whenRule = i < 0 ? null : rule.substring(0, i).trim();String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim();Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule);Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule);// NOTE: When條件是允許為空的,外部業務來保證類似的約束條件this.whenCondition = when;this.thenCondition = then;} catch (ParseException e) {throw new IllegalStateException(e.getMessage(), e);}}public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)throws RpcException {if (invokers == null || invokers.size() == 0) {return invokers;}try {if (! matchWhen(url)) {return invokers;}List<Invoker<T>> result = new ArrayList<Invoker<T>>();if (thenCondition == null) {logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());return result;}for (Invoker<T> invoker : invokers) {if (matchThen(invoker.getUrl(), url)) {result.add(invoker);}}if (result.size() > 0) {return result;} else if (force) {logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY));return result;}} catch (Throwable t) {logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);}return invokers;}public URL getUrl() {return url;}public int compareTo(Router o) {if (o == null || o.getClass() != ConditionRouter.class) {return 1;}ConditionRouter c = (ConditionRouter) o;return this.priority == c.priority ? url.toFullString().compareTo(c.url.toFullString()) : (this.priority > c.priority ? 1 : -1);}public boolean matchWhen(URL url) {return matchCondition(whenCondition, url, null);}public boolean matchThen(URL url, URL param) {return thenCondition != null && matchCondition(thenCondition, url, param);}private boolean matchCondition(Map<String, MatchPair> condition, URL url, URL param) {Map<String, String> sample = url.toMap();for (Map.Entry<String, String> entry : sample.entrySet()) {String key = entry.getKey();MatchPair pair = condition.get(key);if (pair != null && ! pair.isMatch(entry.getValue(), param)) {return false;}}return true;}private static Pattern ROUTE_PATTERN = Pattern.compile("([&!=,]*)\\s*([^&!=,\\s]+)");private static Map<String, MatchPair> parseRule(String rule)throws ParseException {Map<String, MatchPair> condition = new HashMap<String, MatchPair>();if(StringUtils.isBlank(rule)) {return condition;} // 匹配或不匹配Key-Value對MatchPair pair = null;// 多個Value值Set<String> values = null;final Matcher matcher = ROUTE_PATTERN.matcher(rule);while (matcher.find()) { // 逐個匹配String separator = matcher.group(1);String content = matcher.group(2);// 表達式開始if (separator == null || separator.length() == 0) {pair = new MatchPair();condition.put(content, pair);}// KV開始else if ("&".equals(separator)) {if (condition.get(content) == null) {pair = new MatchPair();condition.put(content, pair);} else {condition.put(content, pair);}}// KV的Value部分開始else if ("=".equals(separator)) {if (pair == null)throw new ParseException("Illegal route rule \""+ rule + "\", The error char '" + separator+ "' at index " + matcher.start() + " before \""+ content + "\".", matcher.start());values = pair.matches;values.add(content);}// KV的Value部分開始else if ("!=".equals(separator)) {if (pair == null)throw new ParseException("Illegal route rule \""+ rule + "\", The error char '" + separator+ "' at index " + matcher.start() + " before \""+ content + "\".", matcher.start());values = pair.mismatches;values.add(content);}// KV的Value部分的多個條目else if (",".equals(separator)) { // 如果為逗號表示if (values == null || values.size() == 0)throw new ParseException("Illegal route rule \""+ rule + "\", The error char '" + separator+ "' at index " + matcher.start() + " before \""+ content + "\".", matcher.start());values.add(content);} else {throw new ParseException("Illegal route rule \"" + rule+ "\", The error char '" + separator + "' at index "+ matcher.start() + " before \"" + content + "\".", matcher.start());}}return condition;}private static final class MatchPair {final Set<String> matches = new HashSet<String>();final Set<String> mismatches = new HashSet<String>();public boolean isMatch(String value, URL param) {for (String match : matches) {if (! UrlUtils.isMatchGlobPattern(match, value, param)) {return false;}}for (String mismatch : mismatches) {if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) {return false;}}return true;}} }

    該源碼實現了如下條件路由器功能。

    基于條件表達式的路由規則,如:

    host = 10.20.153.10 => host = 10.20.153.11

    規則:

    • "=>"之前的為消費者匹配條件,所有參數和消費者的URL進行對比,當消費者滿足匹配條件時,對該消費者執行后面的過濾規則。
    • "=>"之后為提供者地址列表的過濾條件,所有參數和提供者的URL進行對比,消費者最終只拿到過濾后的地址列表。
    • 如果匹配條件為空,表示對所有消費方應用,如:=> host != 10.20.153.11
    • 如果過濾條件為空,表示禁止訪問,如:host = 10.20.153.10 =>

    表達式:

    • 參數支持:
      • 服務調用信息,如:method,?argument?等?(暫不支持參數路由)
      • URL本身的字段,如:protocol, host, port 等
      • 以及URL上的所有參數,如:application, organization 等
    • 條件支持:
      • 等號"="表示"匹配",如:host = 10.20.153.10
      • 不等號"!="表示"不匹配",如:host != 10.20.153.10
    • 值支持:
      • 以逗號","分隔多個值,如:host != 10.20.153.10,10.20.153.11
      • 以星號"*"結尾,表示通配,如:host != 10.20.*
      • 以美元符"$"開頭,表示引用消費者參數,如:host = $host

    Directory

    接口定義

    public interface Directory<T> extends Node {/*** get service type.* * @return service type.*/Class<T> getInterface();/*** list invokers.* * @return invokers*/List<Invoker<T>> list(Invocation invocation) throws RpcException;}

    目錄服務定義了一個核心接口list,就是列舉出某個接口在目錄中的所有服務列表。

    抽象實現AbstractDirectory

    提供了一個抽象的目錄實現類,源碼如下。

    /*** 增加router的Directory* * @author chao.liuc*/ public abstract class AbstractDirectory<T> implements Directory<T> {// 日志輸出private static final Logger logger = LoggerFactory.getLogger(AbstractDirectory.class);private final URL url ;private volatile boolean destroyed = false;private volatile URL consumerUrl ;private volatile List<Router> routers;public AbstractDirectory(URL url) {this(url, null);}public AbstractDirectory(URL url, List<Router> routers) {this(url, url, routers);}public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {if (url == null)throw new IllegalArgumentException("url == null");this.url = url;this.consumerUrl = consumerUrl;setRouters(routers);}public List<Invoker<T>> list(Invocation invocation) throws RpcException {if (destroyed){throw new RpcException("Directory already destroyed .url: "+ getUrl());}List<Invoker<T>> invokers = doList(invocation);List<Router> localRouters = this.routers; // local referenceif (localRouters != null && localRouters.size() > 0) {for (Router router: localRouters){try {if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {invokers = router.route(invokers, getConsumerUrl(), invocation);}} catch (Throwable t) {logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);}}}return invokers;}public URL getUrl() {return url;}public List<Router> getRouters(){return routers;}public URL getConsumerUrl() {return consumerUrl;}public void setConsumerUrl(URL consumerUrl) {this.consumerUrl = consumerUrl;}protected void setRouters(List<Router> routers){// copy listrouters = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);// append url routerString routerkey = url.getParameter(Constants.ROUTER_KEY);if (routerkey != null && routerkey.length() > 0) {RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey);routers.add(routerFactory.getRouter(url));}// append mock invoker selectorrouters.add(new MockInvokersSelector());Collections.sort(routers);this.routers = routers;}public boolean isDestroyed() {return destroyed;}public void destroy(){destroyed = true;}protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException ;}

    list方法的實現邏輯是:先檢查目錄是否銷毀狀態,若已經銷毀則拋出異常;調用抽象方法doList實現真正的從目錄服務中獲取invoker列表,該方法需要子類實現;循環對象中的路由器列表,若路由器url為null或者參數runtime為true則調用該路由器的route方法進行路由,將返回的invoker列表替換為路由后的結果; 返回最終的invoker列表。

    setRouters方法是設置路由器列表,除了參數參入的routers之外,還會追加2個默認的路由器,一個是參數router指定的routerFactory獲得的router,另外一個是MockInvokersSelector對象;

    默認實現StaticDirectory

    模塊還提供了一個默認目錄實現類StaticDirectory,它是一個靜態的內存緩存目錄服務實現。源碼如下:

    public class StaticDirectory<T> extends AbstractDirectory<T> {private final List<Invoker<T>> invokers;public StaticDirectory(List<Invoker<T>> invokers){this(null, invokers, null);}public StaticDirectory(List<Invoker<T>> invokers, List<Router> routers){this(null, invokers, routers);}public StaticDirectory(URL url, List<Invoker<T>> invokers) {this(url, invokers, null);}public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {super(url == null && invokers != null && invokers.size() > 0 ? invokers.get(0).getUrl() : url, routers);if (invokers == null || invokers.size() == 0)throw new IllegalArgumentException("invokers == null");this.invokers = invokers;}public Class<T> getInterface() {return invokers.get(0).getInterface();}public boolean isAvailable() {if (isDestroyed()) {return false;}for (Invoker<T> invoker : invokers) {if (invoker.isAvailable()) {return true;}}return false;}public void destroy() {if(isDestroyed()) {return;}super.destroy();for (Invoker<T> invoker : invokers) {invoker.destroy();}invokers.clear();}@Overrideprotected List<Invoker<T>> doList(Invocation invocation) throws RpcException {return invokers;}}

    它的doList方法的實現是直接將屬性invokers的值返回,非常簡單。

    此外還有一個RegistryDirectory的實現類,該類是整合了注冊中心和目錄服務。

    NEXT

    因為考慮到本模塊與dubbo-registry相關性較大,接下來我們將研究dubbo-registry-api和dubbo-registry-default模塊的源碼。

    ?

    轉載于:https://my.oschina.net/ywbrj042/blog/689818

    總結

    以上是生活随笔為你收集整理的dubbo源码分析系列——dubbo-cluster模块源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 一级肉体全黄毛片 | 丰满少妇被猛烈进入一区二区 | 欧美日韩在线一区 | 国产在线一区二区 | 黑人玩弄人妻一区二 | 国产精品黄色片 | 超碰2| 成人免费三级 | 五月婷婷丁香在线 | 极品美女被c | 欧美性爱精品在线 | 欧美激情区 | 麻豆传谋在线观看免费mv | 国产在线二区 | 天天插综合网 | 亚洲欧美日韩国产精品 | 制服丝袜av电影 | 美女隐私免费看 | 女人扒开腿让男人捅爽 | 伊人逼逼 | 999黄色片 | 国产精品国产三级国产aⅴ原创 | 农村黄色片 | 成人激情在线观看 | 中文字幕一区二区三区夫目前犯 | 国产免费片 | 天堂av资源网| 久久精品国产亚洲AV高清综合 | 日韩精品中文字幕在线观看 | 日日草夜夜操 | 亚洲狼人综合网 | 欧美激情黑人 | 免费在线国产视频 | 精品日韩在线 | 99九九精品视频 | 都市激情亚洲一区 | fc2ppv在线播放 | 日韩激情网 | 国产伦理一区 | 久久精品超碰 | 东北少妇露脸无套对白 | 黄色国产视频网站 | 国产污视频网站 | 国产精品看片 | 日批视频免费在线观看 | 在线免费看黄网站 | 韩国三级在线看 | 老熟妇高潮一区二区高清视频 | 欧美zzz物交 | 亚洲五码在线 | av国语| 亚洲一区二区在线免费观看 | 伊人草| 国内免费av | xx69欧美| 欧美又大粗又爽又黄大片视频 | www.亚洲一区二区 | 亚洲午夜精品一区二区三区 | 国产网红在线 | 性工作者十日谈 | 欧美男女动态图 | 天美乌鸦星空mv | 岛国大片在线 | 91插插插插插插插插 | 午夜av免费看 | 神宫寺奈绪一区二区三区 | 精品乱码一区二区三四区视频 | 成人91网站 | 欧美三级黄色 | 青青青青操 | 91亚洲一线产区二线产区 | 亚洲精品第五页 | a级国产视频 | 成人小网站 | 成年女人18级毛片毛片免费 | jzzjzzjzz亚洲成熟少妇 | 一区二区三区不卡在线观看 | 人妻少妇精品一区二区 | 很黄很污的视频网站 | 激情久久五月 | av全黄 | 老女人人体欣赏a√s | 天天夜碰日日摸日日澡性色av | 成人 黄 色 免费播放 | a黄色片 | 国产精品探花一区二区在线观看 | 三级网站在线播放 | 国产69页 | 天天色天天 | 亚洲香蕉av | 风流老熟女一区二区三区 | 99re这里只有精品在线观看 | 久久国产精品无码一区二区 | 日本男男激情gay办公室 | av色婷婷 | 午夜三级在线观看 | 亚洲成a人片 | 婷婷激情视频 | 黄色的网站在线观看 |