使用netty实现一个类似于微信的聊天功能
生活随笔
收集整理的這篇文章主要介紹了
使用netty实现一个类似于微信的聊天功能
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
1.maven依賴
? ? ? ?<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.25.Final</version></dependency>2.netty代碼
部分業(yè)務(wù)邏輯代碼已省略。
后端框架為SpringBoot+MyBatis+Spring MVC
ChatHandler.java
/*** 處理消息的handler* TextWebSocketFrame: 在netty中,是用于為websocket專門處理文本的對(duì)象,frame是消息的載體*/ public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { ?/*** 用來(lái)保存所有的客戶端連接*/private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); ?/*** 當(dāng)Channel中有新的事件消息會(huì)自動(dòng)調(diào)用*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {// 當(dāng)接收到數(shù)據(jù)后會(huì)自動(dòng)調(diào)用 ?// 獲取客戶端發(fā)送過(guò)來(lái)的文本消息String text = msg.text();System.out.println("接收到消息數(shù)據(jù)為:" + text); ?Message message = JSON.parseObject(text, Message.class); ?// 通過(guò)SpringUtil工具類獲取Spring上下文容器ChatRecordService chatRecordService = SpringUtil.getBean(ChatRecordService.class); ?switch (message.getType()) {// 處理客戶端連接的消息case 0:// 建立用戶與通道的關(guān)聯(lián)String userid = message.getChatRecord().getUserid();UserChannelMap.put(userid, ctx.channel());System.out.println("建立用戶:" + userid + "與通道" + ctx.channel().id() + "的關(guān)聯(lián)");UserChannelMap.print();break;// 處理客戶端發(fā)送好友消息case 1:System.out.println("接收到用戶消息");// 將聊天消息保存到數(shù)據(jù)庫(kù)TbChatRecord chatRecord = message.getChatRecord();chatRecordService.insert(chatRecord); ?// 如果發(fā)送消息好友在線,可以直接將消息發(fā)送給好友Channel channel = UserChannelMap.get(chatRecord.getFriendid());if (channel != null) {channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(message)));} else {// 如果不在線,暫時(shí)不發(fā)送System.out.println("用戶" + chatRecord.getFriendid() + "不在線");}break;// 處理客戶端的簽收消息case 2:// 將消息記錄設(shè)置為已讀chatRecordService.updateStatusHasRead(message.getChatRecord().getId());break;case 3:// 接收心跳消息System.out.println("接收到心跳消息:" + JSON.toJSONString(message));break;default:} ?} ?/*** 當(dāng)有新的客戶端連接服務(wù)器之后,會(huì)自動(dòng)調(diào)用這個(gè)方法*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {// 將新的通道加入到clientsclients.add(ctx.channel());} ?@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());ctx.channel().close();} ?@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {System.out.println("關(guān)閉通道");UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());UserChannelMap.print();} }HearBeatHandler.java
/** * 有時(shí)候Netty并不能在到客戶端關(guān)閉時(shí),自動(dòng)關(guān)閉對(duì)應(yīng)的通道資源。所以需要一個(gè)心跳機(jī)制,去檢測(cè)每個(gè)通道是否空閑。 * ? 如果空閑超過(guò)一定時(shí)間,就需要將對(duì)應(yīng)客戶端的通道資源關(guān)閉。客戶端需要每隔一段時(shí)間發(fā)送一條消息,用來(lái)保持心跳。 * ? 該代碼中約定message.getType==3為心跳消息,不需要處理 */ public class HearBeatHandler extends ChannelInboundHandlerAdapter { ?@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent idleStateEvent = (IdleStateEvent) evt; ?if (idleStateEvent.state() == IdleState.READER_IDLE) {System.out.println("讀空閑事件觸發(fā)...");} else if (idleStateEvent.state() == IdleState.WRITER_IDLE) {System.out.println("寫空閑事件觸發(fā)...");} else if (idleStateEvent.state() == IdleState.ALL_IDLE) {System.out.println("---------------");System.out.println("讀寫空閑事件觸發(fā)");System.out.println("關(guān)閉通道資源");ctx.channel().close();}}} }Message.java
該實(shí)體為和前端約定好的格式,通過(guò)不同的消息類型,達(dá)到不同的功能。
public class Message {private Integer type; // 消息類型private TbChatRecord chatRecord; ? ?// 聊天消息private Object ext; ?// 擴(kuò)展消息字段 ?public Integer getType() {return type;} ?public void setType(Integer type) {this.type = type;} ?public TbChatRecord getChatRecord() {return chatRecord;} ?public void setChatRecord(TbChatRecord chatRecord) {this.chatRecord = chatRecord;} ?public Object getExt() {return ext;} ?public void setExt(Object ext) {this.ext = ext;} } ?TbChatRecord.java
public class TbChatRecord {private String id; ?private String userid; ?private String friendid; ?private Integer hasRead; ?private Date createtime; ?private Integer hasDelete; ?private String message; }NettyListener.java
/*** 服務(wù)啟動(dòng)監(jiān)聽器*/ @Component public class NettyListener implements ApplicationListener<ContextRefreshedEvent> { ?@Autowiredprivate WebSocketServer websocketServer; ?@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {if (event.getApplicationContext().getParent() == null) {try {websocketServer.start();} catch (Exception e) {e.printStackTrace();}}} }UserChannelMap.java
/*** 建立用戶ID與通道的關(guān)聯(lián)*/ public class UserChannelMap {// 用戶保存用戶id與通道的Map對(duì)象private static Map<String, Channel> userChannelMap; ?static {userChannelMap = new HashMap<String, Channel>();} ?/*** 添加用戶id與channel的關(guān)聯(lián)* @param userid* @param channel*/public static void put(String userid, Channel channel) {userChannelMap.put(userid, channel);} ?/*** 根據(jù)用戶id移除用戶id與channel的關(guān)聯(lián)* @param userid*/public static void remove(String userid) {userChannelMap.remove(userid);} ?/*** 根據(jù)通道id移除用戶與channel的關(guān)聯(lián)* @param channelId 通道的id*/public static void removeByChannelId(String channelId) {if(!StringUtils.isNotBlank(channelId)) {return;} ?for (String s : userChannelMap.keySet()) {Channel channel = userChannelMap.get(s);if(channelId.equals(channel.id().asLongText())) {System.out.println("客戶端連接斷開,取消用戶" + s + "與通道" + channelId + "的關(guān)聯(lián)");userChannelMap.remove(s);break;}}} ? ?// 打印所有的用戶與通道的關(guān)聯(lián)數(shù)據(jù)public static void print() {for (String s : userChannelMap.keySet()) {System.out.println("用戶id:" + s + " 通道:" + userChannelMap.get(s).id());}} ?/*** 根據(jù)好友id獲取對(duì)應(yīng)的通道* @param friendid 好友id* @return Netty通道*/public static Channel get(String friendid) {return userChannelMap.get(friendid);} }WebsocketInitializer.java
/*** 用于在某個(gè)Channel注冊(cè)到EventLoop后,對(duì)這個(gè)Channel執(zhí)行一些初始化操作* ChannelInitializer雖然會(huì)在一開始會(huì)被注冊(cè)到Channel相關(guān)的pipeline里,* 但是在初始化完成之后,ChannelInitializer會(huì)將自己從pipeline中移除,不會(huì)影響后續(xù)的操作*/ public class WebsocketInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline(); ?// ------------------// 用于支持Http協(xié)議// ------------------ ?// websocket基于http協(xié)議,需要有http的編解碼器pipeline.addLast(new HttpServerCodec());// 對(duì)寫大數(shù)據(jù)流的支持pipeline.addLast(new ChunkedWriteHandler());// 添加對(duì)HTTP請(qǐng)求和響應(yīng)的聚合器:只要使用Netty進(jìn)行Http編程都需要使用// 對(duì)HttpMessage進(jìn)行聚合,聚合成FullHttpRequest或者FullHttpResponse// 在netty編程中都會(huì)使用到Handlerpipeline.addLast(new HttpObjectAggregator(1024 * 64)); ?pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); ?// 添加Netty空閑超時(shí)檢查的支持// 1. 讀空閑超時(shí)(超過(guò)一定的時(shí)間會(huì)發(fā)送對(duì)應(yīng)的事件消息)// 2. 寫空閑超時(shí)// 3. 讀寫空閑超時(shí)pipeline.addLast(new IdleStateHandler(4, 8, 12)); ?pipeline.addLast(new HearBeatHandler());// 添加自定義的handlerpipeline.addLast(new ChatHandler()); ?} }WebSocketServer.java
/*** netty的服務(wù)器*/ @Component public class WebSocketServer { ?private EventLoopGroup bossGroup; ? ? ? // 主線程池private EventLoopGroup workerGroup; ? ? // 工作線程池private ServerBootstrap server; ? ? ? ? // 服務(wù)器private ChannelFuture future; ? ? ? ? ? // 回調(diào) ?public void start() {future = server.bind(9001);System.out.println("netty server - 啟動(dòng)成功");} ?public WebSocketServer() {bossGroup = new NioEventLoopGroup();workerGroup = new NioEventLoopGroup(); ?server = new ServerBootstrap();server.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new WebsocketInitializer());} }SpringUtil.java
/*** @Description: 提供手動(dòng)獲取被spring管理的bean對(duì)象*/ @Component public class SpringUtil implements ApplicationContextAware {private static ApplicationContext applicationContext; ?@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {if (SpringUtil.applicationContext == null) {SpringUtil.applicationContext = applicationContext;}} ?// 獲取applicationContextpublic static ApplicationContext getApplicationContext() {return applicationContext;} ?// 通過(guò)name獲取 Bean.public static Object getBean(String name) {return getApplicationContext().getBean(name);} ?// 通過(guò)class獲取Bean.public static <T> T getBean(Class<T> clazz) {return getApplicationContext().getBean(clazz);} ?// 通過(guò)name,以及Clazz返回指定的Beanpublic static <T> T getBean(String name, Class<T> clazz) {return getApplicationContext().getBean(name, clazz);} ? }?
總結(jié)
以上是生活随笔為你收集整理的使用netty实现一个类似于微信的聊天功能的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 使用netty搭建一个简单的聊天室
- 下一篇: Activiti工作流入门