netty worker线程数量_Dubbo线程模型
dubbo線程模型包括線程模型策略和dubbo線程池策略兩個(gè)方面,下面就依次進(jìn)行分析。
dubbo線程模型
Dubbo默認(rèn)的底層網(wǎng)絡(luò)通信使用的是Netty,服務(wù)提供方NettyServer使用兩級(jí)線程池,其中EventLoopGroup(boss)主要用來(lái)接收客戶端的鏈接請(qǐng)求,并把完成TCP三次握手的連接分發(fā)給EventLoopGroup(worker)來(lái)處理,注意把boss和worker線程組稱為I/O線程,前者處理IO連接事件,后者處理IO讀寫事件。
設(shè)想下,dubbo provider端的netty IO線程是如何處理業(yè)務(wù)邏輯呢?如果處理邏輯較為簡(jiǎn)單,并且不會(huì)發(fā)起新的I/O請(qǐng)求,那么直接在I/O線程上處理會(huì)更快,因?yàn)檫@樣減少了線程池調(diào)度與上下文切換的開(kāi)銷,畢竟線程切換還是有一定成本的。如果邏輯較為復(fù)雜,或者需要發(fā)起網(wǎng)絡(luò)通信,比如查詢數(shù)據(jù)庫(kù),則I/O線程必須派發(fā)請(qǐng)求到新的線程池進(jìn)行處理,否則I/O線程會(huì)被阻塞,導(dǎo)致處理IO請(qǐng)求效率降低。
那Dubbo是如何做的呢?Dubbo中根據(jù)請(qǐng)求的消息類是直接被I/O線程處理還是被業(yè)務(wù)線程池處理,Dubbo提供了下面幾種線程模型:
- all(AllDispatcher類):所有消息都派發(fā)到業(yè)務(wù)線程池,這些消息包括請(qǐng)求、響應(yīng)、連接事件、斷開(kāi)事件等,響應(yīng)消息會(huì)優(yōu)先使用對(duì)于請(qǐng)求所使用的線程池。
- direct(DirectDispatcher類):所有消息都不派發(fā)到業(yè)務(wù)線程池,全部在IO線程上直接執(zhí)行。
- message(MessageOnlyDispatcher類):只有請(qǐng)求響應(yīng)消息派發(fā)到業(yè)務(wù)線程池,其他消息如連接事件、斷開(kāi)事件、心跳事件等,直接在I/O線程上執(zhí)行。
- execution(ExecutionDispatcher類):只把請(qǐng)求類消息派發(fā)到業(yè)務(wù)線程池處理,但是響應(yīng)、連接事件、斷開(kāi)事件、心跳事件等消息直接在I/O線程上執(zhí)行。
- connection(ConnectionOrderedDispatcher類):在I/O線程上將連接事件、斷開(kāi)事件放入隊(duì)列,有序地逐個(gè)執(zhí)行,其他消息派發(fā)到業(yè)務(wù)線程池處理。
dubbo線程池可選模型較多,下面以DirectDispatcher類進(jìn)行分析,其他流程類似就不在贅述。
public class DirectChannelHandler extends WrappedChannelHandler {@Overridepublic void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getPreferredExecutorService(message);if (executor instanceof ThreadlessExecutor) {try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}} else {handler.received(channel, message);}} }DirectDispatcher類重寫了received方法,注意 ThreadlessExecutor 被應(yīng)用在調(diào)用 future.get() 之前,先調(diào)用 ThreadlessExecutor.wait(),wait 會(huì)使業(yè)務(wù)線程在一個(gè)阻塞隊(duì)列上等待,直到隊(duì)列中被加入元素。很明顯,provider側(cè)調(diào)用getPreferredExecutorService(message)返回的不是ThreadlessExecutor,所以會(huì)在當(dāng)前IO線程執(zhí)行執(zhí)行。
其他事件,比如連接、異常、斷開(kāi)等,都是在WrappedChannelHandler中默認(rèn)實(shí)現(xiàn):執(zhí)行在當(dāng)前IO線程中執(zhí)行的,代碼如下:@Override public void connected(Channel channel) throws RemotingException {handler.connected(channel); } @Override public void disconnected(Channel channel) throws RemotingException {handler.disconnected(channel); } @Override public void sent(Channel channel, Object message) throws RemotingException {handler.sent(channel, message); } @Override public void caught(Channel channel, Throwable exception) throws RemotingException {handler.caught(channel, exception); }dubbo線程模型策略
了解了dubbo線程模型之后,小伙伴是不是該問(wèn):
既然有那么多的線程模型策略,dubbo線程模型具體使用的是什么策略呢?從netty啟動(dòng)流程來(lái)看,初始化NettyServer時(shí)會(huì)進(jìn)行加載具體的線程模型,代碼如下:
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url)); } public static ChannelHandler wrap(ChannelHandler handler, URL url) {return ChannelHandlers.getInstance().wrapInternal(handler, url); } protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url))); }這里根據(jù)URL里的線程模型來(lái)選擇具體的Dispatcher實(shí)現(xiàn)類。在此,我們?cè)偬嵋幌翫ubbo提供的Dispatcher實(shí)現(xiàn)類,其默認(rèn)的實(shí)現(xiàn)類是all,也就是AllDispatcher類。既然Dispatcher是通過(guò)SPI方式加載的,也就是用戶可以自定義自己的線程模型,只需實(shí)現(xiàn)Dispatcher類然后配置選擇使用自定義的Dispatcher類即可。
dubbo線程池策略
dubbo處理流程,為了盡量早地釋放Netty的I/O線程,某些線程模型會(huì)把請(qǐng)求投遞到線程池進(jìn)行異步處理,那么這里所謂的線程池是什么樣的線程池呢?
其實(shí)這里的線程池ThreadPool也是一個(gè)擴(kuò)展接口SPI,Dubbo提供了該擴(kuò)展接口的一些實(shí)現(xiàn),具體如下:- FixedThreadPool:創(chuàng)建一個(gè)具有固定個(gè)數(shù)線程的線程池。
- LimitedThreadPool:創(chuàng)建一個(gè)線程池,這個(gè)線程池中的線程個(gè)數(shù)隨著需要量動(dòng)態(tài)增加,但是數(shù)量不超過(guò)配置的閾值。另外,空閑線程不會(huì)被回收,會(huì)一直存在。
- EagerThreadPool:創(chuàng)建一個(gè)線程池,在這個(gè)線程池中,當(dāng)所有核心線程都處于忙碌狀態(tài)時(shí),將創(chuàng)建新的線程來(lái)執(zhí)行新任務(wù),而不是把任務(wù)放入線程池阻塞隊(duì)列。
- CachedThreadPool:創(chuàng)建一個(gè)自適應(yīng)線程池,當(dāng)線程空閑1分鐘時(shí),線程會(huì)被回收;當(dāng)有新請(qǐng)求到來(lái)時(shí),會(huì)創(chuàng)建新線程。
知道了這些線程池之后,那么是什么時(shí)候進(jìn)行SPI加載對(duì)應(yīng)的線程池實(shí)現(xiàn)呢?具體是在dubbo 線程模型獲取對(duì)應(yīng)線程池時(shí)進(jìn)行SPI加載的,具體邏輯在方法 org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository#createExecutor中:
private ExecutorService createExecutor(URL url) {return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); } @SPI("fixed") public interface ThreadPool {@Adaptive({THREADPOOL_KEY})Executor getExecutor(URL url); }從代碼來(lái)看,默認(rèn)的線程池策略是fixed模式的線程池,其coreSize默認(rèn)為200,隊(duì)列大小為0,其代碼如下:
public class FixedThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));} }注:其他線程池策略和FixedThreadPool類似,只不過(guò)線程池參數(shù)不同而已,這里不再贅述。小結(jié)
從dubbo提供的幾種線程模型和線程池策略來(lái)看,基本上能滿足絕大多數(shù)場(chǎng)景的需求了,由于dubbo線程模型和線程池策略都是通過(guò)SPI的方式進(jìn)行加載的,因此如果業(yè)務(wù)上需要,我們完全可以自定義對(duì)應(yīng)的線程模型和線程池策略,只需要配置下即可。
推薦閱讀
Dubbo RPC在consumer端是如何跑起來(lái)的?mp.weixin.qq.comdubbo版的"明朝那些事兒"?mp.weixin.qq.com責(zé)任鏈的2種實(shí)現(xiàn)方式,你更pick哪一種?mp.weixin.qq.com網(wǎng)絡(luò)數(shù)據(jù)是如何傳遞給進(jìn)程的?mp.weixin.qq.comLinux管道那些事兒?mp.weixin.qq.com從socket api看網(wǎng)絡(luò)通信流程?mp.weixin.qq.com總結(jié)
以上是生活随笔為你收集整理的netty worker线程数量_Dubbo线程模型的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 工商银行怎么更新身份证信息 怎样更新工商
- 下一篇: function click_click