【Netty】 异步任务调度 ( TaskQueue | ScheduleTaskQueue | SocketChannel 管理 )
文章目錄
- 一、 任務(wù)隊列 TaskQueue
- 二、 處理器 Handler 同步異步操作
- 三、 異步任務(wù) ( 用戶自定義任務(wù) )
- 四、 異步任務(wù) ( 用戶自定義定時任務(wù) )
- 五、 異步任務(wù) ( 其它線程向本線程調(diào)度任務(wù) )
一、 任務(wù)隊列 TaskQueue
任務(wù)隊列 TaskQueue 的任務(wù) Task 應(yīng)用場景 :
① 自定義任務(wù) : 自己開發(fā)的任務(wù) , 然后將該任務(wù)提交到任務(wù)隊列中 ;
② 自定義定時任務(wù) : 自己開發(fā)的任務(wù) , 然后將該任務(wù)提交到任務(wù)隊列中 , 同時可以指定任務(wù)的執(zhí)行時間 ;
③ 其它線程調(diào)度任務(wù) : 上面的任務(wù)都是在當(dāng)前的 NioEventLoop ( 反應(yīng)器 Reactor 線程 ) 中的任務(wù)隊列中排隊執(zhí)行 , 在其它線程中也可以調(diào)度本線程的 Channel 通道與該線程對應(yīng)的客戶端進行數(shù)據(jù)讀寫 ;
二、 處理器 Handler 同步異步操作
在之前的 Netty 服務(wù)器與客戶端項目中 , 用戶自定義的 Handler 處理器 , 該處理器繼承了 ChannelInboundHandlerAdapter 類 , 在重寫的 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 方法中 , 執(zhí)行的業(yè)務(wù)邏輯要注意以下兩點 :
- 同步操作 : 如果在該業(yè)務(wù)邏輯中只執(zhí)行一個短時間的操作 , 那么可以直接執(zhí)行 ;
- 異步操作 : 如果在該業(yè)務(wù)邏輯中執(zhí)行訪問數(shù)據(jù)庫 , 訪問網(wǎng)絡(luò) , 讀寫本地文件 , 執(zhí)行一系列復(fù)雜計算等耗時操作 , 肯定不能在該方法中處理 , 這樣會阻塞整個線程 ; 正確的做法是將耗時的操作放入任務(wù)隊列 TaskQueue , 異步執(zhí)行 ;
在 ChannelInboundHandlerAdapter 的 channelRead 方法執(zhí)行時 , 客戶端與服務(wù)器端的反應(yīng)器 Reactor 線程 NioEventLoop 是處于阻塞狀態(tài)的 , 此時服務(wù)器端與客戶端同時都處于阻塞狀態(tài) , 這樣肯定不行 , 因為 NioEventLoop 需要為多個客戶端服務(wù) , 不能因為與單一客戶端交互而產(chǎn)生阻塞 ;
三、 異步任務(wù) ( 用戶自定義任務(wù) )
1 . 用戶自定義任務(wù)流程 :
① 獲取通道 : 首先獲取 通道 Channel ;
② 獲取線程 : 獲取通道對應(yīng)的 EventLoop 線程 , 就是 NioEventLoop , 該 NioEventLoop 中封裝了任務(wù)隊列 TaskQueue ;
③ 任務(wù)入隊 : 向任務(wù)隊列 TaskQueue 中放入異步任務(wù) Runnable , 調(diào)用 NioEventLoop 線程的 execute 方法 , 即可將上述 Runnable 異步任務(wù)放入任務(wù)隊列 TaskQueue ;
2 . 多任務(wù)執(zhí)行 : 如果用戶連續(xù)向任務(wù)隊列中放入了多個任務(wù) , NioEventLoop 會按照順序先后執(zhí)行這些任務(wù) , 注意任務(wù)隊列中的任務(wù) 是先后執(zhí)行 , 不是同時執(zhí)行 ;
順序執(zhí)行任務(wù) ( 不是并發(fā) ) : 任務(wù)隊列任務(wù)執(zhí)行機制是順序執(zhí)行的 ; 先執(zhí)行第一個 , 執(zhí)行完畢后 , 從任務(wù)隊列中獲取第二個任務(wù) , 執(zhí)行完畢之后 , 依次從任務(wù)隊列中取出任務(wù)執(zhí)行 , 前一個任務(wù)執(zhí)行完畢后 , 才從任務(wù)隊列中取出下一個任務(wù)執(zhí)行 ;
3 . 代碼示例 : 監(jiān)聽到客戶端上傳數(shù)據(jù)后 , channelRead 回調(diào) , 執(zhí)行 獲取通道 -> 獲取線程 -> 異步任務(wù)調(diào)度 流程 ;
/*** Handler 處理者, 是 NioEventLoop 線程中處理業(yè)務(wù)邏輯的類** 繼承 : 該業(yè)務(wù)邏輯處理者 ( Handler ) 必須繼承 Netty 中的 ChannelInboundHandlerAdapter 類* 才可以設(shè)置給 NioEventLoop 線程** 規(guī)范 : 該 Handler 類中需要按照業(yè)務(wù)邏輯處理規(guī)范進行開發(fā)*/ public class ServerHandr extends ChannelInboundHandlerAdapter {/*** 讀取數(shù)據(jù) : 在服務(wù)器端讀取客戶端發(fā)送的數(shù)據(jù)* @param ctx* 通道處理者上下文對象 : 封裝了 管道 ( Pipeline ) , 通道 ( Channel ), 客戶端地址信息* 管道 ( Pipeline ) : 注重業(yè)務(wù)邏輯處理 , 可以關(guān)聯(lián)很多 Handler* 通道 ( Channel ) : 注重數(shù)據(jù)讀寫* @param msg* 客戶端上傳的數(shù)據(jù)* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 1 . 從 ChannelHandlerContext ctx 中獲取通道Channel channel = ctx.channel();// 2 . 獲取通道對應(yīng)的事件循環(huán)EventLoop eventLoop = channel.eventLoop();// 3 . 在 Runnable 中用戶自定義耗時操作, 異步執(zhí)行該操作, 該操作不能阻塞在此處執(zhí)行eventLoop.execute(new Runnable() {@Overridepublic void run() {//執(zhí)行耗時操作}});} }四、 異步任務(wù) ( 用戶自定義定時任務(wù) )
1 . 用戶自定義定時任務(wù) 與 用戶自定義任務(wù)流程基本類似 , 有以下兩個不同之處 :
① 調(diào)度方法 :
- 定時異步任務(wù)使用 schedule 方法進行調(diào)度 ;
- 普通異步任務(wù)使用 execute 方法進行調(diào)度 ;
② 任務(wù)隊列 :
- 定時異步任務(wù)提交到 ScheduleTaskQueue 任務(wù)隊列中 ;
- 普通異步任務(wù)提交到 TaskQueue 任務(wù)隊列中 ;
2 . 用戶自定義定時任務(wù)流程 :
① 獲取通道 : 首先獲取 通道 Channel ;
② 獲取線程 : 獲取通道對應(yīng)的 EventLoop 線程 , 就是 NioEventLoop , 該 NioEventLoop 中封裝了任務(wù)隊列 TaskQueue ;
③ 任務(wù)入隊 : 向任務(wù)隊列 ScheduleTaskQueue 中放入異步任務(wù) Runnable , 調(diào)用 NioEventLoop 線程的 schedule 方法 , 即可將上述 Runnable 異步任務(wù)放入任務(wù)隊列 ScheduleTaskQueue ;
3 . 代碼示例 : 監(jiān)聽到客戶端上傳數(shù)據(jù)后 , channelRead 回調(diào) , 執(zhí)行 獲取通道 -> 獲取線程 -> 異步任務(wù)調(diào)度 流程 ;
/*** Handler 處理者, 是 NioEventLoop 線程中處理業(yè)務(wù)邏輯的類** 繼承 : 該業(yè)務(wù)邏輯處理者 ( Handler ) 必須繼承 Netty 中的 ChannelInboundHandlerAdapter 類* 才可以設(shè)置給 NioEventLoop 線程** 規(guī)范 : 該 Handler 類中需要按照業(yè)務(wù)邏輯處理規(guī)范進行開發(fā)*/ public class ServerHandr extends ChannelInboundHandlerAdapter {/*** 讀取數(shù)據(jù) : 在服務(wù)器端讀取客戶端發(fā)送的數(shù)據(jù)* @param ctx* 通道處理者上下文對象 : 封裝了 管道 ( Pipeline ) , 通道 ( Channel ), 客戶端地址信息* 管道 ( Pipeline ) : 注重業(yè)務(wù)邏輯處理 , 可以關(guān)聯(lián)很多 Handler* 通道 ( Channel ) : 注重數(shù)據(jù)讀寫* @param msg* 客戶端上傳的數(shù)據(jù)* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 1 . 從 ChannelHandlerContext ctx 中獲取通道Channel channel = ctx.channel();// 2 . 獲取通道對應(yīng)的事件循環(huán)EventLoop eventLoop = channel.eventLoop();// 3 . 在 Runnable 中用戶自定義耗時操作, 異步執(zhí)行該操作, 該操作不能阻塞在此處執(zhí)行// schedule(Runnable command, long delay, TimeUnit unit)// Runnable command 參數(shù) : 異步任務(wù)// long delay 參數(shù) : 延遲執(zhí)行時間// TimeUnit unit參數(shù) : 延遲時間單位, 秒, 毫秒, 分鐘eventLoop.schedule(new Runnable() {@Overridepublic void run() {//執(zhí)行耗時操作}}, 100, TimeUnit.MILLISECONDS);} }五、 異步任務(wù) ( 其它線程向本線程調(diào)度任務(wù) )
1 . 獲取通道 Channel 即可調(diào)度異步任務(wù) : 由上面的任務(wù)調(diào)度流程可知 , 只要獲取到了本 NioEventLoop 線程對應(yīng)的 Channel 通道 , 就可以獲取該 NioEventLoop 線程的 EventLoop 事件調(diào)度器 , 向 ScheduleTaskQueue 或 TaskQueue 任務(wù)隊列中加入異步任務(wù) ;
2 . Channel 通道獲取與管理 :
① Channel 通道獲取 : 在服務(wù)器啟動設(shè)置 ServerBootstrap 中 , 會設(shè)置 ChannelInitializer , 在與客戶端的連接建立成功后 , 會回調(diào) initChannel 方法 , 此時就會得到該客戶端連接對應(yīng)的通道 SocketChannel ;
② Channel 通道管理 : 在服務(wù)器中使用 Map 集合管理該 Channel 通道 , 需要時根據(jù)用戶標(biāo)識信息 , 獲取該通道 , 向該客戶端通道對應(yīng)的 NioEventLoop 線程中調(diào)度任務(wù) ;
3 . 代碼示例 : 這里只展示一下 ChannelInitializer 的回調(diào)位置 , 不再詳細描述怎么維護集合的過程了 , 自己定義 Map 集合維護 ;
// 服務(wù)器啟動對象, 需要為該對象配置各種參數(shù) ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) // 設(shè)置 主從 線程組 , 分別對應(yīng) 主 Reactor 和 從 Reactor.channel(NioServerSocketChannel.class) // 設(shè)置 NIO 網(wǎng)絡(luò)套接字通道類型.option(ChannelOption.SO_BACKLOG, 128) // 設(shè)置線程隊列維護的連接個數(shù).childOption(ChannelOption.SO_KEEPALIVE, true) // 設(shè)置連接狀態(tài)行為, 保持連接狀態(tài).childHandler( // 為 WorkerGroup 線程池對應(yīng)的 NioEventLoop 設(shè)置對應(yīng)的事件 處理器 Handlernew ChannelInitializer<SocketChannel>() {// 創(chuàng)建通道初始化對象@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 該方法在服務(wù)器與客戶端連接建立成功后會回調(diào)// 為 管道 Pipeline 設(shè)置處理器 Hanedlerch.pipeline().addLast(new ServerHandr());}});總結(jié)
以上是生活随笔為你收集整理的【Netty】 异步任务调度 ( TaskQueue | ScheduleTaskQueue | SocketChannel 管理 )的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Netty】Netty 入门案例分析
- 下一篇: 【Netty】Netty 异步任务模型