日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

dubbo源码解析(九)远程通信——Transport层

發(fā)布時間:2025/3/18 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 dubbo源码解析(九)远程通信——Transport层 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

遠程通訊——Transport層

目標(biāo):介紹Transport層的相關(guān)設(shè)計和邏輯、介紹dubbo-remoting-api中的transport包內(nèi)的源碼解析。

前言

先預(yù)警一下,該文篇幅會很長,做好心理準(zhǔn)備。Transport層也就是網(wǎng)絡(luò)傳輸層,在遠程通信中必然會涉及到傳輸。它在dubbo 的框架設(shè)計中也處于倒數(shù)第二層,當(dāng)然最底層是序列化,這個后面介紹。官方文檔對Transport層的解釋是抽象 mina 和 netty 為統(tǒng)一接口,以 Message 為中心,擴展接口為 Channel、Transporter、Client、Server、Codec。那我們現(xiàn)在先來看這個包下面的類圖:

可以看到有四個包繼承了AbstractChannel、AbstractServer、AbstractClient。也就是說現(xiàn)在Transport層是抽象mina、netty以及grizzly為統(tǒng)一接口??赐觐悎D,再來看看包結(jié)構(gòu):

下面的講解大致會按照類圖中類的順序往下講,盡量把client、server、channel、codec、dispacher五部分涉及到的內(nèi)容一起講解。

源碼解析

(一)AbstractPeer

public abstract class AbstractPeer implements Endpoint, ChannelHandler {private final ChannelHandler handler;private volatile URL url;/*** 是否正在關(guān)閉*/// closing closed means the process is being closed and close is finishedprivate volatile boolean closing;/*** 是否關(guān)閉完成*/private volatile boolean closed;public AbstractPeer(URL url, ChannelHandler handler) {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}this.url = url;this.handler = handler;} }

該類實現(xiàn)了Endpoint和ChannelHandler兩個接口,要關(guān)注的兩個點:

  • 實現(xiàn)ChannelHandler接口并且有在屬性中還有一個handler,下面很多實現(xiàn)方法也是直接調(diào)用了handler方法,這種模式叫做裝飾模式,這樣做可以對裝飾對象靈活的增強功能。對裝飾模式不懂的朋友可以google一下。有很多例子介紹。
  • 在該類中有closing和closed屬性,在Endpoint中有很多關(guān)于關(guān)閉通道的操作,會有關(guān)閉中和關(guān)閉完成的狀態(tài)區(qū)分,在該類中就緩存了這兩個屬性來判斷關(guān)閉的狀態(tài)。
  • 下面我就介紹該類中的send方法,其他方法比較好理解,到時候可以直接看源碼:

    @Override public void send(Object message) throws RemotingException {// url中sent的配置項send(message, url.getParameter(Constants.SENT_KEY, false)); }

    該配置項是選擇是否等待消息發(fā)出:

  • sent值為true,等待消息發(fā)出,消息發(fā)送失敗將拋出異常。
  • sent值為false,不等待消息發(fā)出,將消息放入 IO 隊列,即刻返回。
  • 對該類還有點糊涂的朋友,記住在ChannelHandler接口,該類就做了裝飾模式中裝飾角色,在Endpoint接口,只是維護了通道的正在關(guān)閉和關(guān)閉完成兩個狀態(tài)。

    (二)AbstractEndpoint

    public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {/*** 日志記錄*/private static final Logger logger = LoggerFactory.getLogger(AbstractEndpoint.class);/*** 編解碼器*/private Codec2 codec;/*** 超時時間*/private int timeout;/*** 連接超時時間*/private int connectTimeout;public AbstractEndpoint(URL url, ChannelHandler handler) {super(url, handler);this.codec = getChannelCodec(url);// 優(yōu)先從url配置中取,如果沒有,默認為1sthis.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);// 優(yōu)先從url配置中取,如果沒有,默認為3sthis.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);}/*** 從url中獲得編解碼器的配置,并且返回該實例* @param url* @return*/protected static Codec2 getChannelCodec(URL url) {String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");// 優(yōu)先從Codec2的擴展類中找if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);} else {return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));}}}

    該類是端點的抽象類,其中封裝了編解碼器以及兩個超時時間?;赿ubbo 的SPI機制,獲得相應(yīng)的編解碼器實現(xiàn)對象,編解碼器優(yōu)先從Codec2的擴展類中尋找。

    下面來看看該類中的reset方法:

    @Override public void reset(URL url) {if (isClosed()) {throw new IllegalStateException("Failed to reset parameters "+ url + ", cause: Channel closed. channel: " + getLocalAddress());}try {// 判斷重置的url中有沒有攜帶timeout,有的話重置if (url.hasParameter(Constants.TIMEOUT_KEY)) {int t = url.getParameter(Constants.TIMEOUT_KEY, 0);if (t > 0) {this.timeout = t;}}} catch (Throwable t) {logger.error(t.getMessage(), t);}try {// 判斷重置的url中有沒有攜帶connect.timeout,有的話重置if (url.hasParameter(Constants.CONNECT_TIMEOUT_KEY)) {int t = url.getParameter(Constants.CONNECT_TIMEOUT_KEY, 0);if (t > 0) {this.connectTimeout = t;}}} catch (Throwable t) {logger.error(t.getMessage(), t);}try {// 判斷重置的url中有沒有攜帶codec,有的話重置if (url.hasParameter(Constants.CODEC_KEY)) {this.codec = getChannelCodec(url);}} catch (Throwable t) {logger.error(t.getMessage(), t);} }@Deprecated public void reset(com.alibaba.dubbo.common.Parameters parameters) {reset(getUrl().addParameters(parameters.getParameters())); }

    這個方法是Resetable接口中的方法,可以看到以前的reset實現(xiàn)方法都加上了@Deprecated注解,不推薦使用了,因為這種實現(xiàn)方式重置太復(fù)雜,需要把所有參數(shù)都設(shè)置一遍,比如我只想重置一個超時時間,但是其他值不變,如果用以前的reset,我需要在url中把所有值都帶上,就會很多余?,F(xiàn)在用新的reset,每次只關(guān)心我需要重置的值,只更改為需要重置的值。比如上面的代碼所示,只想修改超時時間,那我就只在url中攜帶超時時間的參數(shù)。

    (三)AbstractServer

    該類繼承了AbstractEndpoint并且實現(xiàn)Server接口,是服務(wù)器抽象類。重點實現(xiàn)了服務(wù)器的公共邏輯,比如發(fā)送消息,關(guān)閉通道,連接通道,斷開連接等。并且抽象了打開和關(guān)閉服務(wù)器兩個方法。

    1.屬性

    /*** 服務(wù)器線程名稱*/ protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler"; private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class); /*** 線程池*/ ExecutorService executor; /*** 服務(wù)地址,也就是本地地址*/ private InetSocketAddress localAddress; /*** 綁定地址*/ private InetSocketAddress bindAddress; /*** 最大可接受的連接數(shù)*/ private int accepts; /*** 空閑超時時間,單位是s*/ private int idleTimeout = 600; //600 seconds

    該類的屬性比較好理解,就是稍微注意一下idleTimeout的單位是s。

    2.構(gòu)造函數(shù)

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);// 從url中獲得本地地址localAddress = getUrl().toInetSocketAddress();// 從url配置中獲得綁定的ipString bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());// 從url配置中獲得綁定的端口號int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());// 判斷url中配置anyhost是否為true或者判斷host是否為不可用的本地Hostif (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {bindIp = NetUtils.ANYHOST;}bindAddress = new InetSocketAddress(bindIp, bindPort);// 從url中獲取配置,默認值為0this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);// 從url中獲取配置,默認600sthis.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);try {// 開啟服務(wù)器doOpen();if (logger.isInfoEnabled()) {logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());}} catch (Throwable t) {throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);}// 獲得線程池//fixme replace this with better methodDataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); }

    構(gòu)造函數(shù)大部分邏輯就是從url中取配置,存到緩存中,并且做了開啟服務(wù)器的操作。具體的看上面的注釋,還是比較清晰的。

    3.reset方法

    @Override public void reset(URL url) {if (url == null) {return;}try {// 重置accepts的值if (url.hasParameter(Constants.ACCEPTS_KEY)) {int a = url.getParameter(Constants.ACCEPTS_KEY, 0);if (a > 0) {this.accepts = a;}}} catch (Throwable t) {logger.error(t.getMessage(), t);}try {// 重置idle.timeout的值if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);if (t > 0) {this.idleTimeout = t;}}} catch (Throwable t) {logger.error(t.getMessage(), t);}try {// 重置線程數(shù)配置if (url.hasParameter(Constants.THREADS_KEY)&& executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;// 獲得url配置中的線程數(shù)int threads = url.getParameter(Constants.THREADS_KEY, 0);// 獲得線程池允許的最大線程數(shù)int max = threadPoolExecutor.getMaximumPoolSize();// 返回核心線程數(shù)int core = threadPoolExecutor.getCorePoolSize();// 設(shè)置最大線程數(shù)和核心線程數(shù)if (threads > 0 && (threads != max || threads != core)) {if (threads < core) {// 如果設(shè)置的線程數(shù)比核心線程數(shù)少,則直接設(shè)置核心線程數(shù)threadPoolExecutor.setCorePoolSize(threads);if (core == max) {// 當(dāng)核心線程數(shù)和最大線程數(shù)相等的時候,把最大線程數(shù)也重置threadPoolExecutor.setMaximumPoolSize(threads);}} else {// 當(dāng)大于核心線程數(shù)時,直接設(shè)置最大線程數(shù)threadPoolExecutor.setMaximumPoolSize(threads);// 只有當(dāng)核心線程數(shù)和最大線程數(shù)相等的時候才設(shè)置核心線程數(shù)if (core == max) {threadPoolExecutor.setCorePoolSize(threads);}}}}} catch (Throwable t) {logger.error(t.getMessage(), t);}// 重置urlsuper.setUrl(getUrl().addParameters(url.getParameters())); }

    該類中的reset方法做了三個值的重置,分別是最大可連接的客戶端數(shù)量、空閑超時時間以及線程池的兩個配置參數(shù)。其中要注意核心線程數(shù)和最大線程數(shù)的區(qū)別。舉個例子,核心線程數(shù)就像是工廠正式工,最大線程數(shù),就是工廠臨時工作量加大,請了一批臨時工,臨時工加正式工的和就是最大線程數(shù),等這批任務(wù)結(jié)束后,臨時工要辭退的,而正式工會留下。

    還有send、close、connected、disconnected等方法比較簡單,如果有興趣,可以到我的GitHub查看,地址文章末尾會給出。

    (四)AbstractClient

    該類是客戶端的抽象類,繼承了AbstractEndpoint類,實現(xiàn)了Client接口,該類中也是做了客戶端公用的重連邏輯,抽象了打開客戶端、關(guān)閉客戶端、連接服務(wù)器、斷開服務(wù)器連接以及獲得通道方法,讓子類去重點關(guān)注這幾個方法。

    1.屬性

    /*** 客戶端線程名稱*/ protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler"; private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class); /*** 線程池id*/ private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger(); /*** 重連定時任務(wù)執(zhí)行器*/ private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true)); /*** 連接鎖*/ private final Lock connectLock = new ReentrantLock(); /*** 發(fā)送消息時,若斷開,是否重連*/ private final boolean send_reconnect; /*** 重連次數(shù)*/ private final AtomicInteger reconnect_count = new AtomicInteger(0); /*** 在這之前是否調(diào)用重新連接的錯誤日志*/ // Reconnection error log has been called before? private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false); /*** 重連 warning 的間隔.(waring多少次之后,warning一次),也就是錯誤多少次后告警一次錯誤*/ // reconnect warning period. Reconnect warning interval (log warning after how many times) //for test private final int reconnect_warning_period; /*** 關(guān)閉超時時間*/ private final long shutdown_timeout; /*** 線程池*/ protected volatile ExecutorService executor; /*** 重連執(zhí)行任務(wù)*/ private volatile ScheduledFuture<?> reconnectExecutorFuture = null; // the last successed connected time /*** 最后成功連接的時間*/ private long lastConnectedTime = System.currentTimeMillis();

    上述屬性大部分跟重連有關(guān),該類最重要的也是封裝了重連的邏輯。

    2.構(gòu)造函數(shù)

    public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);// 從url中獲得是否重連的配置,默認為falsesend_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);// 從url中獲得關(guān)閉超時時間,默認為900sshutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);// The default reconnection interval is 2s, 1800 means warning interval is 1 hour.// 重連的默認值是2s,重連 warning 的間隔默認是1800,當(dāng)出錯的時候,每隔1800*2=3600s報警一次reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);try {// 打開客戶端doOpen();} catch (Throwable t) {close();throw new RemotingException(url.toInetSocketAddress(), null,"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);}try {// connect.// 連接服務(wù)器connect();if (logger.isInfoEnabled()) {logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());}} catch (RemotingException t) {if (url.getParameter(Constants.CHECK_KEY, true)) {close();throw t;} else {logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);}} catch (Throwable t) {close();throw new RemotingException(url.toInetSocketAddress(), null,"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);}// 從緩存中獲得線程池executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));// 清楚線程池緩存ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort())); }

    該構(gòu)造函數(shù)中做了一些屬性值的設(shè)置,并且做了打開客戶端和連接服務(wù)器的操作。

    3.wrapChannelHandler

    protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {// 加入線程名稱url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);// 設(shè)置使用的線程池類型url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);// 包裝return ChannelHandlers.wrap(handler, url); }

    該方法是包裝通道處理器,設(shè)置使用的線程池類型是可緩存線程池。

    4.initConnectStatusCheckCommand

    private synchronized void initConnectStatusCheckCommand() {//reconnect=false to close reconnectint reconnect = getReconnectParam(getUrl());// 有連接頻率的值,并且當(dāng)前沒有連接任務(wù)if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {Runnable connectStatusCheckCommand = new Runnable() {@Overridepublic void run() {try {if (!isConnected()) {// 重連connect();} else {// 記錄最后一次重連的時間lastConnectedTime = System.currentTimeMillis();}} catch (Throwable t) {String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();// wait registry sync provider listif (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {// 如果之前沒有打印過重連的誤日志if (!reconnect_error_log_flag.get()) {reconnect_error_log_flag.set(true);// 打印日志logger.error(errorMsg, t);return;}}// 如果到達一次重連日志告警周期,則打印告警日志if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {logger.warn(errorMsg, t);}}}};// 開啟重連定時任務(wù)reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);} }

    該方法是初始化重連線程,其中做了重連失敗后的告警日志和錯誤日志打印策略。

    5.reconnect

    @Override public void reconnect() throws RemotingException {disconnect();connect(); }

    單獨放該方法是因為這是該類關(guān)注的重點。實現(xiàn)了客戶端的重連邏輯。

    6.其他

    connect、disconnect、close等方法都是調(diào)用了對應(yīng)的抽象方法,而具體的邏輯需要看具體的子類如何去實現(xiàn)相關(guān)的抽象方法,這幾個方法邏輯比較簡單,我不在這里貼出源碼,有興趣可以看我的GitHub,地址文章末尾會給出。

    (四)AbstractChannel

    該類是通道的抽象類,該類里面做的邏輯很簡單,具體的發(fā)送消息邏輯在它 的子類中實現(xiàn)。

    @Override public void send(Object message, boolean sent) throws RemotingException {// 檢測通道是否關(guān)閉if (isClosed()) {throw new RemotingException(this, "Failed to send message "+ (message == null ? "" : message.getClass().getName()) + ":" + message+ ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress());} }

    可以看到send方法,其中只做了檢測通道是否關(guān)閉的狀態(tài)檢測,沒有實現(xiàn)具體的發(fā)送消息的邏輯。

    (五)ChannelHandlerDelegate

    該類繼承了ChannelHandler,從它的名字可以看出是ChannelHandler的代表,它就是作為裝飾模式中的Component角色,后面講到的AbstractChannelHandlerDelegate作為裝飾模式中的Decorator角色。

    public interface ChannelHandlerDelegate extends ChannelHandler {/*** 獲得通道* @return*/ChannelHandler getHandler(); }

    (六)AbstractChannelHandlerDelegate

    屬性:

    protected ChannelHandler handler

    該類實現(xiàn)了ChannelHandlerDelegate接口,并且有一個屬性是ChannelHandler,上述已經(jīng)說到這是裝飾模式中的裝飾角色,其中的所有實現(xiàn)方法都直接調(diào)用被裝飾的handler屬性的方法。

    (七)DecodeHandler

    該類為解碼處理器,繼承了AbstractChannelHandlerDelegate,對接收到的消息進行解碼,在父類處理接收消息的功能上疊加了解碼功能。

    我們來看看received方法:

    @Override public void received(Channel channel, Object message) throws RemotingException {// 如果是Decodeable類型的消息,則對整個消息解碼if (message instanceof Decodeable) {decode(message);}// 如果是Request請求類型消息,則對請求中對請求數(shù)據(jù)解碼if (message instanceof Request) {decode(((Request) message).getData());}// 如果是Response返回類型的消息,則對返回消息中對結(jié)果進行解碼if (message instanceof Response) {decode(((Response) message).getResult());}// 繼續(xù)將消息委托給handler,繼續(xù)處理handler.received(channel, message); }

    可以看到做了三次判斷,根據(jù)消息的不同會對消息的不同數(shù)據(jù)做解碼??梢钥吹?#xff0c;這里用到裝飾模式后,在處理消息的前面做了解碼的處理,并且還能繼續(xù)委托給handler來處理消息,通過組合做到了功能的疊加。

    private void decode(Object message) {// 如果消息類型是Decodeable,進一步調(diào)用Decodeable的decode來解碼if (message != null && message instanceof Decodeable) {try {((Decodeable) message).decode();if (log.isDebugEnabled()) {log.debug("Decode decodeable message " + message.getClass().getName());}} catch (Throwable e) {if (log.isWarnEnabled()) {log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);}} // ~ end of catch} // ~ end of if } // ~ end of method decode

    可以看到這是解析消息的邏輯,當(dāng)消息是Decodeable類型,還會繼續(xù)調(diào)用Decodeable的decode方法來進行解析。它的實現(xiàn)類后續(xù)會講解到。

    (八)MultiMessageHandler

    該類是多消息處理器的抽象類。同樣繼承了AbstractChannelHandlerDelegate類,我們來看看它的received方法:

    @SuppressWarnings("unchecked") @Override public void received(Channel channel, Object message) throws RemotingException {// 當(dāng)消息為多消息時 循環(huán)交給handler處理接收到當(dāng)消息if (message instanceof MultiMessage) {MultiMessage list = (MultiMessage) message;for (Object obj : list) {handler.received(channel, obj);}} else {// 如果是單消息,就直接交給handler處理器handler.received(channel, message);} }

    邏輯很簡單,當(dāng)消息是多消息類型時,也就是一次性接收到多條消息的情況,循環(huán)去處理消息,當(dāng)消息是單消息時候,直接交給handler去處理。

    (九)WrappedChannelHandler

    該類跟AbstractChannelHandlerDelegate的作用類似,都是裝飾模式中的裝飾角色,其中的所有實現(xiàn)方法都直接調(diào)用被裝飾的handler屬性的方法,該類是為了添加線程池的功能,它的子類都是去關(guān)心哪些消息是需要分發(fā)到線程池的,哪些消息直接由I / O線程執(zhí)行,現(xiàn)在版本有四種場景,也就是它的四個子類,下面我一一描述。

    public WrappedChannelHandler(ChannelHandler handler, URL url) {this.handler = handler;this.url = url;// 創(chuàng)建線程池executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);// 設(shè)置組件的keyString componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {componentKey = Constants.CONSUMER_SIDE;}// 獲得dataStore實例DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();// 把線程池放到dataStore中緩存dataStore.put(componentKey, Integer.toString(url.getPort()), executor); }

    可以看到構(gòu)造方法除了屬性的填充以外,線程池是基于dubbo 的SPI Adaptive機制創(chuàng)建的,在dataStore中把線程池加進去, 該線程池就是AbstractClient 或 AbstractServer 從 DataStore 獲得的線程池。

    public ExecutorService getExecutorService() {// 首先返回的不是共享線程池,是該類的線程池ExecutorService cexecutor = executor;// 如果該類的線程池關(guān)閉或者為空,則返回的是共享線程池if (cexecutor == null || cexecutor.isShutdown()) {cexecutor = SHARED_EXECUTOR;}return cexecutor; }

    該方法是獲得線程池的實例,不過該類里面有兩個線程池,還加入了一個共享線程池,共享線程池優(yōu)先級較低。

    (十)ExecutionChannelHandler

    該類繼承了WrappedChannelHandler,也是增強了功能,處理的是接收請求消息時,把請求消息分發(fā)到線程池,而除了請求消息以外,其他消息類型都直接通過I / O線程直接執(zhí)行。

    @Override public void received(Channel channel, Object message) throws RemotingException {// 獲得線程池實例ExecutorService cexecutor = getExecutorService();// 如果消息是request類型,才會分發(fā)到線程池,其他消息,如響應(yīng),連接,斷開連接,心跳將由I / O線程直接執(zhí)行。if (message instanceof Request) {try {// 把請求消息分發(fā)到線程池cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {// FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,// therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent// this scenario from happening, but a better solution should be considered later.// 當(dāng)線程池滿了,SERVER_THREADPOOL_EXHAUSTED_ERROR錯誤無法正常返回// 因此消費者方必須等到超時。這是一種預(yù)防的臨時解決方案,所以這里直接返回該錯誤if (t instanceof RejectedExecutionException) {Request request = (Request) message;if (request.isTwoWay()) {String msg = "Server side(" + url.getIp() + "," + url.getPort()+ ") thread pool is exhausted, detail msg:" + t.getMessage();Response response = new Response(request.getId(), request.getVersion());response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);response.setErrorMessage(msg);channel.send(response);return;}}throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);}} else {// 如果消息不是request類型,則直接處理handler.received(channel, message);} }

    上述就可以都看到對于請求消息的處理,其中有個打補丁的方式是當(dāng)線程池滿了的時候,消費者只能等待請求超時,所以這里直接返回線程池滿的錯誤。

    (十一)AllChannelHandler

    該類也繼承了WrappedChannelHandler,也是為了增強功能,處理的是連接、斷開連接、捕獲異常以及接收到的所有消息都分發(fā)到線程池。

    @Override public void connected(Channel channel) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {// 把連接操作分發(fā)到線程池處理cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);} }@Override public void disconnected(Channel channel) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {// 把斷開連接操作分發(fā)到線程池處理cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));} catch (Throwable t) {throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);} }@Override public void received(Channel channel, Object message) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {// 把所有消息分發(fā)到線程池處理cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out// 這里處理線程池滿的問題,只有在請求時候會出現(xiàn)。//復(fù)線程池已滿,拒絕調(diào)用,不返回,并導(dǎo)致使用者等待超時if(message instanceof Request && t instanceof RejectedExecutionException){Request request = (Request)message;if(request.isTwoWay()){String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();Response response = new Response(request.getId(), request.getVersion());response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);response.setErrorMessage(msg);channel.send(response);return;}}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);} }@Override public void caught(Channel channel, Throwable exception) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {// 把捕獲異常作分發(fā)到線程池處理cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);} }

    可以看到,所有操作以及消息都分到到線程池中。并且注意操作不同,傳入的狀態(tài)也不同。

    (十二)ConnectionOrderedChannelHandler

    該類也是繼承了WrappedChannelHandler,增強功能,該類是把連接、取消連接以及接收到的消息都分發(fā)到線程池,但是不同的是,該類自己創(chuàng)建了一個跟連接相關(guān)的線程池,把連接操作和斷開連接操分發(fā)到該線程池,而接收到的消息則分發(fā)到WrappedChannelHandler的線程池中。來看看具體的實現(xiàn)。

    /*** 連接線程池*/ protected final ThreadPoolExecutor connectionExecutor; /*** 連接隊列大小限制*/ private final int queuewarninglimit;public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {super(handler, url);// 獲得線程名,默認是DubboString threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);// 創(chuàng)建連接線程池connectionExecutor = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),new NamedThreadFactory(threadName, true),new AbortPolicyWithReport(threadName, url)); // FIXME There's no place to release connectionExecutor!// 設(shè)置工作隊列限制,默認是1000queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE); }

    可以屬性中有一個連接線程池,看到在構(gòu)造函數(shù)里創(chuàng)建了該線程池,而queuewarninglimit是用來限制連接線程池的工作隊列長度,比較簡單。來看看連接和斷開連接到邏輯。

    @Override public void connected(Channel channel) throws RemotingException {try {// 核對工作隊列長度checkQueueLength();// 分發(fā)連接操作connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);} }@Override public void disconnected(Channel channel) throws RemotingException {try {// 核對工作隊列長度checkQueueLength();// 分發(fā)斷開連接操作connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));} catch (Throwable t) {throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);} }

    可以看到,這兩個操作都是分發(fā)到連接線程池connectionExecutor中,和AllChannelHandle類r中的分發(fā)的線程池不是同一個。而ConnectionOrderedChannelHandler的received方法跟AllChannelHandle一樣,我就不貼出來。

    (十三)MessageOnlyChannelHandler

    該類也是繼承了WrappedChannelHandler,是WrappedChannelHandler的最后一個子類,也是增強功能,不過該類只是處理了所有的消息分發(fā)到線程池。可以看到源碼,比較簡單:

    @Override public void received(Channel channel, Object message) throws RemotingException {// 獲得線程池實例ExecutorService cexecutor = getExecutorService();try {// 把消息分發(fā)到線程池cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);} }

    下面我講講解五種線程池的調(diào)度策略,也就是我在《dubbo源碼解析(八)遠程通信——開篇》中提到的Dispatcher接口的五種實現(xiàn),分別是AllDispatcher、DirectDispatcher、MessageOnlyDispatcher、ExecutionDispatcher、ConnectionOrderedDispatcher。

    (十四)AllDispatcher

    public class AllDispatcher implements Dispatcher {public static final String NAME = "all";@Overridepublic ChannelHandler dispatch(ChannelHandler handler, URL url) {// 線程池調(diào)度方法:任何消息以及操作都分發(fā)到線程池中return new AllChannelHandler(handler, url);}}

    對照著上述講到的AllChannelHandler,是不是很清晰這種線程池的調(diào)度方法。并且該調(diào)度方法是默認的調(diào)度方法。

    (十五)ConnectionOrderedDispatcher

    public class ConnectionOrderedDispatcher implements Dispatcher {public static final String NAME = "connection";@Overridepublic ChannelHandler dispatch(ChannelHandler handler, URL url) {// 線程池調(diào)度方法:連接、斷開連接分發(fā)到到線程池和其他消息分發(fā)到線程池不是同一個return new ConnectionOrderedChannelHandler(handler, url);}}

    對照上述講到的ConnectionOrderedChannelHandler,也很清晰該線程池調(diào)度方法。

    (十六)DirectDispatcher

    public class DirectDispatcher implements Dispatcher {public static final String NAME = "direct";@Overridepublic ChannelHandler dispatch(ChannelHandler handler, URL url) {// 直接處理消息,不分發(fā)到線程池return handler;}}

    該線程池調(diào)度方法是不調(diào)度線程池,直接執(zhí)行。

    (十七)ExecutionDispatcher

    public class ExecutionDispatcher implements Dispatcher {public static final String NAME = "execution";@Overridepublic ChannelHandler dispatch(ChannelHandler handler, URL url) {// 線程池調(diào)度方法:只有請求消息分發(fā)到線程池,其他都直接執(zhí)行return new ExecutionChannelHandler(handler, url);}}

    對照著上述的ExecutionChannelHandler講解,也可以很清晰的看出該線程池調(diào)度策略。

    (十八)MessageOnlyDispatcher

    public class MessageOnlyDispatcher implements Dispatcher {public static final String NAME = "message";@Overridepublic ChannelHandler dispatch(ChannelHandler handler, URL url) {// 只要是接收到的消息,都分發(fā)到線程池return new MessageOnlyChannelHandler(handler, url);}}

    對照著上述講到的MessageOnlyChannelHandler,可以很清晰該線程池調(diào)度策略。

    (十九)ChannelHandlers

    該類是通道處理器工廠,會對傳入的handler進行一次包裝,無論是Client還是Server都會做這樣的處理,也就是做了一些功能上的增強,就像上述我說到的裝飾模式中的那些功能。

    我們來看看源碼:

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {return ChannelHandlers.getInstance().wrapInternal(handler, url); }protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {// 調(diào)用了多消息處理器,對心跳消息進行了功能加強return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url))); }

    最關(guān)鍵的是這兩個方法,看第二個方法,其實就是包裝了MultiMessageHandler功能,增加了多消息處理的功能,以及對心跳消息做了功能增強。

    (二十)AbstractCodec

    實現(xiàn) Codec2 接口,,其中實現(xiàn)了一些編解碼的公共邏輯。

    1.checkPayload

    protected static void checkPayload(Channel channel, long size) throws IOException {// 默認長度int payload = Constants.DEFAULT_PAYLOAD;if (channel != null && channel.getUrl() != null) {// 優(yōu)先從url中獲得消息長度配置,如果沒有則用默認長度payload = channel.getUrl().getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD);}// 如果消息長度過長,則報錯if (payload > 0 && size > payload) {ExceedPayloadLimitException e = new ExceedPayloadLimitException("Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel);logger.error(e);throw e;} }

    該方法是檢驗消息長度。

    2.getSerialization

    protected Serialization getSerialization(Channel channel) {return CodecSupport.getSerialization(channel.getUrl()); }

    該方法是獲得序列化對象。

    3.isClientSide

    protected boolean isClientSide(Channel channel) {// 獲得是side對應(yīng)的valueString side = (String) channel.getAttribute(Constants.SIDE_KEY);if ("client".equals(side)) {return true;} else if ("server".equals(side)) {return false;} else {InetSocketAddress address = channel.getRemoteAddress();URL url = channel.getUrl();// 判斷url的主機地址是否和遠程地址一樣,如果是,則判斷為client,如果不是,則判斷為serverboolean client = url.getPort() == address.getPort()&& NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress()));// 把value設(shè)置進去channel.setAttribute(Constants.SIDE_KEY, client ? "client": "server");return client;} }

    該方法是判斷是否為客戶端側(cè)的通道。

    4.isServerSide

    protected boolean isServerSide(Channel channel) {return !isClientSide(channel); }

    該方法是判斷是否為服務(wù)端側(cè)的通道。

    (二十一)TransportCodec

    該類是傳輸編解碼器,使用 Serialization 進行序列化/反序列化,直接編解碼。關(guān)于序列化為會在后續(xù)文章中介紹。

    @Override public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException {// 獲得序列化的 ObjectOutput 對象OutputStream output = new ChannelBufferOutputStream(buffer);ObjectOutput objectOutput = getSerialization(channel).serialize(channel.getUrl(), output);// 寫入 ObjectOutputencodeData(channel, objectOutput, message);objectOutput.flushBuffer();// 釋放if (objectOutput instanceof Cleanable) {((Cleanable) objectOutput).cleanup();} }@Override public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {// 獲得反序列化的 ObjectInput 對象InputStream input = new ChannelBufferInputStream(buffer);ObjectInput objectInput = getSerialization(channel).deserialize(channel.getUrl(), input);// 讀取 ObjectInputObject object = decodeData(channel, objectInput);// 釋放if (objectInput instanceof Cleanable) {((Cleanable) objectInput).cleanup();}return object; }

    該類關(guān)鍵方法就是編碼和解碼,比較好理解,直接進行了序列化和反序列化。

    (二十二)CodecAdapter

    該類是Codec 的適配器,用到了適配器模式,把Codec適配成Codec2。將Codec的編碼和解碼方法都適配成Codec2。比如很多時候都只能用Codec2的編解碼器,但是有的時候需要用Codec,但是不能滿足導(dǎo)致只能加入適配器來完成使用。

    @Override public void encode(Channel channel, ChannelBuffer buffer, Object message)throws IOException {UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(1024);// 調(diào)用舊的編解碼器的編碼codec.encode(channel, os, message);buffer.writeBytes(os.toByteArray()); }@Override public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {byte[] bytes = new byte[buffer.readableBytes()];int savedReaderIndex = buffer.readerIndex();buffer.readBytes(bytes);UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream(bytes);// 調(diào)用舊的編解碼器的解碼Object result = codec.decode(channel, is);buffer.readerIndex(savedReaderIndex + is.position());return result == Codec.NEED_MORE_INPUT ? DecodeResult.NEED_MORE_INPUT : result; }

    可以看到,在編碼和解碼的方法中都調(diào)用了codec的方法。

    (二十三)ChannelDelegate、ServerDelegate、ClientDelegate

    ChannelDelegate實現(xiàn)類Channel,ServerDelegate實現(xiàn)了Server,ClientDelegate實現(xiàn)了Client,都用到了裝飾模式,都作為裝飾模式中的裝飾角色,所以類中的所有實現(xiàn)方法都調(diào)用了屬性的方法。具體代碼就不貼了,朋友們可以自行查看。

    (二十四)ChannelHandlerAdapter

    該類實現(xiàn)了ChannelHandler接口,是通道處理器適配類,該類中所有實現(xiàn)方法都是空的,所有想實現(xiàn)ChannelHandler接口的類可以直接繼承該類,選擇需要實現(xiàn)的方法進行實現(xiàn),不需要實現(xiàn)ChannelHandler接口中所有方法。

    (二十五)ChannelHandlerDispatcher

    該類是通道處理器調(diào)度器,其中緩存了所有通道處理器,有一個通道處理器集合。并且每個操作都會去遍歷該集合,執(zhí)行相應(yīng)的操作,例如:

    @Override public void connected(Channel channel) {// 遍歷通道處理器集合for (ChannelHandler listener : channelHandlers) {try {// 連接listener.connected(channel);} catch (Throwable t) {logger.error(t.getMessage(), t);}} }

    (二十六)CodecSupport

    該類是編解碼工具類,提供查詢 Serialization 的功能。

    /*** 序列化對象集合 key為序列化類型編號*/ private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>(); /*** 序列化擴展名集合 key為序列化類型編號 value為序列化擴展名*/ private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();static {// 利用dubbo 的SPI機制獲得序列化擴展名Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();for (String name : supportedExtensions) {// 獲得相應(yīng)擴展名的序列化實現(xiàn)Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name);byte idByte = serialization.getContentTypeId();if (ID_SERIALIZATION_MAP.containsKey(idByte)) {logger.error("Serialization extension " + serialization.getClass().getName()+ " has duplicate id to Serialization extension "+ ID_SERIALIZATION_MAP.get(idByte).getClass().getName()+ ", ignore this Serialization extension");continue;}// 緩存序列化實現(xiàn)ID_SERIALIZATION_MAP.put(idByte, serialization);// 緩存序列化編號和擴展名ID_SERIALIZATIONNAME_MAP.put(idByte, name);} }

    可以看到該類中緩存了所有的序列化對象和序列化擴展名。可以從中拿到Serialization。

    (二十七)ExceedPayloadLimitException

    該類是消息長度限制異常。

    public class ExceedPayloadLimitException extends IOException {private static final long serialVersionUID = -1112322085391551410L;public ExceedPayloadLimitException(String message) {super(message);} }

    后記

    該部分相關(guān)的源碼解析地址:https://github.com/CrazyHZM/i...

    該文章講解了Transport層的相關(guān)設(shè)計和邏輯、介紹dubbo-remoting-api中的transport包內(nèi)的源碼解,其中關(guān)鍵的是整個設(shè)計都在使用裝飾模式,傳輸層中關(guān)鍵的編解碼器以及客戶端、服務(wù)的、通道的抽象,還有關(guān)鍵的就是線程池的調(diào)度方法,熟悉那五種調(diào)度方法,對消息的處理。整個傳輸層核心的消息,很多操作圍繞著消息展開。下一篇我會講解交換層exchange部分。如果我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,我的私人微信號碼:HUA799695226。

    總結(jié)

    以上是生活随笔為你收集整理的dubbo源码解析(九)远程通信——Transport层的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。