基于Netty实现群聊功能
有你的日子你就是一切,沒你的日子一切都是你
又是一天新的開始,讓我來帶領(lǐng)你打開任督二脈。。。
前言:
在前面有了對NIO、BIO知識的學習,以及對netty結(jié)構(gòu)組的基本了解,接下來將學習一下如何使用netty去實現(xiàn)一個群聊功能,讀者可自行去對比基于NIO、BIO、Netty實現(xiàn)群聊功能的不同方式,以更深刻的理解IO網(wǎng)絡(luò)編程。
學習內(nèi)容:
整體思路:
1) 編寫一個Netty群聊系統(tǒng),實現(xiàn)服務(wù)器端和客戶端之間的數(shù)據(jù)簡單通訊(非阻塞)
2) 實現(xiàn)多人聊天
3) 服務(wù)器端:可以監(jiān)測用戶上線,離線,并實現(xiàn)消息轉(zhuǎn)發(fā)
4) 客戶端: 通過channel可以無阻塞發(fā)送消息給其他所有用戶,同時可接收到其他用戶發(fā)送的消息)
5)目的:進一步理解Netty非阻塞網(wǎng)絡(luò)編程機制
具體實現(xiàn)
服務(wù)端代碼
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import java.util.Scanner;/** * @PackageName: com.netty.demo.chart * @author: youjp * @create: 2021-01-22 10:05 * @description: 群聊服務(wù)端 * @Version: 1.0 */ public class NettyChartServer {/*** 服務(wù)端口號*/private int port;public NettyChartServer(int port) {this.port = port;}/*** 服務(wù)執(zhí)行:用于處理客戶端請求*/public void run() {//服務(wù)線程組EventLoopGroup bossgroup = new NioEventLoopGroup(1);//工作組線程池,默認為CPU核數(shù)*2EventLoopGroup workgroup = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();try {bootstrap.group(bossgroup, workgroup).channel(NioServerSocketChannel.class) //可以監(jiān)聽新進來的TCP連接的通道.option(ChannelOption.SO_BACKLOG, 128) // 設(shè)置線程隊列得到連接個數(shù).childOption(ChannelOption.SO_KEEPALIVE, true) // 設(shè)置保持活動連接狀態(tài).handler(new LoggingHandler(LogLevel.INFO))//心跳檢測日志.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//獲取到piepline管道ChannelPipeline pipeline = socketChannel.pipeline();//向pipeline加入解碼器pipeline.addLast("decoder", new StringDecoder());//向pipeline加入編碼器pipeline.addLast("encoder", new StringEncoder());//心跳檢測handlepipeline.addLast("idcheck",new IdleStateHandler(3,5,8, TimeUnit.SECONDS));//加入自己的業(yè)務(wù)處理handlerpipeline.addLast("handle",new MyNettyServerHandler());}});ChannelFuture ch = bootstrap.bind(port).sync();System.out.println("服務(wù)器 is ready-------");//異步監(jiān)聽端口ch.addListeners(e->{if(e.isSuccess()){System.out.println("監(jiān)聽端口 "+port+" 成功");}else {System.out.println("監(jiān)聽端口 "+port+" 失敗");}});//異步關(guān)閉通道ch.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {//優(yōu)雅退出線程池bossgroup.shutdownGracefully();workgroup.shutdownGracefully();}}public static void main(String[] args) {NettyChartServer server=new NettyChartServer(8888);server.run();} }服務(wù)端主要作用是對8888端口進行監(jiān)聽,等待客戶端的請求。使用了主從Reactor模式,去實現(xiàn)接收處理多個客戶端請求。利用自定義業(yè)務(wù)處理handler 完成對信息的獲取,以及客戶端直接信息轉(zhuǎn)發(fā)。
服務(wù)端業(yè)務(wù)處理器
import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.text.SimpleDateFormat; import java.util.HashMap;/** * @PackageName: com.netty.demo.chart * @author: youjp * @create: 2021-01-22 10:43 * @description: 業(yè)務(wù)自定義的邏輯處理類:它是入站 ChannelInboundHandler 類型的處理器,負責接收解碼后的 HTTP 請求數(shù)據(jù),并將請求處理結(jié)果寫回客戶端。 * @Version: 1.0 */ public class MyNettyServerHandler extends SimpleChannelInboundHandler<String> {//定義一個channerl集合,類似于List<Channel> ,用于存儲不同的channel通道轉(zhuǎn)發(fā)信息private static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);//設(shè)計一個集合可存儲通道通道對應(yīng)的用戶,未開發(fā)改功能private static HashMap<String,Channel> channelHashMap=new HashMap<>();private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");/*** 該channel建立連接后: 生命周期 handlerAdded》 channelRegistered-》channelActive,* 表示連接建立,一旦連接,第一個執(zhí)行* @param ctx* @throws Exception*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {System.out.println("------------handlerAdded------");Channel channel = ctx.channel();channelGroup.add(channel);channelGroup.writeAndFlush("【客戶端】"+channel.remoteAddress()+"加入聊天");}/*** 連接中斷* @param ctx* @throws Exception*/@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {System.out.println("------------handlerRemoved------");Channel channel=ctx.channel();//服務(wù)下線,移除該channelchannelGroup.remove(channel);//并通知,已離線channelGroup.writeAndFlush("【客戶端】"+channel.remoteAddress()+"離開了~");}/*** 表示該channel處于活動狀態(tài)* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("------------channelActive------");System.out.println(ctx.channel().remoteAddress() + " 上線了~");}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("------------channelInactive------");System.out.println(ctx.channel().remoteAddress() + " 下線了~");}/*** 讀取消息* @param channelHandlerContext* @param s* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {System.out.println("------------channelRead0------");Channel channel = channelHandlerContext.channel();//將消息轉(zhuǎn)發(fā)給其他客戶端,并且排除自己channelGroup.forEach(e->{if (channel!=e){//e.writeAndFlush("【客戶端】"+channel.remoteAddress()+"說:"+msg);}else {e.writeAndFlush("【自己】"+"說:"+msg);}});}/*** 消息讀取后* @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("------------channelReadComplete------");}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("------------channelRegistered------");}/*** 離線生命:* @param ctx exceptionCaught-》channelInactive》channelUnregistered》handlerRemoved* @throws Exception*/@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {System.out.println("------------channelUnregistered------");}/*** 處理心跳檢測* @param ctx* @param evt* @throws Exception*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// System.out.println("------------userEventTriggered------");if (evt instanceof IdleStateEvent){//向下轉(zhuǎn)型IdleStateEvent event=(IdleStateEvent)evt;String eventType=null;switch (event.state()){case READER_IDLE:eventType="讀空閑";break;case WRITER_IDLE:eventType="寫空閑";break;case ALL_IDLE:eventType="讀寫空閑";break;default:break;}System.out.println(ctx.channel().remoteAddress()+"---超時時間---"+eventType);System.out.println("服務(wù)器做相應(yīng)操作");}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("------------exceptionCaught------");} }業(yè)務(wù)自定義的邏輯處理類:它是入站 ChannelInboundHandler 類型的處理器,負責接收解碼后的 HTTP 請求數(shù)據(jù),并將請求處理結(jié)果寫回客戶端。當channle通道建立以后,便開啟了生命周期。其中 channelRegistered 是用于channnel通道注冊,channelActive用于查看通道是否活躍 、channelRead0用于讀取客戶端信息、exceptionCaught用于異常處理
客戶端代碼
客戶端啟動類
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil;import java.util.Scanner;/** * @PackageName: com.netty.demo.chart * @author: youjp * @create: 2021-01-22 10:06 * @description: 客戶端 * @Version: 1.0 */ public class NettyChartClient {private String ip;private int port;public NettyChartClient(String ip, int port) {this.ip = ip;this.port = port;}/*** 客戶端啟動*/public void run() {//創(chuàng)建工作線程池 CPU核數(shù)*2EventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();try {bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE,true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//得到PipelineChannelPipeline pipeline = socketChannel.pipeline();//加入相關(guān)handlerpipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast("handler", (ChannelHandler) new MyNettyClientHandler());}});System.out.println("---客戶端啟動了----");//連接服務(wù)端ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();Scanner can=new Scanner(System.in);while (can.hasNext()){String str=can.nextLine();channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(""+str, CharsetUtil.UTF_8));}//異步關(guān)閉通道channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {group.shutdownGracefully();}}public static void main(String[] args) {NettyChartClient chartClient=new NettyChartClient("127.0.0.1",8888);chartClient.run();}}客戶端處理器
import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil;/** * @PackageName: com.netty.demo.chart * @author: youjp * @create: 2021-01-22 11:16 * @description: 客戶端處理器 * @Version: 1.0 */ public class MyNettyClientHandler extends SimpleChannelInboundHandler<String> {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {System.out.println(""+msg);} }客戶端只需要獲取服務(wù)端傳來的消息,并且可以自己編寫消息發(fā)送即可。
運行測試
接下來,先將服務(wù)端啟動類啟動,然后再運行多個客戶端。查看控制臺,并發(fā)送消息,查看channel的生命周期
分別在各自的客戶端控制臺輸入信息,測試。基于Netty實現(xiàn)的群聊功能就這樣實現(xiàn)了
有興趣的老爺,可以關(guān)注我的公眾號【一起收破爛】,回復(fù)【006】獲取2021最新java面試資料以及簡歷模型120套哦~
總結(jié)
以上是生活随笔為你收集整理的基于Netty实现群聊功能的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: jade选峰之后怎么去掉_教程丨用Jad
- 下一篇: 【修电脑】ctfmon.exe停止工作以