Netty学习笔记(四)EventLoopGroup续篇
前面講到Reactor的核心是執行了NioEventLoop的run方法,主要做了上面三件事:
- 輪詢注冊到reactor線程上的對應的selector的所有channel的IO事件
- 根據不同的SelectKeys進行處理??processSelectedKeys();
- 處理任務隊列 runAllTasks(); ??
接下來再詳細看下processSelectedKeys()和runAllTasks(); ?方法做了什么
processSelectedKeys
private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized(selectedKeys.flip());} else {processSelectedKeysPlain(selector.selectedKeys());}}這里的processSelectedkeys()方法會根據selectedKeys是否為空,判斷執行優化后的processSelectedKeysOptimized()還是普通的processSelectedKeysPlain()方法
這里的selectedKeys Netty在調用openSelector時對其進行了優化
private SelectedSelectionKeySet selectedKeys;private Selector openSelector() {final Selector selector;selector = provider.openSelector();final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {@Overridepublic Object run() { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");selectedKeysField.setAccessible(true);publicSelectedKeysField.setAccessible(true);selectedKeysField.set(selector, selectedKeySet);publicSelectedKeysField.set(selector, selectedKeySet);return null; }});selectedKeys = selectedKeySet;return selector;}先創建一個空的SelectedSelectionKeySet對象,然后通過反射獲取jdk?底層的Selector 的class 對象的 selectedKeys和publicSelectedKeys字段,并將Netty的SelectedSelectionKeySet通過反射賦值,這樣在底層調用jdk的api存儲注冊事件時,最后都會把事件保存到Netty的SelectedSelectionKeySet 對象里
可以看下替換前后有什么區別,jdk底層的SelectImpl對象的selectedKeys和publicSelectedKeys字段都是Set<SelectionKey>類型,而Netty里的SelectedSelectionKeySet對象是這樣的一個結構:
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {private SelectionKey[] keysA;private int keysASize;private SelectionKey[] keysB;private int keysBSize;private boolean isA = true;@Overridepublic boolean add(SelectionKey o) {if (o == null) {return false;}//添加元素到數組的最后,如果數組滿了,就進行擴容(*2)if (isA) {int size = keysASize;keysA[size ++] = o;keysASize = size;if (size == keysA.length) {doubleCapacityA();}} else {...}return true;}//移除對應的SelectionKey數組的最后一個元素SelectionKey[] flip() {if (isA) {isA = false;keysA[keysASize] = null;keysBSize = 0;return keysA;} else {...}}@Overridepublic boolean remove(Object o) {return false;}@Overridepublic boolean contains(Object o) {return false;}@Overridepublic Iterator<SelectionKey> iterator() {throw new UnsupportedOperationException();} }SelectedSelectionKeySet是AbstractSet的一個子類,底層通過SelectionKey[]數組方法實現,并且將一些不需要的方法remove,contains方法進行重寫,Netty里輪詢事件的時候對操作進行了簡化,不需要通過集合的Iterator進行移除,而直接通過flip方法去掉集合的最后一個SelectionKey就可以了(這樣的操作的時間復雜度更低,可以直接定位到具體的下標),而我們在使用NIO的API的時候都需要進行remove操作
4.1.6.Final中的源碼,這里的SelectionKey是兩個數組交替遍歷的,在4.1.9.Final 版本中,netty已經將SelectedSelectionKeySet底層使用一個數組了:SelectedSelectionKeySet
接著來看下?processSelectedKeysOptimized(selectedKeys.flip());方法
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {for (int i = 0;; i ++) {final SelectionKey k = selectedKeys[i];if (k == null) {break;}selectedKeys[i] = null;//拿到SelectionKey的attachment,并根據其類型做不同處理final Object a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {//如果需要重新select,就將selectedKeys的元素都置為null恢復初始的狀態for (;;) {i++;if (selectedKeys[i] == null) {break;}selectedKeys[i] = null;}selectAgain();// Need to flip the optimized selectedKeys to get the right reference to the array// and reset the index to -1 which will then set to 0 on the for loop// to start over again. selectedKeys = this.selectedKeys.flip();i = -1;}}}上述過程可以分為三步:
- 取出SelectionKey(包含channel,attachment等信息)
-
這里看到SelectionKey的attachment類型可能是AbstractNioChannel,猜測是不是在注冊事件的時間添加的,根據ServerBootstrap的啟動流程,最后會調用AbstractNioChannel的如下方法:
selectionKey = javaChannel().register(eventLoop().selector, 0, this);這里的最后一個參數也就是attachment,當前對象不就是AbstractNioChannel的子類
- 處理SelectionKey
- private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;eventLoop = ch.eventLoop();if (eventLoop != this || eventLoop == null) {return;}unsafe.close(unsafe.voidPromise());return;}int readyOps = k.readyOps();//連接建立事件if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blockingint ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;//1.將連接事件從interestOps中移除k.interestOps(ops);//2.調用pipeline().fireChannelActive()將連接建立完成通知給pipeline中的各個handler unsafe.finishConnect();}//可寫事件if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();} //可讀事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();if (!ch.isOpen()) {return;}}}
可以看出這里就是一系列NIO的操作,分別對OP_READ, 可讀事件,?OP_WRITE, 可寫事件,?OP_CONNECT, 連接事件進行處理
以OP_READ事件為例
public final void read() {final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final ByteBufAllocator allocator = config.getAllocator();final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;do {//1.分配ByteBufbyteBuf = allocHandle.allocate(allocator);//2.從Channel讀取數據allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {// nothing was read. release the buffer.byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;break;}allocHandle.incMessagesRead(1);readPending = false;//3.通過pipeline.fireChannelRead事件通知給pipeline里的各個handlerpipeline.fireChannelRead(byteBuf);byteBuf = null;} while (allocHandle.continueReading());allocHandle.readComplete();pipeline.fireChannelReadComplete();if (close) {closeOnRead(pipeline);}}}- 判斷是否需要重新Select并重置
這里的cancelledKeys會在調用cancel(SelectionKey)刪除注冊事件的時候計數,當他大于CLEANUP_INTERVAL(256)的時候,就會將needsToSelectAgain設置為true,進入對應的分支判斷,先將原來的selectedKeys都置為Null,然后重新調用selectNow(),重新填充selectedKeys
總結:
netty的NioEventLoop線程第二步做的事情就是處理SelectionKey,netty使用數組替換掉jdk原生的HashSet來優化查詢和更新SelectionKey的效率,每個SelectionKey上綁定了netty類AbstractNioChanne的具體實現子類對象作為attachment,在處理每個SelectionKey的時候,就可以找到對應的AbstractNioChannel,最后通過pipeline來處理通知給其他Handler
任務執行runAllTasks
任務添加
添加普通任務
前面的分析說過NioEventLoop 是Netty的核心線程,其添加任務是通過執行父類SingleThreadEventExecutor的execute方法,
通過addTask方法,將Runnable(即task)添加到對應的任務隊列?Queue<Runnable> taskQueue;里
Netty的源碼里的bind()流程中有通過如下方法添加對應的task到SingleThreadEventExecutor的任務隊列里,如下:
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}用戶也可以通過如下方式自己添加task到TaskQueue
EventLoop eventLoop = channel.eventLoop(); eventLoop.execute(new Runnable() {@Overridepublic void run() {//TODO } });添加定時任務
除了上述方式,我們還可以通過如下方法添加定時任務到對應的任務隊列
EventLoop eventLoop = channel.eventLoop(); eventLoop.schedule(new Runnable() {@Overridepublic void run() {//TODO } }, 30, TimeUnit.SECONDS);具體的實現是在父類AbstractScheduledEventExecutor里,看下對應的源碼
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {...if (delay < 0) {throw new IllegalArgumentException(String.format("delay: %d (expected: >= 0)", delay));}return schedule(new ScheduledFutureTask<Void>(this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));}<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {if (inEventLoop()) {scheduledTaskQueue().add(task);} else {execute(new Runnable() {@Overridepublic void run() {scheduledTaskQueue().add(task);}});}return task;}會將對應的Runnable和延遲時間封裝成一個新的ScheduledFutureTask,然后調用重載的schedule方法,將對應的task添加到PriorityQueue<ScheduledFutureTask<?>>的優先隊列里
這里對添加定時任務的Thread進行了判斷,如果調用的發起方是reactor線程,那么就直接將Task添加到優先隊列中;如果是外部線程調用的schedule,會將"添加定時任務到優先隊列"封裝成一個Runnable也就是新的task,然后調用上面的execute方法去添加任務,這樣會訪問PriorityQueue的就只有reactor線程了,變成了單線程
接下來我們來詳細看下這個特殊的優先隊列PriorityQueue<ScheduledFutureTask<?>>,所謂的優先隊列與普通隊列的區別在于每個元素都被賦予了優先級。當訪問元素時,會將具有最高優先級的元素最先彈出。即優先隊列具有最高級先出的特征。
看下這個優先隊列里的元素ScheduledFutureTask,它實現了Comparable接口,定義了自己的compareTo方法,先比較deadlineNanos(也就是截止時間)的大小,如果一樣則比較id,如果也相同就拋出異常
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {private static final AtomicLong nextTaskId = new AtomicLong();private final long id = nextTaskId.getAndIncrement();@Overridepublic int compareTo(Delayed o) {if (this == o) {return 0;}ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;long d = deadlineNanos() - that.deadlineNanos();if (d < 0) {return -1;} else if (d > 0) {return 1;} else if (id < that.id) {return -1;} else if (id == that.id) {throw new Error();} else {return 1;}} }既然ScheduledFutureTask本質也是一個Runnable,那么就看下它的run方法吧
這里對于不同的類型任務進行了不同的處理,periodNanos=0表示是只執行一次的任務,>0 表示是按照指定頻率定期執行的任務,<0表示是每次執行完成后,延遲一段時間再次執行的任務(二者的區別在于一個是根據上次任務開始執行的時間計算間隔,一個是按照上次任務執行結束的時間計算間隔)
Task任務的執行
有兩個重載的runAllTasks方法,一個無參,一個帶有long timeoutNanos參數,先來看下無參的方法
protected boolean runAllTasks() {assert inEventLoop();boolean fetchedAll;boolean ranAtLeastOne = false;do {fetchedAll = fetchFromScheduledTaskQueue();if (runAllTasksFrom(taskQueue)) {ranAtLeastOne = true;}} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.if (ranAtLeastOne) {lastExecutionTime = ScheduledFutureTask.nanoTime();}afterRunningAllTasks();return ranAtLeastOne;}主要做下面三件事情:
1.將優先隊列里的ScheduledFutureTask取出放到taskQueue里
2.從taskQueue里取出task并執行
3.task任務執行完畢后執行后置處理邏輯
將任務從優先隊列移動到taskQueue
private boolean fetchFromScheduledTaskQueue() {long nanoTime = AbstractScheduledEventExecutor.nanoTime();Runnable scheduledTask = pollScheduledTask(nanoTime);while (scheduledTask != null) {if (!taskQueue.offer(scheduledTask)) {// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);return false;}scheduledTask = pollScheduledTask(nanoTime);}return true;}protected final Runnable pollScheduledTask(long nanoTime) {assert inEventLoop();Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();if (scheduledTask == null) {return null;}if (scheduledTask.deadlineNanos() <= nanoTime) {scheduledTaskQueue.remove();return scheduledTask;}return null;}先從scheduledTaskQueue優先隊列里拿到對應優先級最高的task(截止時間最近的Task),判斷當前是否已到達其截止時間,是的話就將其從優先隊列中取出并刪除元素,然后將其加入到taskQueue中,如果加入失敗就重新加入到scheduledTaskQueue中,一直到所有的優先隊列里的task都遷移成功
簡單來說就是把已經到期的定時任務從PriorityQueue轉移到taskQueue
執行task
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {Runnable task = pollTaskFrom(taskQueue);if (task == null) {return false;}for (;;) {safeExecute(task);task = pollTaskFrom(taskQueue);if (task == null) {return true;}}}protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) {for (;;) {Runnable task = taskQueue.poll();if (task == WAKEUP_TASK) {continue;}return task;}}protected static void safeExecute(Runnable task) {try {task.run();} catch (Throwable t) {logger.warn("A task raised an exception. Task: {}", task, t);}}從taskQueue中取出非WAKEUP_TASK的任務,然后調用safeExecute() --內部之間調用task.run()來安全執行所有的task,一直到所有的task都執行完畢
后置處理
@Overrideprotected void afterRunningAllTasks() {runAllTasksFrom(tailTasks);}當所有的task執行完畢之后,我們還可以執行一些自己的task,通過afterRunningAllTasks方法來執行在tailTasks隊列里的所有任務,我們可以通過SingleThreadEventLoop的executeAfterEventLoopIteration向tailTasks里添加自己想要執行的業務邏輯
task的執行還有一個帶有超時時間的重載方法,如下:
protected boolean runAllTasks(long timeoutNanos) {fetchFromScheduledTaskQueue();//從taskQueue poll獲取任務Runnable task = pollTask();if (task == null) {afterRunningAllTasks();return false;}//計算當前方法超時的截止時間final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;long runTasks = 0;long lastExecutionTime;for (;;) {safeExecute(task);runTasks ++;//位運算,說明runTasks是64的倍數 0x3F=0011 1111 (63)if ((runTasks & 0x3F) == 0) {lastExecutionTime = ScheduledFutureTask.nanoTime();if (lastExecutionTime >= deadline) {break;}}task = pollTask();if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();break;}}afterRunningAllTasks();this.lastExecutionTime = lastExecutionTime;return true;}基本思路和不帶參數的runAllTasks一樣,區別在于會考慮所有任務執行的超時時間,為了提高執行效率,每執行64個任務都會比較下當前時間是否大于runAllTasks的截止時間,是的話就退出
從上面可以看出,我們的EventLoopGroup?既需要執行 IO 操作, 又需要執行 很多的task, 因此在調用對應execute 方法添加任務的時候, 不要提交耗時任務, 更不能提交一些會造成阻塞的任務, 不然會導致我們的 IO 線程得不到調度, 影響整個程序的并發量
總結一下:
- netty內的任務可分為普通任務和定時任務,分別保存在LinkedBlockingQueue和PriorityQueue
- netty執行任務之前,會將已經到期的定時任務從PriorityQueue轉移到LinkedBlockingQueue
- 如果執行任務有超時時間,那么會每執行64個任務校驗下是否達到截止時間
參考:
netty源碼分析之揭開reactor線程的面紗(二)
netty源碼分析之揭開reactor線程的面紗(三)
Netty 源碼分析-EventLoop???????
總結
以上是生活随笔為你收集整理的Netty学习笔记(四)EventLoopGroup续篇的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java 14 发布了,终于可以扔掉Lo
- 下一篇: Netty学习笔记(五)Pipeline