日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

三、Netty的粘包半包问题解决

發(fā)布時間:2025/3/15 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 三、Netty的粘包半包问题解决 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、定義

TCP 傳輸中,客戶端發(fā)送數(shù)據(jù),實際是把數(shù)據(jù)寫入到了 TCP 的緩存中,粘包和半包也就會在此時產(chǎn)生。客戶端給服務(wù)端發(fā)送了兩條消息ABC和DEF,服務(wù)端這邊的接收會有多少種情況呢?有可能是一次性收到了所有的消息ABCDEF,有可能是收到了三條消息AB、CD、EF。

  • 粘包
  • 上面所說的一次性收到了所有的消息ABCDEF,類似于粘包。如果客戶端發(fā)送的包的大小比 TCP 的緩存容量小,并且 TCP 緩存可以存放多個包,那么客戶端和服務(wù)端的一次通信就可能傳遞了多個包,這時候服務(wù)端從 TCP 緩存就可能一下讀取了多個包,這種現(xiàn)象就叫粘包。

  • 半包
  • 上面說的后面那種收到了三條消息AB、CD、EF,類似于半包。如果客戶端發(fā)送的包的大小比 TCP 的緩存容量大,那么這個數(shù)據(jù)包就會被分成多個包,通過 Socket 多次發(fā)送到服務(wù)端,服務(wù)端第一次從接受緩存里面獲取的數(shù)據(jù),實際是整個包的一部分,這時候就產(chǎn)生了半包(半包不是說只收到了全包的一半,是說收到了全包的一部分)。

    二、產(chǎn)生原因

    其實從上面的定義,我們就可以大概知道產(chǎn)生的原因了。

  • 粘包的主要原因:
    • 發(fā)送方每次寫入數(shù)據(jù) < 套接字(Socket)緩沖區(qū)大小
    • 接收方讀取套接字(Socket)緩沖區(qū)數(shù)據(jù)不夠及時
  • 半包的主要原因:
    • 發(fā)送方每次寫入數(shù)據(jù) > 套接字(Socket)緩沖區(qū)大小
    • 發(fā)送的數(shù)據(jù)大于協(xié)議的 MTU (Maximum Transmission Unit,最大傳輸單元),因此必須拆包

    其實我們可以換個角度看待問題:

    • 從收發(fā)的角度看,便是一個發(fā)送可能被多次接收,多個發(fā)送可能被一次接收。
    • 從傳輸?shù)慕嵌瓤?#xff0c;便是一個發(fā)送可能占用多個傳輸包,多個發(fā)送可能共用一個傳輸包。

    根本原因,其實是

    • TCP 是流式協(xié)議,消息無邊界。
    • (PS : UDP雖然也可以一次傳輸多個包或者多次傳輸一個包,但每個消息都是有邊界的,因此不會有粘包和半包問題。)

    三、現(xiàn)象分析

    • 滑動窗口

    TCP 以一個段(segment)為單位,每發(fā)送一個段就需要進(jìn)行一次確認(rèn)應(yīng)答(ack)處理,但如果這么做,缺點是包的往返時間越長性能就越差

    為了解決此問題,引入了窗口概念,窗口大小即決定了無需等待應(yīng)答而可以繼續(xù)發(fā)送的數(shù)據(jù)最大值

    • 窗口實際就起到一個緩沖區(qū)的作用,同時也能起到流量控制的作用
    • 圖中深色的部分即要發(fā)送的數(shù)據(jù),高亮的部分即窗口
    • 窗口內(nèi)的數(shù)據(jù)才允許被發(fā)送,當(dāng)應(yīng)答未到達(dá)前,窗口必須停止滑動
    • 如果 1001~2000 這個段的數(shù)據(jù) ack 回來了,窗口就可以向前滑動
    • 接收方也會維護一個窗口,只有落在窗口內(nèi)的數(shù)據(jù)才能允許接收
    • MSS 限制
    • 鏈路層對一次能夠發(fā)送的最大數(shù)據(jù)有限制,這個限制稱之為 MTU(maximum transmission unit),不同的鏈路設(shè)備的 MTU 值也有所不同,例如
    • 以太網(wǎng)的 MTU 是 1500
    • FDDI(光纖分布式數(shù)據(jù)接口)的 MTU 是 4352
    • 本地回環(huán)地址的 MTU 是 65535 - 本地測試不走網(wǎng)卡
    • MSS 是最大段長度(maximum segment size),它是 MTU 刨去 tcp 頭和 ip 頭后剩余能夠作為數(shù)據(jù)傳輸?shù)淖止?jié)數(shù)
    • ipv4 tcp 頭占用 20 bytes,ip 頭占用 20 bytes,因此以太網(wǎng) MSS 的值為 1500 - 40 = 1460
    • TCP 在傳遞大量數(shù)據(jù)時,會按照 MSS 大小將數(shù)據(jù)進(jìn)行分割發(fā)送
    • MSS 的值在三次握手時通知對方自己 MSS 的值,然后在兩者之間選擇一個小值作為 MSS

    • Nagle 算法
    • 即使發(fā)送一個字節(jié),也需要加入 tcp 頭和 ip 頭,也就是總字節(jié)數(shù)會使用 41 bytes,非常不經(jīng)濟。因此為了提高網(wǎng)絡(luò)利用率,tcp 希望盡可能發(fā)送足夠大的數(shù)據(jù),這就是 Nagle 算法產(chǎn)生的緣由
    • 該算法是指發(fā)送端即使還有應(yīng)該發(fā)送的數(shù)據(jù),但如果這部分?jǐn)?shù)據(jù)很少的話,則進(jìn)行延遲發(fā)送
      • 如果 SO_SNDBUF 的數(shù)據(jù)達(dá)到 MSS,則需要發(fā)送
      • 如果 SO_SNDBUF 中含有 FIN(表示需要連接關(guān)閉)這時將剩余數(shù)據(jù)發(fā)送,再關(guān)閉
      • 如果 TCP_NODELAY = true,則需要發(fā)送
      • 已發(fā)送的數(shù)據(jù)都收到 ack 時,則需要發(fā)送
      • 上述條件不滿足,但發(fā)生超時(一般為 200ms)則需要發(fā)送
      • 除上述情況,延遲發(fā)送

    四、粘包半包代碼

  • 粘包
    • server端代碼
    public class HelloWorldServer {static final Logger log = LoggerFactory.getLogger(HelloWorldServer.class);void start() {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("connected {}", ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug("disconnect {}", ctx.channel());super.channelInactive(ctx);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080);log.debug("{} binding...", channelFuture.channel());channelFuture.sync();log.debug("{} bound...", channelFuture.channel());channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();log.debug("stoped");}}public static void main(String[] args) {new HelloWorldServer().start();} }
    • client端:希望發(fā)送 10 個消息,每個消息是 16 字節(jié)
    public class HelloWorldClient {static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connected...");ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");//客戶端代碼希望發(fā)送 10 個消息,每個消息是 16 字節(jié)for (int i = 0; i < 10; i++) {ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});ctx.writeAndFlush(buffer);}}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}} }


    可以看到server一次就接收了 160 個字節(jié),而非分 10 次接收。

  • 半包
    • client :客戶端代碼發(fā)送 1 個消息,這個消息是 160 字節(jié)
    public class HelloWorldClient {static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("connected...");ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");//客戶端代碼發(fā)送 1 個消息,這個消息是 160 字節(jié)ByteBuf buffer = ctx.alloc().buffer();for (int i = 0; i < 10; i++) {buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});}ctx.writeAndFlush(buffer);}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}} }

    為現(xiàn)象明顯,服務(wù)端修改一下接收緩沖區(qū),其它代碼不變

    //注意:這行影響的底層接收緩沖區(qū)(即滑動窗口)大小,僅決定了 netty 讀取的最小單位,netty 實際每次讀取的一般是它的整數(shù)倍 serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);


    可以看到接收的消息被分為兩節(jié),第一次 20 字節(jié),第二次 140 字節(jié)(半包)

    五、解決方案

    • 改成短連接 (參考UDP)

    將 TCP 連接改成短連接,一個請求一個短連接。這樣的話,建立連接到釋放連接之間的消息即為傳輸?shù)男畔?#xff0c;消息也就產(chǎn)生了邊界。這樣的方法就是十分簡單,不需要在我們的應(yīng)用中做過多修改。但缺點也就很明顯了,效率低下,TCP連接和斷開都會涉及三次握手以及四次握手,每個消息都會涉及這些過程,十分浪費性能。 因此,并不推介這種方式。

    • 封裝成幀
      封裝成幀(Framing),也就是原本發(fā)送消息的單位是緩沖大小,現(xiàn)在換成了幀,這樣我們就可以自定義邊界了。一般有4種方式:
  • 固定長度 這種方式下,消息邊界也就是固定長度即可。 優(yōu)點就是實現(xiàn)很簡單,缺點就是空間有極大的浪費,如果傳遞的消息中大部分都比較短,這樣就會有很多空間是浪費的。 因此,這種方式一般也是不推介的。
  • 分隔符 這種方式下,消息邊界也就是分隔符本身。優(yōu)點是空間不再浪費,實現(xiàn)也比較簡單。缺點是當(dāng)內(nèi)容本身出現(xiàn)分割符時需要轉(zhuǎn)義,所以無論是發(fā)送還是接受,都需要進(jìn)行整個內(nèi)容的掃描。 因此,這種方式效率也不是很高,但可以嘗試使用。
  • 專門的 length 字段 這種方式,就有點類似 Http 請求中的 Content-Length,有一個專門的字段存儲消息的長度。作為服務(wù)端,接受消息時,先解析固定長度的字段(length字段)獲取消息總長度,然后讀取后續(xù)內(nèi)容。優(yōu)點是精確定位用戶數(shù)據(jù),內(nèi)容也不用轉(zhuǎn)義。缺點是長度理論上有限制,需要提前限制可能的最大長度從而定義長度占用字節(jié)數(shù)。 因此,十分推介用這種方式。
  • 六、Netty 中的實現(xiàn)

    Netty 支持上文所講的封裝成幀(Framing)中的前三種方式,簡單介紹下:

    方式解碼編碼
    固定長度FixedLengthFrameDecoder簡單(不內(nèi)置)
    分割符DelimiterBasedFrameDecoder (需指定最大長度和自定義Bytebuf類型的分隔符)簡單(不內(nèi)置)
    專門的 length 字段LengthFieldBasedFrameDecoderLengthFieldPrepender
    • FixedLengthFrameDecoder
    /*** @author cristianoxiaoming@gmail.com* @version 1.0* @ClassName MyFixedLengthFrameClient* @Package MyNetty.Netty.StickyUnpackingPacket* @ProjectName javaStudy* @description TODO* @date 2021-09-24 11:04* @ProductName IntelliJ IDEA*/ @Slf4j public class MyFixedLengthFrameClient {public static void main(String[] args) {send();System.out.println("finish");}public static byte[] fill10Bytes(char c, int len) {byte[] bytes = new byte[10];Arrays.fill(bytes, (byte) '_');for (int i = 0; i < len; i++) {bytes[i] = (byte) c;}System.out.println(new String(bytes));return bytes;}private static void send() {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 會在連接 channel 建立成功后,會觸發(fā) active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();char c = '0';Random r = new Random();for (int i = 0; i < 10; i++) {//發(fā)送10次信息,每次長度由1到10個字節(jié)隨機byte[] bytes = fill10Bytes(c, r.nextInt(10) + 1);c++;buf.writeBytes(bytes);}ctx.writeAndFlush(buf);}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}} } /*** @author cristianoxiaoming@gmail.com* @version 1.0* @ClassName MyFixedLengthFrameServer* @Package MyNetty.Netty.StickyUnpackingPacket* @ProjectName javaStudy* @description FixedLengthFrameDecoder測試* @date 2021-09-24 11:03* @ProductName IntelliJ IDEA*/ @Slf4j public class MyFixedLengthFrameServer {void start() {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);// 調(diào)整系統(tǒng)的接收緩沖區(qū)(滑動窗口) // serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);// 調(diào)整 netty 的接收緩沖區(qū)(byteBuf)serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//一定要將FixedLengthFrameDecoder剛在第一個處理(不然拿著沒解碼的楨進(jìn)行處理也沒用),在執(zhí)行其他handlerch.pipeline().addLast(new FixedLengthFrameDecoder(10));ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}public static void main(String[] args) {new MyFixedLengthFrameServer().start();} }

    可以看到,客戶端一次性將10條信息100字節(jié)一次性發(fā)出去,這里已經(jīng)發(fā)生了粘包。

    但是可以看到,server端已經(jīng)根據(jù)設(shè)置的framLength=10進(jìn)行分包了,接收的時候是正常的。

    • DelimiterBasedFrameDecoder
    /*** @author cristianoxiaoming@gmail.com* @version 1.0* @ClassName MyDelimiterBasedFrameDecoderServer* @Package MyNetty.Netty.StickyUnpackingPacket* @ProjectName javaStudy* @description TODO* @date 2021-09-24 11:25* @ProductName IntelliJ IDEA*/ @Slf4j public class MyDelimiterBasedFrameDecoderServer {void start() {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);// 調(diào)整系統(tǒng)的接收緩沖區(qū)(滑動窗口) // serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);// 調(diào)整 netty 的接收緩沖區(qū)(byteBuf)serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//超過1024個字節(jié),還沒有遇到換行符就會失敗ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}public static void main(String[] args) {new MyDelimiterBasedFrameDecoderServer().start();} } /*** @author cristianoxiaoming@gmail.com* @version 1.0* @ClassName MyDelimiterBasedFrameDecoderClient* @Package MyNetty.Netty.StickyUnpackingPacket* @ProjectName javaStudy* @description TODO* @date 2021-09-24 12:52* @ProductName IntelliJ IDEA*/ @Slf4j public class MyDelimiterBasedFrameDecoderClient {public static void main(String[] args) {send();System.out.println("finish");}public static StringBuilder makeString(char c, int len) {StringBuilder sb = new StringBuilder(len + 2);for (int i = 0; i < len; i++) {sb.append(c);}sb.append("\n");return sb;}private static void send() {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 會在連接 channel 建立成功后,會觸發(fā) active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();char c = '0';Random r = new Random();for (int i = 0; i < 10; i++) {StringBuilder sb = makeString(c, r.nextInt(256) + 1);c++;buf.writeBytes(sb.toString().getBytes());}ctx.writeAndFlush(buf);}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {worker.shutdownGracefully();}} }
    • LengthFieldBasedFrameDecoder :基于長度字段的楨解碼器
    /*** @author cristianoxiaoming@gmail.com* @version 1.0* @ClassName MyLengthFieldDecoder* @Package MyNetty.Netty.StickyUnpackingPacket* @ProjectName javaStudy* @description TODO* @date 2021-09-24 13:09* @ProductName IntelliJ IDEA*/ public class MyLengthFieldDecoder {public static void main(String[] args) {EmbeddedChannel channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(//int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip1024, 0, 4, 1,4),new LoggingHandler(LogLevel.DEBUG));// 用4個字節(jié)表示內(nèi)容長度,后再接實際內(nèi)容ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();send(buffer, "Hello, world");send(buffer, "Hi!");channel.writeInbound(buffer);}private static void send(ByteBuf buffer, String content) {byte[] bytes = content.getBytes(); // 實際內(nèi)容int length = bytes.length; // 實際內(nèi)容長度buffer.writeInt(length);buffer.writeByte(1);//額外字節(jié),就要有l(wèi)engthAdjustmentbuffer.writeBytes(bytes);} }

    參考視頻
    參考文章

    總結(jié)

    以上是生活随笔為你收集整理的三、Netty的粘包半包问题解决的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。