日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

netty worker线程数量_Dubbo线程模型

發布時間:2023/12/13 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 netty worker线程数量_Dubbo线程模型 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Dubbo中線程池的應用還是比較廣泛的,按照consumer端到provider的RPC的方向來看,consumer端的應用業務線程到netty線程、consuemr端dubbo業務線程池,到provider端的netty boss線程、worker線程和dubbo業務線程池等。這些線程各司其職相互配合,共同完成dubbo RPC服務調用,理解dubbo線程模型對于學習Dubbo原理很有幫助。

dubbo線程模型包括線程模型策略和dubbo線程池策略兩個方面,下面就依次進行分析。

dubbo線程模型

Dubbo默認的底層網絡通信使用的是Netty,服務提供方NettyServer使用兩級線程池,其中EventLoopGroup(boss)主要用來接收客戶端的鏈接請求,并把完成TCP三次握手的連接分發給EventLoopGroup(worker)來處理,注意把boss和worker線程組稱為I/O線程,前者處理IO連接事件,后者處理IO讀寫事件。

設想下,dubbo provider端的netty IO線程是如何處理業務邏輯呢?如果處理邏輯較為簡單,并且不會發起新的I/O請求,那么直接在I/O線程上處理會更快,因為這樣減少了線程池調度與上下文切換的開銷,畢竟線程切換還是有一定成本的。如果邏輯較為復雜,或者需要發起網絡通信,比如查詢數據庫,則I/O線程必須派發請求到新的線程池進行處理,否則I/O線程會被阻塞,導致處理IO請求效率降低。

那Dubbo是如何做的呢?

Dubbo中根據請求的消息類是直接被I/O線程處理還是被業務線程池處理,Dubbo提供了下面幾種線程模型:

  • all(AllDispatcher類):所有消息都派發到業務線程池,這些消息包括請求、響應、連接事件、斷開事件等,響應消息會優先使用對于請求所使用的線程池。
  • direct(DirectDispatcher類):所有消息都不派發到業務線程池,全部在IO線程上直接執行。
  • message(MessageOnlyDispatcher類):只有請求響應消息派發到業務線程池,其他消息如連接事件、斷開事件、心跳事件等,直接在I/O線程上執行。
  • execution(ExecutionDispatcher類):只把請求類消息派發到業務線程池處理,但是響應、連接事件、斷開事件、心跳事件等消息直接在I/O線程上執行。
  • connection(ConnectionOrderedDispatcher類):在I/O線程上將連接事件、斷開事件放入隊列,有序地逐個執行,其他消息派發到業務線程池處理。

dubbo線程池可選模型較多,下面以DirectDispatcher類進行分析,其他流程類似就不在贅述。

public class DirectChannelHandler extends WrappedChannelHandler {@Overridepublic void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getPreferredExecutorService(message);if (executor instanceof ThreadlessExecutor) {try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}} else {handler.received(channel, message);}} }

DirectDispatcher類重寫了received方法,注意 ThreadlessExecutor 被應用在調用 future.get() 之前,先調用 ThreadlessExecutor.wait(),wait 會使業務線程在一個阻塞隊列上等待,直到隊列中被加入元素。很明顯,provider側調用getPreferredExecutorService(message)返回的不是ThreadlessExecutor,所以會在當前IO線程執行執行。

其他事件,比如連接、異常、斷開等,都是在WrappedChannelHandler中默認實現:執行在當前IO線程中執行的,代碼如下:@Override public void connected(Channel channel) throws RemotingException {handler.connected(channel); } @Override public void disconnected(Channel channel) throws RemotingException {handler.disconnected(channel); } @Override public void sent(Channel channel, Object message) throws RemotingException {handler.sent(channel, message); } @Override public void caught(Channel channel, Throwable exception) throws RemotingException {handler.caught(channel, exception); }

dubbo線程模型策略

了解了dubbo線程模型之后,小伙伴是不是該問:

既然有那么多的線程模型策略,dubbo線程模型具體使用的是什么策略呢?

從netty啟動流程來看,初始化NettyServer時會進行加載具體的線程模型,代碼如下:

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url)); } public static ChannelHandler wrap(ChannelHandler handler, URL url) {return ChannelHandlers.getInstance().wrapInternal(handler, url); } protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url))); }

這里根據URL里的線程模型來選擇具體的Dispatcher實現類。在此,我們再提一下Dubbo提供的Dispatcher實現類,其默認的實現類是all,也就是AllDispatcher類。既然Dispatcher是通過SPI方式加載的,也就是用戶可以自定義自己的線程模型,只需實現Dispatcher類然后配置選擇使用自定義的Dispatcher類即可。

dubbo線程池策略

dubbo處理流程,為了盡量早地釋放Netty的I/O線程,某些線程模型會把請求投遞到線程池進行異步處理,那么這里所謂的線程池是什么樣的線程池呢?

其實這里的線程池ThreadPool也是一個擴展接口SPI,Dubbo提供了該擴展接口的一些實現,具體如下:
  • FixedThreadPool:創建一個具有固定個數線程的線程池。
  • LimitedThreadPool:創建一個線程池,這個線程池中的線程個數隨著需要量動態增加,但是數量不超過配置的閾值。另外,空閑線程不會被回收,會一直存在。
  • EagerThreadPool:創建一個線程池,在這個線程池中,當所有核心線程都處于忙碌狀態時,將創建新的線程來執行新任務,而不是把任務放入線程池阻塞隊列。
  • CachedThreadPool:創建一個自適應線程池,當線程空閑1分鐘時,線程會被回收;當有新請求到來時,會創建新線程。

知道了這些線程池之后,那么是什么時候進行SPI加載對應的線程池實現呢?具體是在dubbo 線程模型獲取對應線程池時進行SPI加載的,具體邏輯在方法 org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository#createExecutor中:

private ExecutorService createExecutor(URL url) {return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); } @SPI("fixed") public interface ThreadPool {@Adaptive({THREADPOOL_KEY})Executor getExecutor(URL url); }

從代碼來看,默認的線程池策略是fixed模式的線程池,其coreSize默認為200,隊列大小為0,其代碼如下:

public class FixedThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));} }注:其他線程池策略和FixedThreadPool類似,只不過線程池參數不同而已,這里不再贅述。

小結

從dubbo提供的幾種線程模型和線程池策略來看,基本上能滿足絕大多數場景的需求了,由于dubbo線程模型和線程池策略都是通過SPI的方式進行加載的,因此如果業務上需要,我們完全可以自定義對應的線程模型和線程池策略,只需要配置下即可。

推薦閱讀

Dubbo RPC在consumer端是如何跑起來的?mp.weixin.qq.comdubbo版的"明朝那些事兒"?mp.weixin.qq.com責任鏈的2種實現方式,你更pick哪一種?mp.weixin.qq.com網絡數據是如何傳遞給進程的?mp.weixin.qq.comLinux管道那些事兒?mp.weixin.qq.com從socket api看網絡通信流程?mp.weixin.qq.com

總結

以上是生活随笔為你收集整理的netty worker线程数量_Dubbo线程模型的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。