Netty心跳机制-长连接
前文需求回顧
完成對紅酒窖的室內溫度采集及監控功能。由本地應用程序+溫度傳感器定時采集室內溫度上報至服務器,如果溫度 >20 °C 則由服務器下發重啟空調指令,如果本地應用長時間不上傳溫度給服務器,則給戶主手機發送一條預警短信。
Netty入門篇-從雙向通信開始「上文」
上篇算是完成簡單的雙向通信了,我們接著看看 “如果本地應用長時間不上傳溫度給服務器…”,很明顯客戶端有可能掛了嘛,所以怎么實現客戶端與服務端的長連接就是本文要實現的了。
什么是心跳機制
百度百科:心跳機制是定時發送一個自定義的結構體(心跳包),讓對方知道自己還活著,以確保連接的有效性的機制。
簡單說,這個心跳機制是由客戶端主動發起的消息,每隔一段時間就向服務端發送消息,告訴服務端自己還沒死,可不要給戶主發送預警短信啊。
如何實現心跳機制
1、客戶端代碼修改
我們需要改造一下上節中客戶端的代碼,首先是在責任鏈中增加一個心跳邏輯處理類HeartbeatHandler
public class NettyClient {private static String host = "127.0.0.1";public static void main(String[] args) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap// 1.指定線程模型.group(workerGroup)// 2.指定 IO 類型為 NIO.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true)// 3.IO 處理邏輯.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new IdleStateHandler(0, 10, 0)).addLast(new StringDecoder()).addLast(new StringEncoder()).addLast(new HeartbeatHandler()).addLast(new NettyClientHandler());}});// 4.建立連接bootstrap.connect(host, 8070).addListener(future -> {if (future.isSuccess()) {System.out.println("連接成功!");} else {System.err.println("連接失敗!");}});} }沒什么變化,主要是增加了HeartbeatHandler,我們來看看這個類:
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import java.nio.charset.Charset; import java.time.LocalTime;public class HeartbeatHandler extends ChannelInboundHandlerAdapter {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent idleStateEvent = (IdleStateEvent) evt;if (idleStateEvent.state() == IdleState.WRITER_IDLE) {System.out.println("10秒了,需要發送消息給服務端了" + LocalTime.now());//向服務端送心跳包ByteBuf buffer = getByteBuf(ctx);//發送心跳消息,并在發送失敗時關閉該連接ctx.writeAndFlush(buffer).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}} else {super.userEventTriggered(ctx, evt);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("捕獲的異常:" + cause.getMessage());ctx.channel().close();}private ByteBuf getByteBuf(ChannelHandlerContext ctx) {// 1. 獲取二進制抽象 ByteBufByteBuf buffer = ctx.alloc().buffer();String time = "heartbeat:客戶端心跳數據:" + LocalTime.now();// 2. 準備數據,指定字符串的字符集為 utf-8byte[] bytes = time.getBytes(Charset.forName("utf-8"));// 3. 填充數據到 ByteBufbuffer.writeBytes(bytes);return buffer;}}還是繼承自ChannelInboundHandlerAdapter,不過這次重寫的是userEventTriggered()方法,這個方法在客戶端的所有ChannelHandler中,如果10s內沒有發生write事件時觸發,所以我們在該方法中給服務端發送心跳消息。
業務邏輯處理類NettyClientHandler沒有改動,代碼如下:
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.nio.charset.Charset; import java.util.Date; import java.util.Random;public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {System.out.println(new Date() + ": 客戶端寫出數據");// 1. 獲取數據ByteBuf buffer = getByteBuf(ctx);// 2. 寫數據ctx.channel().writeAndFlush(buffer);}private ByteBuf getByteBuf(ChannelHandlerContext ctx) {// 1. 獲取二進制抽象 ByteBufByteBuf buffer = ctx.alloc().buffer();Random random = new Random();double value = random.nextDouble() * 14 + 8;String temp = "獲取室內溫度:" + value;// 2. 準備數據,指定字符串的字符集為 utf-8byte[] bytes = temp.getBytes(Charset.forName("utf-8"));// 3. 填充數據到 ByteBufbuffer.writeBytes(bytes);return buffer;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println(new Date() + ": 客戶端讀到數據 -> " + msg.toString());}}對如上代碼不了解的可以回看上一節:Netty入門篇-從雙向通信開始
2、服務端代碼修改
服務端代碼主要是開啟TCP底層心跳機制支持,.childOption(ChannelOption.SO_KEEPALIVE, true) ,其他的代碼并沒有改動:
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;public class NettyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup)// 指定Channel.channel(NioServerSocketChannel.class)//服務端可連接隊列數,對應TCP/IP協議listen函數中backlog參數.option(ChannelOption.SO_BACKLOG, 1024)//設置TCP長連接,一般如果兩個小時內沒有數據的通信時,TCP會自動發送一個活動探測數據報文.childOption(ChannelOption.SO_KEEPALIVE, true)//將小的數據包包裝成更大的幀進行傳送,提高網絡的負載.childOption(ChannelOption.TCP_NODELAY, true).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new NettyServerHandler());}});serverBootstrap.bind(8070);}}我們再來看看服務端的業務處理類 NettyServerHandler
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.nio.charset.Charset; import java.util.Date;public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf byteBuf = (ByteBuf) msg;String message = byteBuf.toString(Charset.forName("utf-8"));System.out.println(new Date() + ": 服務端讀到數據 -> " + message);/** 心跳數據是不發送數據 **/if(!message.contains("heartbeat")){ByteBuf out = getByteBuf(ctx);ctx.channel().writeAndFlush(out);}}private ByteBuf getByteBuf(ChannelHandlerContext ctx) {byte[] bytes = "我是發送給客戶端的數據:請重啟冰箱!".getBytes(Charset.forName("utf-8"));ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes(bytes);return buffer;}}對channelRead() 方法增加了一個 if 判斷,判斷如果包含heartbeat字符串就認為這是客戶端發過來的心跳,這種判斷是非常low的,因為到目前為止我們一直是用簡單字符串來傳遞數據的,上邊傳遞的數據就直接操作字符串;那么問題來了,如果我們想傳遞對象怎么搞呢?下節寫。我們先來看一下如上代碼客戶端與服務端運行截圖:
服務端
客戶端
至此,整個心跳機制就完成了,這樣每隔10秒客戶端就會給服務端發送一個心跳消息,下節我們通過了解通協議以完善心跳機制的代碼。
18年??飘厴I后,我創建了一個java相關的公眾號,用來記錄自己的學習之路,感興趣的小伙伴可以關注一下:小偉后端筆記
總結
以上是生活随笔為你收集整理的Netty心跳机制-长连接的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: OC中的点语法
- 下一篇: 电子政务网-网络架构