日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Netty学习笔记(四)EventLoopGroup续篇

發布時間:2024/4/11 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty学习笔记(四)EventLoopGroup续篇 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
@Overrideprotected void run() {for (;;) {try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT://select輪詢, 設置wakenUp為false并返回之前的wakenUp值select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}default:// fallthrough}//去除了無關緊要的代碼processSelectedKeys();runAllTasks(); } catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception....}}

前面講到Reactor的核心是執行了NioEventLoop的run方法,主要做了上面三件事:

  • 輪詢注冊到reactor線程上的對應的selector的所有channel的IO事件
  • 根據不同的SelectKeys進行處理??processSelectedKeys();
  • 處理任務隊列 runAllTasks(); ??

接下來再詳細看下processSelectedKeys()和runAllTasks(); ?方法做了什么

processSelectedKeys

private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized(selectedKeys.flip());} else {processSelectedKeysPlain(selector.selectedKeys());}}

這里的processSelectedkeys()方法會根據selectedKeys是否為空,判斷執行優化后的processSelectedKeysOptimized()還是普通的processSelectedKeysPlain()方法

這里的selectedKeys Netty在調用openSelector時對其進行了優化

private SelectedSelectionKeySet selectedKeys;private Selector openSelector() {final Selector selector;selector = provider.openSelector();final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {@Overridepublic Object run() { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");selectedKeysField.setAccessible(true);publicSelectedKeysField.setAccessible(true);selectedKeysField.set(selector, selectedKeySet);publicSelectedKeysField.set(selector, selectedKeySet);return null; }});selectedKeys = selectedKeySet;return selector;}

先創建一個空的SelectedSelectionKeySet對象,然后通過反射獲取jdk?底層的Selector 的class 對象的 selectedKeys和publicSelectedKeys字段,并將Netty的SelectedSelectionKeySet通過反射賦值,這樣在底層調用jdk的api存儲注冊事件時,最后都會把事件保存到Netty的SelectedSelectionKeySet 對象里

可以看下替換前后有什么區別,jdk底層的SelectImpl對象的selectedKeys和publicSelectedKeys字段都是Set<SelectionKey>類型,而Netty里的SelectedSelectionKeySet對象是這樣的一個結構:

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {private SelectionKey[] keysA;private int keysASize;private SelectionKey[] keysB;private int keysBSize;private boolean isA = true;@Overridepublic boolean add(SelectionKey o) {if (o == null) {return false;}//添加元素到數組的最后,如果數組滿了,就進行擴容(*2)if (isA) {int size = keysASize;keysA[size ++] = o;keysASize = size;if (size == keysA.length) {doubleCapacityA();}} else {...}return true;}//移除對應的SelectionKey數組的最后一個元素SelectionKey[] flip() {if (isA) {isA = false;keysA[keysASize] = null;keysBSize = 0;return keysA;} else {...}}@Overridepublic boolean remove(Object o) {return false;}@Overridepublic boolean contains(Object o) {return false;}@Overridepublic Iterator<SelectionKey> iterator() {throw new UnsupportedOperationException();} }

SelectedSelectionKeySet是AbstractSet的一個子類,底層通過SelectionKey[]數組方法實現,并且將一些不需要的方法remove,contains方法進行重寫,Netty里輪詢事件的時候對操作進行了簡化,不需要通過集合的Iterator進行移除,而直接通過flip方法去掉集合的最后一個SelectionKey就可以了(這樣的操作的時間復雜度更低,可以直接定位到具體的下標),而我們在使用NIO的API的時候都需要進行remove操作
4.1.6.Final中的源碼,這里的SelectionKey是兩個數組交替遍歷的,在4.1.9.Final 版本中,netty已經將SelectedSelectionKeySet底層使用一個數組了:SelectedSelectionKeySet

接著來看下?processSelectedKeysOptimized(selectedKeys.flip());方法

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {for (int i = 0;; i ++) {final SelectionKey k = selectedKeys[i];if (k == null) {break;}selectedKeys[i] = null;//拿到SelectionKey的attachment,并根據其類型做不同處理final Object a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {//如果需要重新select,就將selectedKeys的元素都置為null恢復初始的狀態for (;;) {i++;if (selectedKeys[i] == null) {break;}selectedKeys[i] = null;}selectAgain();// Need to flip the optimized selectedKeys to get the right reference to the array// and reset the index to -1 which will then set to 0 on the for loop// to start over again. selectedKeys = this.selectedKeys.flip();i = -1;}}}

上述過程可以分為三步:

  • 取出SelectionKey(包含channel,attachment等信息)
  • 這里看到SelectionKey的attachment類型可能是AbstractNioChannel,猜測是不是在注冊事件的時間添加的,根據ServerBootstrap的啟動流程,最后會調用AbstractNioChannel的如下方法:

    selectionKey = javaChannel().register(eventLoop().selector, 0, this);

    這里的最后一個參數也就是attachment,當前對象不就是AbstractNioChannel的子類

  • 處理SelectionKey
  • private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;eventLoop = ch.eventLoop();if (eventLoop != this || eventLoop == null) {return;}unsafe.close(unsafe.voidPromise());return;}int readyOps = k.readyOps();//連接建立事件if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blockingint ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;//1.將連接事件從interestOps中移除k.interestOps(ops);//2.調用pipeline().fireChannelActive()將連接建立完成通知給pipeline中的各個handler unsafe.finishConnect();}//可寫事件if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();} //可讀事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();if (!ch.isOpen()) {return;}}}

可以看出這里就是一系列NIO的操作,分別對OP_READ, 可讀事件,?OP_WRITE, 可寫事件,?OP_CONNECT, 連接事件進行處理

以OP_READ事件為例

public final void read() {final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final ByteBufAllocator allocator = config.getAllocator();final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;do {//1.分配ByteBufbyteBuf = allocHandle.allocate(allocator);//2.從Channel讀取數據allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {// nothing was read. release the buffer.byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;break;}allocHandle.incMessagesRead(1);readPending = false;//3.通過pipeline.fireChannelRead事件通知給pipeline里的各個handlerpipeline.fireChannelRead(byteBuf);byteBuf = null;} while (allocHandle.continueReading());allocHandle.readComplete();pipeline.fireChannelReadComplete();if (close) {closeOnRead(pipeline);}}}
  • 判斷是否需要重新Select并重置
void cancel(SelectionKey key) {key.cancel();cancelledKeys ++;if (cancelledKeys >= CLEANUP_INTERVAL) {cancelledKeys = 0;needsToSelectAgain = true;}}

這里的cancelledKeys會在調用cancel(SelectionKey)刪除注冊事件的時候計數,當他大于CLEANUP_INTERVAL(256)的時候,就會將needsToSelectAgain設置為true,進入對應的分支判斷,先將原來的selectedKeys都置為Null,然后重新調用selectNow(),重新填充selectedKeys

總結:
netty的NioEventLoop線程第二步做的事情就是處理SelectionKey,netty使用數組替換掉jdk原生的HashSet來優化查詢和更新SelectionKey的效率,每個SelectionKey上綁定了netty類AbstractNioChanne的具體實現子類對象作為attachment,在處理每個SelectionKey的時候,就可以找到對應的AbstractNioChannel,最后通過pipeline來處理通知給其他Handler

任務執行runAllTasks

任務添加

添加普通任務

前面的分析說過NioEventLoop 是Netty的核心線程,其添加任務是通過執行父類SingleThreadEventExecutor的execute方法,
通過addTask方法,將Runnable(即task)添加到對應的任務隊列?Queue<Runnable> taskQueue;里

public void execute(Runnable task) {boolean inEventLoop = inEventLoop();if (inEventLoop) {addTask(task);} else {startThread();addTask(task); } }

Netty的源碼里的bind()流程中有通過如下方法添加對應的task到SingleThreadEventExecutor的任務隊列里,如下:

private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}

用戶也可以通過如下方式自己添加task到TaskQueue

EventLoop eventLoop = channel.eventLoop(); eventLoop.execute(new Runnable() {@Overridepublic void run() {//TODO } });

添加定時任務

除了上述方式,我們還可以通過如下方法添加定時任務到對應的任務隊列

EventLoop eventLoop = channel.eventLoop(); eventLoop.schedule(new Runnable() {@Overridepublic void run() {//TODO } }, 30, TimeUnit.SECONDS);

具體的實現是在父類AbstractScheduledEventExecutor里,看下對應的源碼

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {...if (delay < 0) {throw new IllegalArgumentException(String.format("delay: %d (expected: >= 0)", delay));}return schedule(new ScheduledFutureTask<Void>(this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));}<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {if (inEventLoop()) {scheduledTaskQueue().add(task);} else {execute(new Runnable() {@Overridepublic void run() {scheduledTaskQueue().add(task);}});}return task;}

會將對應的Runnable和延遲時間封裝成一個新的ScheduledFutureTask,然后調用重載的schedule方法,將對應的task添加到PriorityQueue<ScheduledFutureTask<?>>的優先隊列里

這里對添加定時任務的Thread進行了判斷,如果調用的發起方是reactor線程,那么就直接將Task添加到優先隊列中;如果是外部線程調用的schedule,會將"添加定時任務到優先隊列"封裝成一個Runnable也就是新的task,然后調用上面的execute方法去添加任務,這樣會訪問PriorityQueue的就只有reactor線程了,變成了單線程

接下來我們來詳細看下這個特殊的優先隊列PriorityQueue<ScheduledFutureTask<?>>,所謂的優先隊列與普通隊列的區別在于每個元素都被賦予了優先級。當訪問元素時,會將具有最高優先級的元素最先彈出。即優先隊列具有最高級先出的特征

看下這個優先隊列里的元素ScheduledFutureTask,它實現了Comparable接口,定義了自己的compareTo方法,先比較deadlineNanos(也就是截止時間)的大小,如果一樣則比較id,如果也相同就拋出異常

final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {private static final AtomicLong nextTaskId = new AtomicLong();private final long id = nextTaskId.getAndIncrement();@Overridepublic int compareTo(Delayed o) {if (this == o) {return 0;}ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;long d = deadlineNanos() - that.deadlineNanos();if (d < 0) {return -1;} else if (d > 0) {return 1;} else if (id < that.id) {return -1;} else if (id == that.id) {throw new Error();} else {return 1;}} }

既然ScheduledFutureTask本質也是一個Runnable,那么就看下它的run方法吧
這里對于不同的類型任務進行了不同的處理,periodNanos=0表示是只執行一次的任務,>0 表示是按照指定頻率定期執行的任務,<0表示是每次執行完成后,延遲一段時間再次執行的任務(二者的區別在于一個是根據上次任務開始執行的時間計算間隔,一個是按照上次任務執行結束的時間計算間隔)

/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */private final long periodNanos; @Overridepublic void run() {if (periodNanos == 0) {if (setUncancellableInternal()) {V result = task.call();setSuccessInternal(result);}} else {if (!isCancelled()) {task.call();if (!executor().isShutdown()) {long p = periodNanos;if (p > 0) {//設置該任務的下一次截止時間為本次的截止時間加上間隔時間periodNanosdeadlineNanos += p;} else {//設置下一次截止時間為當前時間加上延遲(因為p<0,所以要減去) 此時的當前時間就是本次任務之間結束的時間 task.call()是一個阻塞的方法deadlineNanos = nanoTime() - p;}if (!isCancelled()) {//將新的ScheduledFutureTask添加到任務隊列等待下次執行Queue<ScheduledFutureTask<?>> scheduledTaskQueue =((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;assert scheduledTaskQueue != null;scheduledTaskQueue.add(this);}}}}}

Task任務的執行

有兩個重載的runAllTasks方法,一個無參,一個帶有long timeoutNanos參數,先來看下無參的方法

protected boolean runAllTasks() {assert inEventLoop();boolean fetchedAll;boolean ranAtLeastOne = false;do {fetchedAll = fetchFromScheduledTaskQueue();if (runAllTasksFrom(taskQueue)) {ranAtLeastOne = true;}} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.if (ranAtLeastOne) {lastExecutionTime = ScheduledFutureTask.nanoTime();}afterRunningAllTasks();return ranAtLeastOne;}

主要做下面三件事情:

1.將優先隊列里的ScheduledFutureTask取出放到taskQueue里
2.從taskQueue里取出task并執行
3.task任務執行完畢后執行后置處理邏輯

將任務從優先隊列移動到taskQueue

private boolean fetchFromScheduledTaskQueue() {long nanoTime = AbstractScheduledEventExecutor.nanoTime();Runnable scheduledTask = pollScheduledTask(nanoTime);while (scheduledTask != null) {if (!taskQueue.offer(scheduledTask)) {// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);return false;}scheduledTask = pollScheduledTask(nanoTime);}return true;}protected final Runnable pollScheduledTask(long nanoTime) {assert inEventLoop();Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();if (scheduledTask == null) {return null;}if (scheduledTask.deadlineNanos() <= nanoTime) {scheduledTaskQueue.remove();return scheduledTask;}return null;}

先從scheduledTaskQueue優先隊列里拿到對應優先級最高的task(截止時間最近的Task),判斷當前是否已到達其截止時間,是的話就將其從優先隊列中取出并刪除元素,然后將其加入到taskQueue中,如果加入失敗就重新加入到scheduledTaskQueue中,一直到所有的優先隊列里的task都遷移成功

簡單來說就是把已經到期的定時任務從PriorityQueue轉移到taskQueue

執行task

protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {Runnable task = pollTaskFrom(taskQueue);if (task == null) {return false;}for (;;) {safeExecute(task);task = pollTaskFrom(taskQueue);if (task == null) {return true;}}}protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) {for (;;) {Runnable task = taskQueue.poll();if (task == WAKEUP_TASK) {continue;}return task;}}protected static void safeExecute(Runnable task) {try {task.run();} catch (Throwable t) {logger.warn("A task raised an exception. Task: {}", task, t);}}

從taskQueue中取出非WAKEUP_TASK的任務,然后調用safeExecute() --內部之間調用task.run()來安全執行所有的task,一直到所有的task都執行完畢

后置處理

@Overrideprotected void afterRunningAllTasks() {runAllTasksFrom(tailTasks);}

當所有的task執行完畢之后,我們還可以執行一些自己的task,通過afterRunningAllTasks方法來執行在tailTasks隊列里的所有任務,我們可以通過SingleThreadEventLoop的executeAfterEventLoopIteration向tailTasks里添加自己想要執行的業務邏輯

task的執行還有一個帶有超時時間的重載方法,如下:

protected boolean runAllTasks(long timeoutNanos) {fetchFromScheduledTaskQueue();//從taskQueue poll獲取任務Runnable task = pollTask();if (task == null) {afterRunningAllTasks();return false;}//計算當前方法超時的截止時間final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;long runTasks = 0;long lastExecutionTime;for (;;) {safeExecute(task);runTasks ++;//位運算,說明runTasks是64的倍數 0x3F=0011 1111 (63)if ((runTasks & 0x3F) == 0) {lastExecutionTime = ScheduledFutureTask.nanoTime();if (lastExecutionTime >= deadline) {break;}}task = pollTask();if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();break;}}afterRunningAllTasks();this.lastExecutionTime = lastExecutionTime;return true;}

基本思路和不帶參數的runAllTasks一樣,區別在于會考慮所有任務執行的超時時間,為了提高執行效率,每執行64個任務都會比較下當前時間是否大于runAllTasks的截止時間,是的話就退出

從上面可以看出,我們的EventLoopGroup?既需要執行 IO 操作, 又需要執行 很多的task, 因此在調用對應execute 方法添加任務的時候, 不要提交耗時任務, 更不能提交一些會造成阻塞的任務, 不然會導致我們的 IO 線程得不到調度, 影響整個程序的并發量

總結一下:

  • netty內的任務可分為普通任務和定時任務,分別保存在LinkedBlockingQueue和PriorityQueue
  • netty執行任務之前,會將已經到期的定時任務從PriorityQueue轉移到LinkedBlockingQueue
  • 如果執行任務有超時時間,那么會每執行64個任務校驗下是否達到截止時間

參考:
netty源碼分析之揭開reactor線程的面紗(二)
netty源碼分析之揭開reactor線程的面紗(三)
Netty 源碼分析-EventLoop???????

超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生

總結

以上是生活随笔為你收集整理的Netty学习笔记(四)EventLoopGroup续篇的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。