日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

bootstrap外不引用连接_网络编程Netty IoT百万长连接优化,万字长文精讲

發布時間:2025/3/11 编程问答 51 豆豆
生活随笔 收集整理的這篇文章主要介紹了 bootstrap外不引用连接_网络编程Netty IoT百万长连接优化,万字长文精讲 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

IoT是什么

The Internet of things的簡稱IoT,即是物聯網的意思

IoT推送系統的設計

比如說,像一些智能設備,需要通過APP或者微信中的小程序等,給設備發送一條指令,讓這個設備下載或者播放音樂,那么需要做什么才可以完成上面的任務呢?

首先需要推送服務器,這個服務器主要負責消息的分發,不處理業務消息;設備會連接到推送服務器,APP通過把指令發送到推送服務器,然后推送服務器再把指令分發給相應的設備。

可是,當買設備的人越來越多,推送服務器所能承受的壓力就越大,這個時候就需要對推送服務器做集群,一臺不行,就搞十臺,那么還有一個問題,就是推送服務器增加了,設備如何找到相應的服務器,然后和服務器建立連接呢,注冊中心可以解決這個問題,每一臺服務器都注冊到注冊中心上,設備會請求注冊中心,得到推送服務器的地址,然后再和服務器建立連接。

而且還會有相應的redis集群,用來記錄設備訂閱的主題以及設備的信息;APP發送指令到設備,其實就是發送了一串數據,相應的會提供推送API,提供一些接口,通過接口把數據發送過去;而推送API不是直接去連接推送服務器的,中間還會有MQ集群,主要用來消息的存儲,推送API推送消息到MQ,推送服務器從MQ中訂閱消息,以上就是簡單的IoT推送系統的設計。

下面看下結構圖:

注意:設備連接到注冊中心的是短連接,設備和推送服務器建立的連接是長連接

心跳檢測機制

簡述心跳檢測

心跳檢測,就是判斷對方是否還存活,一般采用定時的發送一些簡單的包,如果在指定的時間段內沒有收到對方的回應,則判斷對方已經掛掉

Netty提供了IdleStateHandler類來實現心跳,簡單的使用如下:

pipeline.addFirst(new IdleStateHandler(0, 0, 1, TimeUnit.SECONDS));

下面是IdleStateHandler的構造函數:

public IdleStateHandler( long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);}

四個參數說明:

  • readerIdleTime,讀超時時間
  • writerIdleTime,寫超時時間
  • allIdleTime,所有事件超時時間
  • TimeUnit unit,超時時間單位
  • 心跳檢測機制代碼示例

    簡單示例: 服務端:

    static final int BEGIN_PORT = 8088; static final int N_PORT = 100; public static void main(String[] args) { new PingServer().start(BEGIN_PORT, N_PORT); } public void start(int beginPort, int nPort) { System.out.println("啟動服務...."); EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.handler(new LoggingHandler(LogLevel.INFO)); bootstrap.group(bossGroup, workerGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childOption(ChannelOption.SO_REUSEADDR, true); bootstrap.childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addFirst(new IdleStateHandler(0, 0, 1, TimeUnit.SECONDS)); pipeline.addLast(new PingHandler()); //每個連接都有個ConnectionCountHandler對連接記數進行增加 pipeline.addLast(new ConnectionCountHandler()); } }); bootstrap.bind(beginPort).addListener((ChannelFutureListener) future -> { System.out.println("端口綁定成功: " + beginPort); }); System.out.println("服務已啟動!");}public class PingHandler extends SimpleUserEventChannelHandler { private static final ByteBuf PING_BUF = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer("ping".getBytes())); private int count; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] data = new byte[buf.readableBytes()]; buf.readBytes(data); String str = new String(data); if ("pong".equals(str)) { System.out.println(ctx + " ---- " + str); count--; } ctx.fireChannelRead(msg); } @Override protected void eventReceived(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { if (evt.state() == ALL_IDLE) { if (count >= 3) { System.out.println("檢測到客戶端連接無響應,斷開連接:" + ctx.channel()); ctx.close(); return; } count++; System.out.println(ctx.channel() + " ---- ping"); ctx.writeAndFlush(PING_BUF.duplicate()); } ctx.fireUserEventTriggered(evt); }}

    客戶端:

    //服務端的IP private static final String SERVER_HOST = "localhost"; static final int BEGIN_PORT = 8088; static final int N_PORT = 100; public static void main(String[] args) { new PoneClient().start(BEGIN_PORT, N_PORT); } public void start(final int beginPort, int nPort) { System.out.println("客戶端啟動...."); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); final Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_REUSEADDR, true); bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new PongHandler()); } }); int index = 0; int port; String serverHost = System.getProperty("server.host", SERVER_HOST); ChannelFuture channelFuture = bootstrap.connect(serverHost, beginPort); channelFuture.addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { System.out.println("連接失敗,退出!"); System.exit(0); } }); try { channelFuture.get(); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }public class PongHandler extends SimpleChannelInboundHandler { private static final ByteBuf PONG_BUF = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer("pong".getBytes())); @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] data = new byte[buf.readableBytes()]; buf.readBytes(data); String str = new String(data); if ("ping".equals(str)) { ctx.writeAndFlush(PONG_BUF.duplicate()); } }}

    服務端輸出結果:

    百萬長連接優化

    連接優化代碼示例

    服務端:

    static final int BEGIN_PORT = 11000; static final int N_PORT = 100; public static void main(String[] args) { new Server().start(BEGIN_PORT, N_PORT); } public void start(int beginPort, int nPort) { System.out.println("啟動服務...."); EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childOption(ChannelOption.SO_REUSEADDR, true); bootstrap.childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //每個連接都有個ConnectionCountHandler對連接記數進行增加 pipeline.addLast(new ConnectionCountHandler()); } }); //這里開啟 10000到100099這100個端口 for (int i = 0; i < nPort; i++) { int port = beginPort + i; bootstrap.bind(port).addListener((ChannelFutureListener) future -> { System.out.println("端口綁定成功: " + port); }); } System.out.println("服務已啟動!"); }

    客戶端:

    //服務端的IP private static final String SERVER_HOST = "192.168.231.129"; static final int BEGIN_PORT = 11000; static final int N_PORT = 100; public static void main(String[] args) { new Client().start(BEGIN_PORT, N_PORT); } public void start(final int beginPort, int nPort) { System.out.println("客戶端啟動...."); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); final Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_REUSEADDR, true); int index = 0; int port; String serverHost = System.getProperty("server.host", SERVER_HOST); //從10000的端口開始,按端口遞增的方式進行連接 while (!Thread.interrupted()) { port = beginPort + index; try { ChannelFuture channelFuture = bootstrap.connect(serverHost, port); channelFuture.addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { System.out.println("連接失敗,退出!"); System.exit(0); } }); channelFuture.get(); } catch (Exception e) { } if (++index == nPort) { index = 0; } } }

    ConnectionCountHandler類:

    public class ConnectionCountHandler extends ChannelInboundHandlerAdapter { //這里用來對連接數進行記數,每兩秒輸出到控制臺 private static final AtomicInteger nConnection = new AtomicInteger(); static { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { System.out.println("連接數: " + nConnection.get()); }, 0, 2, TimeUnit.SECONDS); } @Override public void channelActive(ChannelHandlerContext ctx) { nConnection.incrementAndGet(); } @Override public void channelInactive(ChannelHandlerContext ctx) { nConnection.decrementAndGet(); }}

    上述的代碼會打包成jar放到linux上運行,對于上述的優化來說,程序方面的就暫時不做,下面會從操作系統層面進行優化,讓其支撐起百萬連接。

    TCP連接四元組

    在優化之前先來看下網絡里的一個小知識,TCP連接四元組: 服務器的IP+服務器的POST+客戶端的IP+客戶端的POST

    端口的范圍一般是1到65535:

    配置優化

    現在在虛擬機上安裝兩個linux系統,配置分別是:

    地址CPU內存JDK作用192.168.15.130VM-4核8G1.8客戶端192.168.15.128VM-4核8G1.8服務端

    啟動服務端: java -Xmx4g -Xms4g -cp network-study-1.0-SNAPSHOT-jar-with-dependencies.jar com.dongnaoedu.network.netty.million.Server > out.log 2>&1 & 啟動客戶端: java -Xmx4g -Xms4g -Dserver.host=192.168.15.128 -cp network-study-1.0-SNAPSHOT-jar-with-dependencies.jar com.dongnaoedu.network.netty.million.Client

    啟動服務端后可以使用tail -f命令查看out.log中的日志:

    客戶端啟動后,如果報了以下錯誤,需要修改系統的文件最大句柄和進程的文件最大句柄:

    Caused by: java.io.IOException: Too many open files at sun.nio.ch.FileDispatcherImpl.init(Native Method) at sun.nio.ch.FileDispatcherImpl.(FileDispatcherImpl.java:35) ... 8 more

    優化系統最大句柄: 查看操作系統最大文件句柄數,執行命令cat /proc/sys/fs/file-max,查看最大句柄數是否滿足需要,如果不滿足,通過vim /etc/sysctl.conf命令插入如下配置:

    fs.file-max = 1000000
  • 設置單進程打開的文件最大句柄數,執行命令ulimit -a查看當前設置是否滿足要求:
  • [root@test-server2 download]# ulimit -a | grep "open files"open files (-n) 1024

    當并發接入的Tcp連接數超過上限時,就會提示“Too many open files”,所有的新客戶端接入將會失敗。通過vim /etc/security/limits.conf 修改配置參數:

    * soft nofile 1000000* hard nofile 1000000

    修改配置參數后注銷生效。

    • 如果程序被中斷,或報了異常
    java.io.IOException: 設備上沒有空間 at sun.nio.ch.EPollArrayWrapper.epollCtl(Native Method) at sun.nio.ch.EPollArrayWrapper.updateRegistrations(EPollArrayWrapper.java:299) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:268) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) at sun.nio.ch.SelectorImpl.selectNow(SelectorImpl.java:105) at io.netty.channel.nio.SelectedSelectionKeySetSelector.selectNow(SelectedSelectionKeySetSelector.java:56) at io.netty.channel.nio.NioEventLoop.selectNow(NioEventLoop.java:750) at io.netty.channel.nio.NioEventLoop$1.get(NioEventLoop.java:71) at io.netty.channel.DefaultSelectStrategy.calculateStrategy(DefaultSelectStrategy.java:30) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:426) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748)
    • 此時可以查看操作系統的日志more /var/log/messages,或在程序啟動時執行tail -f /var/log/messages 監控日志。如果日志中出現以下內容,說明需要優化TCP/IP參數
    Jun 4 16:55:01 localserver kernel: TCP: too many orphaned socketsJun 4 16:55:01 localserver kernel: TCP: too many orphaned socketsJun 4 16:55:01 localserver kernel: TCP: too many orphaned socketsJun 4 16:55:01 localserver kernel: TCP: too many orphaned socketsJun 4 16:55:01 localserver kernel: TCP: too many orphaned socketsJun 4 16:55:01 localserver kernel: TCP: too many orphaned socketsJun 4 16:55:01 localserver kernel: TCP: too many orphaned sockets

    ==優化TCP/IP相關參數:==

    • 查看客戶端端口范圍限制
    cat /proc/sys/net/ipv4/ip_local_port_range
    • 通過vim /etc/sysctl.conf 修改網絡參數
    • 客戶端修改端口范圍的限制
    net.ipv4.ip_local_port_range = 1024 65535
    • 優化TCP參數
    net.ipv4.tcp_mem = 786432 2097152 3145728net.ipv4.tcp_wmem = 4096 4096 16777216net.ipv4.tcp_rmem = 4096 4096 16777216net.ipv4.tcp_keepalive_time = 1800net.ipv4.tcp_keepalive_intvl = 20net.ipv4.tcp_keepalive_probes = 5net.ipv4.tcp_tw_reuse = 1net.ipv4.tcp_tw_recycle = 1net.ipv4.tcp_fin_timeout = 30

    ==參數說明:==

    net.ipv4.tcp_mem: 分配給tcp連接的內存,單位是page(1個Page通常是4KB,可以通過getconf PAGESIZE命令查看),三個值分別是最小、默認、和最大。比如以上配置中的最大是3145728,那分配給tcp的最大內存=31457284 / 1024 / 1024 = 12GB。一個TCP連接大約占7.5KB,粗略可以算出百萬連接≈7.51000000/4=1875000 3145728足以滿足測試所需。

    net.ipv4.tcp_wmem: 為每個TCP連接分配的寫緩沖區內存大小,單位是字節。三個值分別是最小、默認、和最大。

    net.ipv4.tcp_rmem: 為每個TCP連接分配的讀緩沖區內存大小,單位是字節。三個值分別是最小、默認、和最大。

    net.ipv4.tcp_keepalive_time: 最近一次數據包發送與第一次keep alive探測消息發送的事件間隔,用于確認TCP連接是否有效。

    net.ipv4.tcp_keepalive_intvl: 在未獲得探測消息響應時,發送探測消息的時間間隔。

    net.ipv4.tcp_keepalive_probes: 判斷TCP連接失效連續發送的探測消息個數,達到之后判定連接失效。

    net.ipv4.tcp_tw_reuse: 是否允許將TIME_WAIT Socket 重新用于新的TCP連接,默認為0,表示關閉。

    net.ipv4.tcp_tw_recycle: 是否開啟TIME_WAIT Socket 的快速回收功能,默認為0,表示關閉。

    net.ipv4.tcp_fin_timeout: 套接字自身關閉時保持在FIN_WAIT_2 狀態的時間。默認為60。

    轉載于:https://juejin.im/post/6861560765200105486

    作者:狐言不胡言

    總結

    以上是生活随笔為你收集整理的bootstrap外不引用连接_网络编程Netty IoT百万长连接优化,万字长文精讲的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。