日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Dubbo线程模型和调度策略

發布時間:2025/3/8 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Dubbo线程模型和调度策略 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、服務調用

首先服務消費者通過代理對象 Proxy 發起遠程調用,接著通過網絡客戶端 Client 將編碼后的請求發送給服務提供方的網絡層上,也就是 Server。Server 在收到請求后,首先要做的事情是對數據包進行解碼。然后將解碼后的請求發送至分發器 Dispatcher,再由分發器將請求派發到指定的線程池上,最后由線程池調用具體的服務。這就是一個遠程調用請求的發送與接收過程。

那么在dubbo中請求是如何派發的?以及線程模型是什么樣的那?

二、I/O線程和業務線程分離

  • 如果事件處理的邏輯能迅速完成,并且不會發起新的 IO請求,比如只是在內存中記個標識,則直接在 IO線程上處理更快,因為減少了線程池調度。

  • 但如果事件處理邏輯較慢,或者需要發起新的 IO 請求,比如需要查詢數據庫,則必須派發到線程池,否則 IO 線程阻塞,將導致不能接收其它請求。

  • 如果用 IO 線程處理事件,又在事件處理過程中發起新的 IO 請求,比如在連接事件中發起登錄請求,會報“可能引發死鎖”異常,但不會真死鎖。

所以在真實的業務場景中是需要將業務線程和I/O線程進行分離處理的。dubbo作為一個服務治理框架,底層的采用Netty作為網絡通信的組件,在請求派發的時候支持不同的派發策略。

參考文章:www.cnblogs.com/my_life/art…

三、請求派發策略

連接建立

從官方描述來看,duboo支持五種派發策略,下面看下是如何實現的。以Ntty4.x為例:

  • NettyServerpublic NettyServer(URL url, ChannelHandler handler) throws RemotingException {super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));} 復制代碼
  • ChannelHandlers#wrapInternalprotected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {// 選擇調度策略 默認是allreturn new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url))); } 復制代碼在NettyServer的構造方法中通過ChannelHandlers#wrap方法設置MultiMessageHandler,HeartbeatHandler并通過SPI擴展選擇調度策略。
  • NettyServer#doOpen
  • protected void doOpen() throws Throwable {bootstrap = new ServerBootstrap();// 多線程模型// boss線程池,負責和消費者建立新的連接bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));// worker線程池,負責連接的數據交換workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),new DefaultThreadFactory("NettyServerWorker", true));final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);channels = nettyServerHandler.getChannels();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) // nagele 算法.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)// TIME_WAIT.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //內存池.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug.addLast("decoder", adapter.getDecoder()) //設置編解碼器.addLast("encoder", adapter.getEncoder()).addLast("handler", nettyServerHandler);}});// bind 端口ChannelFuture channelFuture = bootstrap.bind(getBindAddress());channelFuture.syncUninterruptibly();channel = channelFuture.channel();} 復制代碼

    設置Netty的boss線程池數量為1,worker線程池(也就是I/O線程)為cpu核心數+1和向Netty中注測Handler用于消息的編解碼和處理。

    如果我們在一個JVM進程只暴露一個Dubbo服務端口,那么一個JVM進程只會有一個NettyServer實例,也會只有一個NettyHandler實例。并且設置了三個handler,用來處理編解碼、連接的創建、消息讀寫等。在dubbo內部定義了一個ChannelHandler用來和Netty的Channel關聯起來,通過上述的代碼會發現NettyServer本身也是一個ChannelHandler。通過NettyServer#doOpen暴露服務端口后,客戶端就能和服務端建立連接了,而提供者在初始化連接后會調用NettyHandler#channelActive方法來創建一個NettyChannel

  • NettyChannel
  • public void channelActive(ChannelHandlerContext ctx) throws Exception {logger.debug("channelActive <" + NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()) + ">" + " channle <" + ctx.channel());//獲取或者創建一個NettyChannelNettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);try {if (channel != null) {// <ip:port, channel>channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);}// 這里的 handler就是NettyServerhandler.connected(channel);} finally {NettyChannel.removeChannelIfDisconnected(ctx.channel());}} 復制代碼

    與Netty和Dubbo都有自己的ChannelHandler一樣,Netty和Dubbo也有著自己的Channel。該方法最后會調用NettyServer#connected方法來檢查新添加channel后是否會超出提供者配置的accepts配置,如果超出,則直接打印錯誤日志并關閉該Channel,這樣的話消費者端自然會收到連接中斷的異常信息,詳細可以見AbstractServer#connected方法。

  • AbstractServer#connected
  • public void connected(Channel ch) throws RemotingException {// If the server has entered the shutdown process, reject any new connectionif (this.isClosing() || this.isClosed()) {logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");ch.close();return;}Collection<Channel> channels = getChannels();//大于accepts的tcp連接直接關閉if (accepts > 0 && channels.size() > accepts) { logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);ch.close();return;}super.connected(ch);} 復制代碼
    • 在dubbo中消費者和提供者默認只建立一個TCP長連接(詳細代碼請參考官網源碼導讀,服務引用一節),為了增加消費者調用服務提供者的吞吐量,可以在消費方增加如下配置:
    <dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" connections="20"/> 復制代碼
    • 提供者可以使用accepts控制長連接的數量防止連接數量過多,配置如下:
    <dubbo:protocol name="dubbo" port="20880" accepts="10"/> 復制代碼

    請求接收

    當連接建立完成后,消費者就可以請求提供者的服務了,當請求到來,提供者這邊會依次經過如下Handler的處理:

    --->NettyServerHandler#channelRead:接收請求消息。

    --->AbstractPeer#received:如果服務已經關閉,則返回,否則調用下一個Handler來處理。

    --->MultiMessageHandler#received:如果是批量請求,則依次對請求調用下一個Handler來處理。

    --->HeartbeatHandler#received: 處理心跳消息。

    --->AllChannelHandler#received:該Dubbo的Handler非常重要,因為從這里是IO線程池和業務線程池的隔離。

    --->DecodeHandler#received: 消息解碼。

    --->HeaderExchangeHandler#received:消息處理。

    --->DubboProtocol : 調用服務。

  • AllChannelHandler#received:
  • public void received(Channel channel, Object message) throws RemotingException {// 獲取業務線程池ExecutorService cexecutor = getExecutorService();try {// 使用線程池處理消息cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}} 復制代碼

    這里對execute進行了異常捕獲,這是因為I/O線程池是無界的,但業務線程池可能是有界的,所以進行execute提交可能會遇到RejectedExecutionException異常 。

    那么這里是如何獲取到業務線程池的那?實際上WrappedChannelHandler是xxxChannelHandlerd的裝飾類,根據dubbo spi可以知道,獲取AllChannelHandler會首先實例化WrappedChannelHandler。

  • WrappedChannelHandler
  • public WrappedChannelHandler(ChannelHandler handler, URL url) {this.handler = handler;this.url = url;// 獲取業務線程池executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {componentKey = Constants.CONSUMER_SIDE;}DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();dataStore.put(componentKey, Integer.toString(url.getPort()), executor);}復制代碼

    線程模型

  • FixedThreadPool
  • public class FixedThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {// 線程池名稱DubboServerHanler-server:portString name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);// 缺省線程數量200int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);// 任務隊列類型int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));}} 復制代碼

    缺省情況下使用200個線程和SynchronousQueue這意味著如果如果線程池所有線程都在工作再有新任務會直接拒絕。

  • CachedThreadPool
  • public class CachedThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);// 核心線程數量 缺省為0int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);// 最大線程數量 缺省為Integer.MAX_VALUEint threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);// queue 缺省為0int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);// 空閑線程存活時間int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));} } 復制代碼

    緩存線程池,可以看出如果提交任務的速度大于maxThreads將會不斷創建線程,極端條件下將會耗盡CPU和內存資源。在突發大流量進入時不適合使用。

  • LimitedThreadPool
  • public class LimitedThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);// 缺省核心線程數量為0int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);// 缺省最大線程數量200int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);// 任務隊列缺省0int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));}} 復制代碼

    不配置的話和FixedThreadPool沒有區別。

  • EagerThreadPool
  • public class EagerThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);// 0int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);// Integer.MAX_VALUEint threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);// 0int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);// 60sint alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);// init queue and executor// 初始任務隊列為1TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,threads,alive,TimeUnit.MILLISECONDS,taskQueue,new NamedInternalThreadFactory(name, true),new AbortPolicyWithReport(name, url));taskQueue.setExecutor(executor);return executor;} } 復制代碼

    EagerThreadPoolExecutor

    public void execute(Runnable command) {if (command == null) {throw new NullPointerException();}// do not increment in method beforeExecute!//已提交任務數量submittedTaskCount.incrementAndGet();try {super.execute(command);} catch (RejectedExecutionException rx) { //大于最大線程數被拒絕任務 重新添加到任務隊列// retry to offer the task into queue.final TaskQueue queue = (TaskQueue) super.getQueue();try {if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {submittedTaskCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.", rx);}} catch (InterruptedException x) {submittedTaskCount.decrementAndGet();throw new RejectedExecutionException(x);}} catch (Throwable t) {// decrease any waysubmittedTaskCount.decrementAndGet();throw t;}} 復制代碼

    TaskQueue

    public boolean offer(Runnable runnable) {if (executor == null) {throw new RejectedExecutionException("The task queue does not have executor!");}// 獲取當前線程池中的線程數量int currentPoolThreadSize = executor.getPoolSize();// have free worker. put task into queue to let the worker deal with task.// 如果已經提交的任務數量小于當前線程池中線程數量(不是很理解這里的操作)if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {return super.offer(runnable);}// return false to let executor create new worker.//當前線程數小于最大線程程數直接創建新workerif (currentPoolThreadSize < executor.getMaximumPoolSize()) {return false;}// currentPoolThreadSize >= maxreturn super.offer(runnable);} 復制代碼

    優先創建Worker線程池。在任務數量大于corePoolSize但是小于maximumPoolSize時,優先創建Worker來處理任務。當任務數量大于maximumPoolSize時,將任務放入阻塞隊列中。阻塞隊列充滿時拋出RejectedExecutionException。(相比于cached:cached在任務數量超過maximumPoolSize時直接拋出異常而不是將任務放入阻塞隊列)。

    根據以上的代碼分析,如果消費者的請求過快很有可能導致服務提供者業務線程池拋出RejectedExecutionException異常。這個異常是duboo的采用的線程拒絕策略AbortPolicyWithReport#rejectedExecution拋出的,并且會被反饋到消費端,此時簡單的解決辦法就是將提供者的服務調用線程池數目調大點,例如如下配置:

    <dubbo:provider threads="500"/> 或 <dubbo:protocol name="dubbo" port="20882" accepts="10" threads="500"/> 復制代碼

    為了保證模塊內的主要服務有線程可用(防止次要服務搶占過多服務調用線程),可以對次要服務進行并發限制,例如:

    <dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" executes="100"/> 復制代碼

    dubbo的dispatcher 策略默認是all,實際上比較好的處理方式是I/O線程和業務線程分離,所以采取message是比較好得配置。并且如果采用all如果使用的dubo版本比較低很有可能會觸發dubbo的bug。一旦業務線程池滿了,將拋出執行拒絕異常,將進入caught方法來處理,而該方法使用的仍然是業務線程池,所以很有可能這時業務線程池還是滿的,導致下游的一個HeaderExchangeHandler沒機會調用,而異常處理后的應答消息正是HeaderExchangeHandler#caught來完成的,所以最后NettyHandler#writeRequested沒有被調用,Consumer只能死等到超時,無法收到Provider的線程池打滿異常(2.6.x已經修復該問題)。

    • 推薦配置
    <dubbo:protocol name="dubbo" port="8888" threads="500" dispatcher="message" /> 復制代碼

    參考文章:manzhizhen.iteye.com/blog/239117…

    轉載于:https://juejin.im/post/5ce26547e51d4510be453efa

    與50位技術專家面對面20年技術見證,附贈技術全景圖

    總結

    以上是生活随笔為你收集整理的Dubbo线程模型和调度策略的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。