日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

四、Netty 实现心跳机制与断线重连

發(fā)布時(shí)間:2025/3/15 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 四、Netty 实现心跳机制与断线重连 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、概述

  • 何為心跳
    顧名思義, 所謂心跳, 即在 TCP 長連接中, 客戶端和服務(wù)器之間定期發(fā)送的一種特殊的數(shù)據(jù)包, 通知對方自己還在線, 以確保 TCP 連接的有效性.

  • 為什么需要心跳
    因?yàn)榫W(wǎng)絡(luò)的不可靠性, 有可能在 TCP 保持長連接的過程中, 由于某些突發(fā)情況, 例如網(wǎng)線被拔出, 突然掉電等, 會造成服務(wù)器和客戶端的連接中斷. 在這些突發(fā)情況下, 如果恰好服務(wù)器和客戶端之間沒有交互的話, 那么它們是不能在短時(shí)間內(nèi)發(fā)現(xiàn)對方已經(jīng)掉線的. 為了解決這個(gè)問題, 我們就需要引入心跳機(jī)制. 心跳機(jī)制的工作原理是: 在服務(wù)器和客戶端之間一定時(shí)間內(nèi)沒有數(shù)據(jù)交互時(shí), 即處于 idle 狀態(tài)時(shí), 客戶端或服務(wù)器會發(fā)送一個(gè)特殊的數(shù)據(jù)包給對方, 當(dāng)接收方收到這個(gè)數(shù)據(jù)報(bào)文后, 也立即發(fā)送一個(gè)特殊的數(shù)據(jù)報(bào)文, 回應(yīng)發(fā)送方, 此即一個(gè) PING-PONG 交互. 自然地, 當(dāng)某一端收到心跳消息后, 就知道了對方仍然在線, 這就確保 TCP 連接的有效性.

  • 如何實(shí)現(xiàn)心跳
    我們可以通過兩種方式實(shí)現(xiàn)心跳機(jī)制:

    • 使用 TCP 協(xié)議層面的 keepalive 機(jī)制.
    • 在應(yīng)用層上實(shí)現(xiàn)自定義的心跳機(jī)制.

    雖然在 TCP 協(xié)議層面上, 提供了keepalive 保活機(jī)制, 但是使用它有幾個(gè)缺點(diǎn):

    • 它不是 TCP 的標(biāo)準(zhǔn)協(xié)議, 并且是默認(rèn)關(guān)閉的.
    • TCP keepalive 機(jī)制依賴于操作系統(tǒng)的實(shí)現(xiàn), 默認(rèn)的 keepalive 心跳時(shí)間是 兩個(gè)小時(shí), 并且對 keepalive的修改需要系統(tǒng)調(diào)用(或者修改系統(tǒng)配置), 靈活性不夠.
    • TCP keepalive 與 TCP 協(xié)議綁定, 因此如果需要更換為 UDP 協(xié)議時(shí), keepalive 機(jī)制就失效了.

    雖然使用 TCP 層面的 keepalive 機(jī)制比自定義的應(yīng)用層心跳機(jī)制節(jié)省流量, 但是基于上面的幾點(diǎn)缺點(diǎn), 一般的實(shí)踐中, 人們大多數(shù)都是選擇在應(yīng)用層上實(shí)現(xiàn)自定義的心跳.既然如此, 那么我們就來大致看看在在 Netty 中是怎么實(shí)現(xiàn)心跳的吧. 在 Netty 中, 實(shí)現(xiàn)心跳機(jī)制的關(guān)鍵是 IdleStateHandler, 它可以對一個(gè)Channel的讀/寫設(shè)置定時(shí)器, 當(dāng)Channel 在一定事件間隔內(nèi)沒有數(shù)據(jù)交互時(shí)(即處于 idle 狀態(tài)), 就會觸發(fā)指定的事件.

    二、使用 Netty 實(shí)現(xiàn)心跳

    上面我們提到了, 在 Netty 中, 實(shí)現(xiàn)心跳機(jī)制的關(guān)鍵是 IdleStateHandler, 那么這個(gè) Handler 如何使用呢? 我們來看看它的構(gòu)造器:

    public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS); }

    實(shí)例化一個(gè) IdleStateHandler 需要提供三個(gè)參數(shù):

    • readerIdleTimeSeconds, 讀超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒有從 Channel 讀取到數(shù)據(jù)時(shí), 會觸發(fā)一個(gè)READER_IDLE 的 IdleStateEvent 事件.

    • writerIdleTimeSeconds, 寫超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒有數(shù)據(jù)寫入到 Channel 時(shí), 會觸發(fā)一個(gè)WRITER_IDLE 的 IdleStateEvent 事件.

    • allIdleTimeSeconds, 讀/寫超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒有讀或?qū)懖僮鲿r(shí), 會觸發(fā)一個(gè) ALL_IDLE 的IdleStateEvent 事件.

    為了展示具體的 IdleStateHandler 實(shí)現(xiàn)的心跳機(jī)制, 下面我們來構(gòu)造一個(gè)具體的EchoServer 的例子, 這個(gè)例子的行為如下:
    這個(gè)例子中, 客戶端和服務(wù)器通過 TCP 長連接進(jìn)行通信.
    TCP 通信的報(bào)文格式是:

    客戶端每隔一個(gè)隨機(jī)的時(shí)間后, 向服務(wù)器發(fā)送消息, 服務(wù)器收到消息后, 立即將收到的消息原封不動地回復(fù)給客戶端.若客戶端在指定的時(shí)間間隔內(nèi)沒有讀/寫操作, 則客戶端會自動向服務(wù)器發(fā)送一個(gè) PING 心跳, 服務(wù)器收到 PING 心跳消息時(shí), 需要回復(fù)一個(gè) PONG 消息.

    三、上代碼

  • 通用部分
    根據(jù)上面定義的行為, 我們接下來實(shí)現(xiàn)心跳的通用部分 CustomHeartbeatHandler:
  • public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> {public static final byte PING_MSG = 1;public static final byte PONG_MSG = 2;public static final byte CUSTOM_MSG = 3;protected String name;private int heartbeatCount = 0;public CustomHeartbeatHandler(String name) {this.name = name;}@Overrideprotected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {if (byteBuf.getByte(4) == PING_MSG) {sendPongMsg(context);} else if (byteBuf.getByte(4) == PONG_MSG){System.out.println(name + " get pong msg from " + context.channel().remoteAddress());} else {handleData(context, byteBuf);}}protected void sendPingMsg(ChannelHandlerContext context) {ByteBuf buf = context.alloc().buffer(5);buf.writeInt(5);buf.writeByte(PING_MSG);context.writeAndFlush(buf);heartbeatCount++;System.out.println(name + " sent ping msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);}private void sendPongMsg(ChannelHandlerContext context) {ByteBuf buf = context.alloc().buffer(5);buf.writeInt(5);buf.writeByte(PONG_MSG);context.channel().writeAndFlush(buf);heartbeatCount++;System.out.println(name + " sent pong msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);}protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// IdleStateHandler 所產(chǎn)生的 IdleStateEvent 的處理邏輯.if (evt instanceof IdleStateEvent) {IdleStateEvent e = (IdleStateEvent) evt;switch (e.state()) {case READER_IDLE:handleReaderIdle(ctx);break;case WRITER_IDLE:handleWriterIdle(ctx);break;case ALL_IDLE:handleAllIdle(ctx);break;default:break;}}}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.err.println("---" + ctx.channel().remoteAddress() + " is active---");}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.err.println("---" + ctx.channel().remoteAddress() + " is inactive---");}protected void handleReaderIdle(ChannelHandlerContext ctx) {System.err.println("---READER_IDLE---");}protected void handleWriterIdle(ChannelHandlerContext ctx) {System.err.println("---WRITER_IDLE---");}protected void handleAllIdle(ChannelHandlerContext ctx) {System.err.println("---ALL_IDLE---");} }

    類 CustomHeartbeatHandler 負(fù)責(zé)心跳的發(fā)送和接收, 我們接下來詳細(xì)地分析一下它的作用. 我們在前面提到, IdleStateHandler 是實(shí)現(xiàn)心跳的關(guān)鍵, 它會根據(jù)不同的 IO idle 類型來產(chǎn)生不同的 IdleStateEvent 事件, 而這個(gè)事件的捕獲, 其實(shí)就是在 userEventTriggered 方法中實(shí)現(xiàn)的.
    我們來看看 CustomHeartbeatHandler.userEventTriggered 的具體實(shí)現(xiàn):

    @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent e = (IdleStateEvent) evt;switch (e.state()) {case READER_IDLE:handleReaderIdle(ctx);break;case WRITER_IDLE:handleWriterIdle(ctx);break;case ALL_IDLE:handleAllIdle(ctx);break;default:break;}} }

    在 userEventTriggered 中, 根據(jù) IdleStateEvent 的 state() 的不同, 而進(jìn)行不同的處理. 例如如果是讀取數(shù)據(jù) idle, 則 e.state() == READER_IDLE, 因此就調(diào)用 handleReaderIdle 來處理它. CustomHeartbeatHandler 提供了三個(gè) idle 處理方法: handleReaderIdle, handleWriterIdle, handleAllIdle, 這三個(gè)方法目前只有默認(rèn)的實(shí)現(xiàn), 它需要在子類中進(jìn)行重寫, 現(xiàn)在我們暫時(shí)略過它們, 在具體的客戶端和服務(wù)器的實(shí)現(xiàn)部分時(shí)再來看它們.

    知道了這一點(diǎn)后, 我們接下來看看數(shù)據(jù)處理部分:

    @Override protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {if (byteBuf.getByte(4) == PING_MSG) {sendPongMsg(context);} else if (byteBuf.getByte(4) == PONG_MSG){System.out.println(name + " get pong msg from " + context.channel().remoteAddress());} else {handleData(context, byteBuf);} }

    在 CustomHeartbeatHandler.channelRead0 中, 我們首先根據(jù)報(bào)文協(xié)議:

    來判斷當(dāng)前的報(bào)文類型, 如果是 PING_MSG 則表示是服務(wù)器收到客戶端的 PING 消息, 此時(shí)服務(wù)器需要回復(fù)一個(gè) PONG 消息, 其消息類型是 PONG_MSG.
    扔報(bào)文類型是 PONG_MSG, 則表示是客戶端收到服務(wù)器發(fā)送的 PONG 消息, 此時(shí)打印一個(gè) log 即可.

  • 客戶端部分
  • //客戶端初始化 public class Client {public static void main(String[] args) {NioEventLoopGroup workGroup = new NioEventLoopGroup(4);Random random = new Random(System.currentTimeMillis());try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(workGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline p = socketChannel.pipeline();p.addLast(new IdleStateHandler(0, 0, 5));p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));p.addLast(new ClientHandler());}});Channel ch = bootstrap.remoteAddress("127.0.0.1", 12345).connect().sync().channel();for (int i = 0; i < 10; i++) {String content = "client msg " + i;ByteBuf buf = ch.alloc().buffer();buf.writeInt(5 + content.getBytes().length);buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);buf.writeBytes(content.getBytes());ch.writeAndFlush(buf);Thread.sleep(random.nextInt(20000));}} catch (Exception e) {throw new RuntimeException(e);} finally {workGroup.shutdownGracefully();}} }

    上面的代碼是 Netty 的客戶端端的初始化代碼, 使用過 Netty 的朋友對這個(gè)代碼應(yīng)該不會陌生. 別的部分我們就不再贅述, 我們來看看 ChannelInitializer.initChannel 部分即可:

    .handler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline p = socketChannel.pipeline();p.addLast(new IdleStateHandler(0, 0, 5));p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));p.addLast(new ClientHandler());} });

    我們給 pipeline 添加了三個(gè) Handler, IdleStateHandler 這個(gè) handler 是心跳機(jī)制的核心, 我們?yōu)榭蛻舳硕嗽O(shè)置了讀寫 idle 超時(shí), 時(shí)間間隔是5s, 即如果客戶端在間隔 5s 后都沒有收到服務(wù)器的消息或向服務(wù)器發(fā)送消息, 則產(chǎn)生 ALL_IDLE 事件.接下來我們添加了LengthFieldBasedFrameDecoder, 它是負(fù)責(zé)解析我們的 TCP 報(bào)文, 因?yàn)楹捅疚牡哪康臒o關(guān), 因此這里不詳細(xì)展開.最后一個(gè) Handler 是 ClientHandler, 它繼承于 CustomHeartbeatHandler, 是我們處理業(yè)務(wù)邏輯部分.

    //客戶端 Handler public class ClientHandler extends CustomHeartbeatHandler {public ClientHandler() {super("client");}@Overrideprotected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {byte[] data = new byte[byteBuf.readableBytes() - 5];byteBuf.skipBytes(5);byteBuf.readBytes(data);String content = new String(data);System.out.println(name + " get content: " + content);}@Overrideprotected void handleAllIdle(ChannelHandlerContext ctx) {super.handleAllIdle(ctx);sendPingMsg(ctx);} }

    ClientHandler 繼承于 CustomHeartbeatHandler, 它重寫了兩個(gè)方法, 一個(gè)是 handleData, 在這里面實(shí)現(xiàn) 僅僅打印收到的消息.第二個(gè)重寫的方法是 handleAllIdle. 我們在前面提到, 客戶端負(fù)責(zé)發(fā)送心跳的 PING 消息, 當(dāng)客戶端產(chǎn)生一個(gè) ALL_IDLE 事件后, 會導(dǎo)致父類的 CustomHeartbeatHandler.userEventTriggered 調(diào)用, 而 userEventTriggered 中會根據(jù) e.state() 來調(diào)用不同的方法, 因此最后調(diào)用的是 ClientHandler.handleAllIdle, 在這個(gè)方法中, 客戶端調(diào)用 sendPingMsg 向服務(wù)器發(fā)送一個(gè) PING 消息.

  • 服務(wù)器部分
  • //服務(wù)器初始化 public class Server {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workGroup = new NioEventLoopGroup(4);try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline p = socketChannel.pipeline();p.addLast(new IdleStateHandler(10, 0, 0));p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));p.addLast(new ServerHandler());}});Channel ch = bootstrap.bind(12345).sync().channel();ch.closeFuture().sync();} catch (Exception e) {throw new RuntimeException(e);} finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}} }

    服務(wù)器的初始化部分也沒有什么好說的, 它也和客戶端的初始化一樣, 為 pipeline 添加了三個(gè) Handler.

    //服務(wù)器 Handler public class ServerHandler extends CustomHeartbeatHandler {public ServerHandler() {super("server");}@Overrideprotected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf buf) {byte[] data = new byte[buf.readableBytes() - 5];ByteBuf responseBuf = Unpooled.copiedBuffer(buf);buf.skipBytes(5);buf.readBytes(data);String content = new String(data);System.out.println(name + " get content: " + content);channelHandlerContext.write(responseBuf);}@Overrideprotected void handleReaderIdle(ChannelHandlerContext ctx) {super.handleReaderIdle(ctx);System.err.println("---client " + ctx.channel().remoteAddress().toString() + " reader timeout, close it---");ctx.close();} }

    ServerHandler 繼承于 CustomHeartbeatHandler,

    它重寫了兩個(gè)方法, 一個(gè)是 handleData, 在這里面實(shí)現(xiàn) EchoServer 的功能: 即收到客戶端的消息后, 立即原封不動地將消息回復(fù)給客戶端.第二個(gè)重寫的方法是 handleReaderIdle, 因?yàn)榉?wù)器僅僅對客戶端的讀 idle 感興趣, 因此只重新了這個(gè)方法. 若服務(wù)器在指定時(shí)間后沒有收到客戶端的消息, 則會觸發(fā) READER_IDLE 消息, 進(jìn)而會調(diào)用 handleReaderIdle 這個(gè)方法.我們在前面提到, 客戶端負(fù)責(zé)發(fā)送心跳的 PING 消息, 并且服務(wù)器的 READER_IDLE 的超時(shí)時(shí)間是客戶端發(fā)送 PING 消息的間隔的兩倍, 因此當(dāng)服務(wù)器 READER_IDLE 觸發(fā)時(shí), 就可以確定是客戶端已經(jīng)掉線了, 因此服務(wù)器直接關(guān)閉客戶端連接即可.

    四、實(shí)現(xiàn)客戶端的斷線重連

    public class Client {private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);private Channel channel;private Bootstrap bootstrap;public static void main(String[] args) throws Exception {Client client = new Client();client.start();client.sendData();}public void sendData() throws Exception {Random random = new Random(System.currentTimeMillis());for (int i = 0; i < 10000; i++) {if (channel != null && channel.isActive()) {String content = "client msg " + i;ByteBuf buf = channel.alloc().buffer(5 + content.getBytes().length);buf.writeInt(5 + content.getBytes().length);buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);buf.writeBytes(content.getBytes());channel.writeAndFlush(buf);}Thread.sleep(random.nextInt(20000));}}public void start() {try {bootstrap = new Bootstrap();bootstrap.group(workGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline p = socketChannel.pipeline();p.addLast(new IdleStateHandler(0, 0, 5));p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));p.addLast(new ClientHandler(Client.this));}});doConnect();} catch (Exception e) {throw new RuntimeException(e);}}protected void doConnect() {if (channel != null && channel.isActive()) {return;}ChannelFuture future = bootstrap.connect("127.0.0.1", 12345);future.addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture futureListener) throws Exception {if (futureListener.isSuccess()) {channel = futureListener.channel();System.out.println("Connect to server successfully!");} else {System.out.println("Failed to connect to server, try connect after 10s");futureListener.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {doConnect();}}, 10, TimeUnit.SECONDS);}}});}}

    上面的代碼中, 我們抽象出 doConnect 方法, 它負(fù)責(zé)客戶端和服務(wù)器的 TCP 連接的建立, 并且當(dāng) TCP 連接失敗時(shí), doConnect 會 通過 “channel().eventLoop().schedule” 來延時(shí)10s 后嘗試重新連接.

    • 客戶端 Handler
    public class ClientHandler extends CustomHeartbeatHandler {private Client client;public ClientHandler(Client client) {super("client");this.client = client;}@Overrideprotected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {byte[] data = new byte[byteBuf.readableBytes() - 5];byteBuf.skipBytes(5);byteBuf.readBytes(data);String content = new String(data);System.out.println(name + " get content: " + content);}@Overrideprotected void handleAllIdle(ChannelHandlerContext ctx) {super.handleAllIdle(ctx);sendPingMsg(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);client.doConnect();} }

    斷線重連的關(guān)鍵一點(diǎn)是檢測連接是否已經(jīng)斷開. 因此我們改寫了 ClientHandler, 重寫了 channelInactive 方法. 當(dāng) TCP 連接斷開時(shí), 會回調(diào) channelInactive 方法, 因此我們在這個(gè)方法中調(diào)用client.doConnect() 來進(jìn)行重連.

    五、總結(jié)

    使用 Netty 實(shí)現(xiàn)心跳機(jī)制的關(guān)鍵就是利用 IdleStateHandler 來產(chǎn)生對應(yīng)的 idle 事件.一般是客戶端負(fù)責(zé)發(fā)送心跳的 PING 消息, 因此客戶端注意關(guān)注 ALL_IDLE 事件, 在這個(gè)事件觸發(fā)后, 客戶端需要向服務(wù)器發(fā)送 PING 消息, 告訴服務(wù)器"我還存活著".服務(wù)器是接收客戶端的 PING 消息的, 因此服務(wù)器關(guān)注的是 READER_IDLE 事件, 并且服務(wù)器的 READER_IDLE 間隔需要比客戶端的 ALL_IDLE 事件間隔大(例如客戶端ALL_IDLE 是5s 沒有讀寫時(shí)觸發(fā), 因此服務(wù)器的 READER_IDLE 可以設(shè)置為10s)。當(dāng)服務(wù)器收到客戶端的 PING 消息時(shí), 會發(fā)送一個(gè) PONG 消息作為回復(fù). 一個(gè) PING-PONG 消息對就是一個(gè)心跳交互.

    文章轉(zhuǎn)自

    總結(jié)

    以上是生活随笔為你收集整理的四、Netty 实现心跳机制与断线重连的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。