Netty学习(三):Netty线程模型和代码示例
〇、前言
網絡編程的基本線程模型,詳見:Netty學習(二):線程模型
一、工作原理簡圖
Netty主要基于主從 Reactors 多線程模型(如下圖) 做了一定的改進,其中主從Reactor 多線程模型有多個Reactor。
BossGroup 線程維護Selector ,只關注Accecpt
當接收到Accept事件,獲取到對應的SocketChannel,封裝成NIOScoketChannel并注冊到Worker線程(事件循環),并進行維護
當Worker線程監聽到selector中通道發生自己感興趣的事件后,就由handle進行處理,注意handler已經加入到通道
二、工作原理詳圖
Netty抽象出兩組線程池BossGroup專門負責接收客戶端的連接,WorkerGroup專門負責網絡的讀寫
BossGroup和 WorkerGroup類型都是NioEventLoopGroup
NioEventLoopGroup 相當于一個事件循環組,這個組中含有多個事件循環,每一個事件循環是NioEventLoop
NioEventLoop表示一個不斷循環的執行處理任務的線程,每個NioEventLoop 都有一個selector ,用于監聽綁定在其上的socket 的網絡通訊
NioEventLoopGroup可以有多個線程,即可以含有多個NioEventLoop
每個 Boss NioEventLoop循環執行的步驟有3步
輪詢accept事件
處理accept事件,與client建立連接
生成 NioScocketChannel,并將其注冊到某個worker NIOEventLoop 上的 selector
處理任務隊列的任務,即runAllTasks
每個Worker NIOEventLoop 循環執行的步驟
輪詢read, write事件
處理i/o事件,即 read , write事件,在對應NioScocketChannel
處理處理任務隊列的任務,即runAllTasks
每個Worker NIOEventLoop處理業務時,會使用pipeline(管道),pipeline 中包含了channel,即通過pipeline可以獲取到對應通道,管道中維護了很多的處理器
三、Netty 快速入門實例-TCP服務
實例要求:使用IDEA創建Netty 項目
Netty 服務器在 6668端口監聽,客戶端能發送消息給服務器"hello,服務器~"
服務器可以回復消息給客戶端"hello,客戶端~"
目的:對Netty 線程模型有一個初步認識,便于理解Netty模型理論
3.1 創建Maven項目,并引入Netty 包
3.2 服務端
BossGroup 和 WorkerGroup:
public class NettyServer {public static void main(String[] args) throws Exception {//創建BossGroup 和 WorkerGroup//說明//1. 創建兩個線程組 bossGroup 和 workerGroup//2. bossGroup 只是處理連接請求 , 真正的和客戶端業務處理,會交給 workerGroup完成//3. 兩個都是無限循環//4. bossGroup 和 workerGroup 含有的子線程(NioEventLoop)的個數// 默認實際 cpu核數 * 2EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup(); //8try {//創建服務器端的啟動對象,配置參數ServerBootstrap bootstrap = new ServerBootstrap();//使用鏈式編程來進行設置bootstrap.group(bossGroup, workerGroup) //設置兩個線程組.channel(NioServerSocketChannel.class) //使用NioSocketChannel 作為服務器的通道實現.option(ChannelOption.SO_BACKLOG, 128) // 設置線程隊列得到連接個數.childOption(ChannelOption.SO_KEEPALIVE, true) //設置保持活動連接狀態 // .handler(null) // 該 handler對應 bossGroup , childHandler 對應 workerGroup.childHandler(new ChannelInitializer<SocketChannel>() {//創建一個通道初始化對象(匿名對象)//給pipeline 設置處理器@Overrideprotected void initChannel(SocketChannel ch) throws Exception {System.out.println("客戶socketchannel hashcode=" + ch.hashCode()); //可以使用一個集合管理 SocketChannel, 再推送消息時,可以將業務加入到各個channel 對應的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueuech.pipeline().addLast(new NettyServerHandler());}}); // 給我們的workerGroup 的 EventLoop 對應的管道設置處理器System.out.println(".....服務器 is ready...");//綁定一個端口并且同步, 生成了一個 ChannelFuture 對象//啟動服務器(并綁定端口)ChannelFuture cf = bootstrap.bind(6668).sync();//給cf 注冊監聽器,監控我們關心的事件cf.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (cf.isSuccess()) {System.out.println("監聽端口 6668 成功");} else {System.out.println("監聽端口 6668 失敗");}}});//對關閉通道進行監聽cf.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} }服務端handler:
/* 說明 1. 我們自定義一個Handler 需要繼續netty 規定好的某個HandlerAdapter(規范) 2. 這時我們自定義一個Handler , 才能稱為一個handler*/ public class NettyServerHandler extends ChannelInboundHandlerAdapter {//讀取數據實際(這里我們可以讀取客戶端發送的消息)/*1. ChannelHandlerContext ctx:上下文對象, 含有 管道pipeline , 通道channel, 地址2. Object msg: 就是客戶端發送的數據 默認Object*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("服務器讀取線程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());System.out.println("server ctx =" + ctx);System.out.println("看看channel 和 pipeline的關系");Channel channel = ctx.channel();ChannelPipeline pipeline = ctx.pipeline(); //本質是一個雙向鏈接, 出站入站//將 msg 轉成一個 ByteBuf//ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.ByteBuf buf = (ByteBuf) msg;System.out.println("客戶端發送消息是:" + buf.toString(CharsetUtil.UTF_8));System.out.println("客戶端地址:" + channel.remoteAddress());}//數據讀取完畢@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//writeAndFlush 是 write + flush//將數據寫入到緩存,并刷新//一般講,我們對這個發送的數據進行編碼ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵1", CharsetUtil.UTF_8));}//處理異常, 一般是需要關閉通道@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();} }3.3 客戶端
public class NettyClient {public static void main(String[] args) throws Exception {//客戶端需要一個事件循環組EventLoopGroup group = new NioEventLoopGroup();try {//創建客戶端啟動對象//注意客戶端使用的不是 ServerBootstrap 而是 BootstrapBootstrap bootstrap = new Bootstrap();//設置相關參數bootstrap.group(group) //設置線程組.channel(NioSocketChannel.class) // 設置客戶端通道的實現類(反射).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyClientHandler()); //加入自己的處理器}});System.out.println("客戶端 ok..");//啟動客戶端去連接服務器端//關于 ChannelFuture 要分析,涉及到netty的異步模型ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();//給關閉通道進行監聽channelFuture.channel().closeFuture().sync();}finally {group.shutdownGracefully();}} }客戶端handler:
public class NettyClientHandler extends ChannelInboundHandlerAdapter {//當通道就緒就會觸發該方法@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client " + ctx);ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));}//當通道有讀取事件時,會觸發@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println("服務器回復的消息:" + buf.toString(CharsetUtil.UTF_8));System.out.println("服務器的地址: "+ ctx.channel().remoteAddress());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();} }總結
以上是生活随笔為你收集整理的Netty学习(三):Netty线程模型和代码示例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Cholesterol-PEG3400-
- 下一篇: 手机App切图命名规则