日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Netty学习笔记(三)EventLoopGroup开篇

發布時間:2024/4/11 编程问答 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty学习笔记(三)EventLoopGroup开篇 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

使用Netty都需要定義EventLoopGroup,也就是線程池

前面講過在客戶端只需要一個EventLoopGroup就夠了,而在服務端就需要兩個Group--bossGroup和workerGroup,這與Netty的線程模型有關,使用的是主從Reactor多線程模型?,兩個線程池,一個用于監聽端口,創建新連接(boosGroup),一個用于處理每一條連接的數據讀寫和業務邏輯(workerGroup)

以下的代碼里都去掉了一些try...catch和非核心代碼,只保留了主要的代碼流程

EventLoopGroup初始化

其類圖如下所示:

可以發現EventLoopGroup都實現了ScheduledExecutorService,本質是一個帶有schedule的線程池
NioEventLoopGroup有很多重載的構造方法,最后都調用了如下方法:

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());}

調用其父類MultithreadEventLoopGroup的構造方法:?

private static final int DEFAULT_EVENT_LOOP_THREADS;static {DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));} protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);}

這里會判斷當前nThreads是否為0,如果為0的話則使用默認的Threads數,其實就是處理器核心數*2 ,我的demo里都沒有指定線程數,那么最終生成的EventLoopGroup的線程數就處理器核心數*2

再跟蹤下去,最后會調用MultithreadEventExecutorGroup的如下構造方法

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {children[i] = newChild(executor, args);success = true;}chooser = chooserFactory.newChooser(children);}

上面的代碼會先創建一個executor,然后再初始化一個EventExecutor數組(長度就是nThreads),然后調用newChild對每個元素進行初始化,然后調用newChooser方法創建一個chooser

先看下這里的executor的創建,其實就是創建一個Executor的實例對象,對于execute傳入的command,都會創建一個線程并啟動來執行,線程id為poolName + '-' + poolId.incrementAndGet() + '-'+ nextId.incrementAndGet()

public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;public ThreadPerTaskExecutor(ThreadFactory threadFactory) {this.threadFactory = threadFactory;}@Overridepublic void execute(Runnable command) {threadFactory.newThread(command).start();} }

這里的newChild方法,就是實例化一個?NioEventLoop 對象, 并返回,所以EventLoopGroup里的每一個元素都是NioEventLoop,源碼如下:

@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception {return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);}

看下這里NioEventLoop的類圖:注意下這里的NioEventLoop是實現了SingleThreadEventExecutor,參數Executor最后也會保存在該類的executor屬性字段里

接下來看下newChooser方法的實現 : 如果executor,length是2的冪次其實就是nThreads是2的冪次,那么就會使用PowerOfTowEventExecutorChooser來進行選擇,否則就使用普通的選擇器

public EventExecutorChooser newChooser(EventExecutor[] executors) {if (isPowerOfTwo(executors.length)) {return new PowerOfTowEventExecutorChooser(executors);} else {return new GenericEventExecutorChooser(executors);}}private static boolean isPowerOfTwo(int val) {return (val & -val) == val;}

兩個選擇器實現的區別在于獲取下一個EventExecutor的方法next(),普通選擇器是對idx遞增后對nThreads取模
PowerOfTow實現的也是這個邏輯,只不過使用了位運算符,運算速度更快

private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;PowerOfTowEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}}private static final class GenericEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;GenericEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[Math.abs(idx.getAndIncrement() % executors.length)];}}

總結下EventLoopGroup的初始化:

  • EventLoopGroup的父類MultithreadEventExecutorGroup內部維護一個類型為 EventExecutor的 線程數組, 其大小是 nThreads
  • 如果實例化NioEventLoopGroup 時,沒有指定默認值nThreads就等于處理器*2
  • MultithreadEventExecutorGroup 中通過newChild()抽象方法來初始化 children 數組,每個元素都是NioEventLoop
  • 根據nThreads數選擇不同的chooser

EventLoopGroup執行

在ServerBootstrap 初始化時,調用了serverBootstrap.group(bossGroup,workerGroup)設置了兩個EventLoopGroup,我們跟
蹤進去以后會看到:

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (childGroup == null) {throw new NullPointerException("childGroup");}if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}this.childGroup = childGroup;return this;}

這個方法初始化了兩個字段,一個是在 super.group(parentGroup)中完成初始化,另一個是通過this.childGroup = childGroup,分別將bossGroup和workerGroup保存在AbstractBootstrap的group屬性和ServerBootstrap的childGroup屬性

接著從應用程序的啟動代碼 serverBootstrap.bind()來監聽一個本地端口
通過bind方法會調用eventLoop()的execute()方法,最后會進入SingleThreadEventExecutor的execute()方法

private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}

SingleThreadEventExecutor對于添加進來的task,會判斷當前執行的currentThread是否等于SingleThreadEventExecutor的thread,如果第一次添加或者當前調用的線程不是SingleThreadEventExecutor的thread,inEventLoop()就會返回false,就會先執行啟動當前SingleThreadEventExecutor的startThread()方法再添加task到任務隊列(LinkedBlockingQueue);否則就直接添加任務到任務隊列

private final Queue<Runnable> taskQueue;public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();if (inEventLoop) {addTask(task);} else {startThread();addTask(task);if (isShutdown() && removeTask(task)) {reject();}}//對于有新任務添加,就會執行wakeupif (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}

簡單來說,這里的inEventLoop()就是判斷當前線程是否是reactor線程,這樣的作用是:

1.讓task只在reactor線程進行,保證單線程

2.第一次判斷會幫我們啟動reactor線程

這里的startThread()就是通過一個標志判斷reactor線程是否已啟動,如果沒有啟動就執行doStartThread來啟動,
SingleThreadEventExecutor 在執行doStartThread()方法的時候,會調用executor的execute方法,會將調用NioEventLoop(SingleThreadEventExecutor 的子類)的run方法封裝成一個Runnable讓線程池executor去執行(還會將當前線程保存在SingleThreadEventExecutor的thread屬性字段里)。這里的executor就是前面講到的ThreadPerTaskExecutor ,它的execute會對每個傳入的Runnable創建一個FastThreadLocalThread線程對象并調用它的start方法去執行

private void startThread() {//判斷當前EventLoop線程是否有啟動if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {//進行了一次CAS操作,為了保證線程安全if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {doStartThread();}}}private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();...boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} ...}});}

?通過前面的分析我們可以看出,最終執行的主體方法是:NioEventLoop的run方法,那么我們看下這里的run方法到底執行了什么

@Overrideprotected void run() {for (;;) {try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT://select輪詢, 設置wakenUp為false并返回之前的wakenUp值select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}default:// fallthrough}//去除了無關緊要的代碼processSelectedKeys();runAllTasks(); } catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception....}}

先看下這里的策略選擇

@Overridepublic int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;}

如果任務隊列里沒有task,就返回策略SELECT,否則就執行selectSupplier.get(),實際就是執行了一次selectNow(非阻塞)方法并返回

可以看到,上面的代碼是一個死循環,做的事情主要是以下三個:

  • 輪詢注冊到reactor線程上的對應的selector的所有channel的IO事件
  • 根據不同的SelectKeys進行處理??processSelectedKeys();
  • 處理任務隊列 runAllTasks(); ??

輪詢Select

private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;int selectCnt = 0;long currentTimeNanos = System.nanoTime();long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;//第一個退出條件if (timeoutMillis <= 0) {if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}// If a task was submitted when wakenUp value was true, the task didn't get a chance to call// Selector#wakeup. So we need to check task queue again before executing select operation.// If we don't, the task might be pended until select operation was timed out.// It might be pended until idle timeout if IdleStateHandler existed in pipeline.//第二個退出條件 if (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}int selectedKeys = selector.select(timeoutMillis);selectCnt ++;//第三個退出條件if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {// - Selected something,// - waken up by user, or// - the task queue has a pending task.// - a scheduled task is ready for processingbreak;}...}

不難看出這里的select是一個死循環,它的退出條件有三種:

  • 距離當前截止時間快到了(<=0.5ms)就跳出循環,如果此時還沒有執行select,就執行一次selectNow
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;timeoutMillis <= 0;
  • 如果任務隊列里有任務需要執行就退出(避免由于select阻塞導致任務不能及時執行),退出前也執行一下selectNow
  • selector.select(XX)的阻塞被喚醒后,如果滿足上面的條件就會退出(selectedKeys不為0,任務隊列里有任務等)

前面提到過,如果SingleThreadEventExecutor執行execute(Runnable task)添加任務會執行wakeup方法,然后會執行NioEventLoop重寫的wakeup方法

@Override public void execute(Runnable task) {//addTaskWakesUp 默認是false 如果是外部線程添加的,inEventLoop就會是falseif (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);} }

當inEventLoop為false,并且wakenUp變量CAS操作成功(由false變為true,保證線程安全),則調用selector.wakeup()喚醒阻塞的select方法

@Overrideprotected void wakeup(boolean inEventLoop) {if (!inEventLoop && wakenUp.compareAndSet(false, true)) {selector.wakeup();}}

Netty解決JDK空輪訓Bug? ? ??

出現此 Bug 是因為當 Selector 的輪詢結果為空,也沒有wakeup 或新消息處理,則發生空
輪詢,CPU 使用率達到100%,導致Nio Server不可用,Netty通過一種巧妙的方式來避開了這個空輪詢問題

private void select(boolean oldWakenUp) throws IOException {long currentTimeNanos = System.nanoTime();for (;;) {...int selectedKeys = selector.select(timeoutMillis);selectCnt ++;//解決jdk的nio buglong time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {rebuildSelector();selector = this.selector;selector.selectNow();selectCnt = 1;break;}currentTimeNanos = time; ...} }

從上面的代碼中可以看出,Selector每一次輪詢都會進行計數,selectCnt++,開始輪詢和輪詢完成都會把當前時間戳賦值給currentTimeNanos和time,兩個時間的時間差就是本次輪詢消耗的時間

如果持續的時間大于等于timeoutMillis(輪詢的時間),說明就是一次有效的輪詢,重置selectCnt標志,否則,表明該阻塞方法并沒有阻塞這么長時間,可能觸發了jdk的空輪詢bug,當空輪詢的次數超過一個閥值的時候,默認是512,就開始重建selector

public void rebuildSelector() {final Selector oldSelector = selector;final Selector newSelector;newSelector = openSelector();int nChannels = 0;for (;;) {try {for (SelectionKey key: oldSelector.keys()) {Object a = key.attachment();if (!key.isValid() || key.channel().keyFor(newSelector) != null) {continue;}int interestOps = key.interestOps();key.cancel();SelectionKey newKey = key.channel().register(newSelector, interestOps, a);if (a instanceof AbstractNioChannel) {// Update SelectionKey((AbstractNioChannel) a).selectionKey = newKey;}nChannels ++;}} catch (ConcurrentModificationException e) {// Probably due to concurrent modification of the key set.continue;}break;}selector = newSelector;oldSelector.close();}

rebuildSelector主要做了三件事:

  • 創建一個新的 Selector。
  • 將原來Selector 中注冊的事件全部取消。
  • 將可用事件重新注冊到新的 Selector 中,并激活。

參考:?
netty源碼分析之揭開reactor線程的面紗

Netty 源碼分析-EventLoop

總結

以上是生活随笔為你收集整理的Netty学习笔记(三)EventLoopGroup开篇的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。