日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Netty系列(三):说说NioEventLoop

發布時間:2025/3/21 编程问答 47 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty系列(三):说说NioEventLoop 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

本來想先寫下NioServerSocketChannel以及NioSocketChannel的注冊流程的,但是最后發現始終離不開NioEventLoop這個類,所以在這之前必須得先講解下NioEventLoop這個類到底是用來做啥的。其實在第一篇文章里面有提及到它的,但是沒有詳細的去講解,接下來會對它分析一波。


設計模型

在進入正文之前,先簡單的了解下NioEventLoop的工作模型(服務端):

假設一個NioEventLoopGroup(這里服務端會用兩個Group)里面有4個NioEventLoop,那么netty中的實際工作模型就如上圖所示,服務端會用默認的選擇規則從Group1中選擇出一個NioEventLoop注冊ServerChannel,并綁定一個OP_ACCEPT用于監聽客戶端發起的連接請求,一旦有新的連接進來,服務端則會從Group2中按一定的規則選出一個NioEventLoop來注冊SocketChannel,并綁定OP_READ興趣事件,這里注意,一個NioEventLoop可以綁定多個SocketChannel。具體的注冊流程我會在下一篇文章中寫出來。

下面進入正題。


構造流程

NioEventLoop具體的構造流程大家可以去我的Netty系列(一):NioEventLoopGroup源碼解析中去看一下,里面說的還算蠻詳細的。下面是其調用的構造函數,咱們可以觀察到其身上會綁定一個選擇器Selector,供后期channel注冊的時候使用的,這一塊是JAVA NIO相關的知識點。


內部還維護著一個executor去開啟執行線程的,以及taskQueue任務隊列和一個tailTasks尾部隊列(這個隊列里面的任務是在每次執行taskQueue任務隊列中的任務結束后都會去調用的,不多介紹)。上面介紹的三個四個內部結構Selector,executor,taskQueue,tailTasks會在后面多次提起。
下圖是NioEventLoop的簡單的層級結構(下圖取之于Netty in Action):


NioEventLoop.execute

這里我們先看一下NioEventLoop的execute方法。實際上這個方法是在其父類SingleThreadEventExecutor中。這個方法的功能就是將任務丟到taskQueue中。

1????public?void?execute(Runnable?task)?{
2????????if?(task?==?null)?{
3????????????throw?new?NullPointerException("task");
4????????}
5
6????????boolean?inEventLoop?=?inEventLoop();
7????????addTask(task);
8????????if?(!inEventLoop)?{
9????????????//?開啟工作線程,實際上也就是執行NioEventLoop中的run方法,下面會介紹
10????????????startThread();
11????????????if?(isShutdown())?{
12????????????????boolean?reject?=?false;
13????????????????try?{
14????????????????????if?(removeTask(task))?{
15????????????????????????reject?=?true;
16????????????????????}
17????????????????}?catch?(UnsupportedOperationException?e)?{
18????????????????????//?The?task?queue?does?not?support?removal?so?the?best?thing?we?can?do?is?to?just?move?on?and
19????????????????????//?hope?we?will?be?able?to?pick-up?the?task?before?its?completely?terminated.
20????????????????????//?In?worst?case?we?will?log?on?termination.
21????????????????}
22????????????????if?(reject)?{
23????????????????????reject();
24????????????????}
25????????????}
26????????}
27
28????????if?(!addTaskWakesUp?&&?wakesUpForTask(task))?{
29????????????wakeup(inEventLoop);
30????????}
31????}
復制代碼
  • 添加task到執行隊列中,也就是咱們上文提起的taskQueue中。
  • 判斷這個NioEventLoop中的是否已經開啟過線程。
  • 若未開啟,則必須先啟動線程任務,也就是我們下文會介紹的run方法。
  • 首次初始化會在taskQueue中丟一個空任務去喚醒線程。
  • NioEventLoop的工作模式實際上就是開啟一個單線程跑一個死循環,然后一直輪詢taskQueue隊列是否有任務添加進來,然后就去處理任務,還有就是如果注冊在selector上的channel有興趣事件進來,也會去處理selectorKeys,這一塊下面會做介紹。


    NioEventLoop.run

    現在看看NioEventLoop中的run方法

    1????protected?void?run()?{
    2????????for?(;;)?{
    3????????????try?{
    4????????????????try?{
    5????????????????????//?按默認配置的話要么返回select.selectNow(),
    6????????????????????//要么返回SelectStrategy.SELECT
    7???????????????????switch?(selectStrategy.calculateStrategy(selectNowSupplier,?hasTasks()))?{
    8????????????????????case?SelectStrategy.CONTINUE:
    9????????????????????????continue;
    10
    11????????????????????case?SelectStrategy.BUSY_WAIT:
    12????????????????????case?SelectStrategy.SELECT:
    13????????????????????????select(wakenUp.getAndSet(false));
    14
    15????????????????????????if?(wakenUp.get())?{
    16????????????????????????????selector.wakeup();
    17????????????????????????}
    18????????????????????????//?fall?through
    19????????????????????default:
    20????????????????????}
    21????????????????}?catch?(IOException?e)?{
    22????????????????????rebuildSelector0();
    23????????????????????handleLoopException(e);
    24????????????????????continue;
    25????????????????}
    26
    27????????????????cancelledKeys?=?0;
    28????????????????needsToSelectAgain?=?false;
    29????????????????final?int?ioRatio?=?this.ioRatio;
    30????????????????if?(ioRatio?==?100)?{
    31????????????????????try?{
    32????????????????????????//?IO操作,根據selectedKeys去處理
    33????????????????????????processSelectedKeys();
    34????????????????????}?finally?{
    35????????????????????????//?保證執行完所有的任務
    36????????????????????????runAllTasks();
    37????????????????????}
    38????????????????}?else?{
    39????????????????????final?long?ioStartTime?=?System.nanoTime();
    40????????????????????try?{
    41????????????????????????//?IO操作,根據selectedKeys去處理
    42????????????????????????processSelectedKeys();
    43????????????????????}?finally?{
    44????????????????????????//?按一定的比例去處理任務,有可能遺留一部分任務下次進行處理
    45????????????????????????final?long?ioTime?=?System.nanoTime()?-?ioStartTime;
    46????????????????????????runAllTasks(ioTime?*?(100?-?ioRatio)?/?ioRatio);
    47????????????????????}
    48????????????????}
    49????????????}?catch?(Throwable?t)?{
    50????????????????handleLoopException(t);
    51????????????}
    52????????????//?Always?handle?shutdown?even?if?the?loop?processing?threw?an?exception.
    53????????????try?{
    54????????????????//?釋放資源,將注冊的channel全部關閉掉。
    55????????????????if?(isShuttingDown())?{
    56????????????????????closeAll();
    57????????????????????if?(confirmShutdown())?{
    58????????????????????????return;
    59????????????????????}
    60????????????????}
    61????????????}?catch?(Throwable?t)?{?
    62????????????????handleLoopException(t);
    63????????????}
    64????????}
    65????}
    復制代碼
  • 這里有個默認的計算策略:
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT
  • 1.有任務便會直接返回select.selectNow(),則會直接去跑任務或者是去處理selectorKeys;
    2.若沒任務,則會走select(wakenUp.getAndSet(false))方法,里面會有一個timeout超時處理,selector.select(timeoutMillis),超時后也會去跑任務或者是去處理selectorKeys;

    這一塊具體細節也很多,只是說一下處理流程。

  • 注意上面有個ioRatio == 100這個判斷條件,如果為100,則會將任務全部處理完成;否則會與io操作按一定的比例去執行任務。
  • 這里的IO操作就是processSelectedKeys()方法,代碼雖然很長,但是干的活就是根據不同的興趣事件干不同的活,里面有對OP_READ OP_ACCEPT OP_WRITE OP_CONNECT等等不同興趣事件的不同處理方法,這一塊應該是JAVA NIO里面的相關知識點。感興趣的朋友可以debug針對某個觸發事件研究一下。


    runAllTasks

    執行任務的代碼如下(下面是runAllTasks的代碼):

    1????protected?boolean?runAllTasks()?{
    2????????assert?inEventLoop();
    3????????boolean?fetchedAll;
    4????????boolean?ranAtLeastOne?=?false;
    5
    6????????do?{
    7????????????//?這里會從定時任務隊列中將達到執行事件的task丟到taskQueue中去
    8????????????fetchedAll?=?fetchFromScheduledTaskQueue();
    9????????????//?執行taskQueue中所有的task
    10????????????if?(runAllTasksFrom(taskQueue))?{
    11????????????????ranAtLeastOne?=?true;
    12????????????}
    13????????}?while?(!fetchedAll);?//?keep?on?processing?until?we?fetched?all?scheduled?tasks.
    14
    15????????if?(ranAtLeastOne)?{
    16????????????lastExecutionTime?=?ScheduledFutureTask.nanoTime();
    17????????}
    18????????//?這個是執行上面所說的tailTasks中的task
    19????????afterRunningAllTasks();
    20????????return?ranAtLeastOne;
    21????}
    復制代碼

    這一塊的邏輯是:
    先執行fetchFromScheduledTaskQueue方法,將到期的定時任務丟到taskQueue隊列中,這個fetchFromScheduledTaskQueue方法里面有個小細節,當taskQueue隊列滿了之后,它就會重新塞到scheduledTaskQueue隊列中,然后再外圈循環,taskQueue隊列消費完畢,則繼續執行fetchFromScheduledTaskQueue方法,直到把所有到期的任務都丟到taskQueue隊列中執行完畢為止。如下圖所示:

    netty_runTasks.png

    這一部分到這里就結束了,下一篇會對NioServerSocketChannel的注冊以及服務端創建NioSocketChannel進行分析。


    End

    總結

    以上是生活随笔為你收集整理的Netty系列(三):说说NioEventLoop的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。