生活随笔
收集整理的這篇文章主要介紹了
三、Netty的粘包半包问题解决
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
一、定義
TCP 傳輸中,客戶端發送數據,實際是把數據寫入到了 TCP 的緩存中,粘包和半包也就會在此時產生。客戶端給服務端發送了兩條消息ABC和DEF,服務端這邊的接收會有多少種情況呢?有可能是一次性收到了所有的消息ABCDEF,有可能是收到了三條消息AB、CD、EF。
粘包
上面所說的一次性收到了所有的消息ABCDEF,類似于粘包。如果客戶端發送的包的大小比 TCP 的緩存容量小,并且 TCP 緩存可以存放多個包,那么客戶端和服務端的一次通信就可能傳遞了多個包,這時候服務端從 TCP 緩存就可能一下讀取了多個包,這種現象就叫粘包。
半包
上面說的后面那種收到了三條消息AB、CD、EF,類似于半包。如果客戶端發送的包的大小比 TCP 的緩存容量大,那么這個數據包就會被分成多個包,通過 Socket 多次發送到服務端,服務端第一次從接受緩存里面獲取的數據,實際是整個包的一部分,這時候就產生了半包(半包不是說只收到了全包的一半,是說收到了全包的一部分)。
二、產生原因
其實從上面的定義,我們就可以大概知道產生的原因了。
粘包的主要原因:
- 發送方每次寫入數據 < 套接字(Socket)緩沖區大小
- 接收方讀取套接字(Socket)緩沖區數據不夠及時
半包的主要原因:
- 發送方每次寫入數據 > 套接字(Socket)緩沖區大小
- 發送的數據大于協議的 MTU (Maximum Transmission Unit,最大傳輸單元),因此必須拆包
其實我們可以換個角度看待問題:
- 從收發的角度看,便是一個發送可能被多次接收,多個發送可能被一次接收。
- 從傳輸的角度看,便是一個發送可能占用多個傳輸包,多個發送可能共用一個傳輸包。
根本原因,其實是
- TCP 是流式協議,消息無邊界。
- (PS : UDP雖然也可以一次傳輸多個包或者多次傳輸一個包,但每個消息都是有邊界的,因此不會有粘包和半包問題。)
三、現象分析
TCP 以一個段(segment)為單位,每發送一個段就需要進行一次確認應答(ack)處理,但如果這么做,缺點是包的往返時間越長性能就越差
為了解決此問題,引入了窗口概念,窗口大小即決定了無需等待應答而可以繼續發送的數據最大值
- 窗口實際就起到一個緩沖區的作用,同時也能起到流量控制的作用
- 圖中深色的部分即要發送的數據,高亮的部分即窗口
- 窗口內的數據才允許被發送,當應答未到達前,窗口必須停止滑動
- 如果 1001~2000 這個段的數據 ack 回來了,窗口就可以向前滑動
- 接收方也會維護一個窗口,只有落在窗口內的數據才能允許接收
- 鏈路層對一次能夠發送的最大數據有限制,這個限制稱之為 MTU(maximum transmission unit),不同的鏈路設備的 MTU 值也有所不同,例如
- 以太網的 MTU 是 1500
- FDDI(光纖分布式數據接口)的 MTU 是 4352
- 本地回環地址的 MTU 是 65535 - 本地測試不走網卡
- MSS 是最大段長度(maximum segment size),它是 MTU 刨去 tcp 頭和 ip 頭后剩余能夠作為數據傳輸的字節數
- ipv4 tcp 頭占用 20 bytes,ip 頭占用 20 bytes,因此以太網 MSS 的值為 1500 - 40 = 1460
- TCP 在傳遞大量數據時,會按照 MSS 大小將數據進行分割發送
- MSS 的值在三次握手時通知對方自己 MSS 的值,然后在兩者之間選擇一個小值作為 MSS
- 即使發送一個字節,也需要加入 tcp 頭和 ip 頭,也就是總字節數會使用 41 bytes,非常不經濟。因此為了提高網絡利用率,tcp 希望盡可能發送足夠大的數據,這就是 Nagle 算法產生的緣由
- 該算法是指發送端即使還有應該發送的數據,但如果這部分數據很少的話,則進行延遲發送
- 如果 SO_SNDBUF 的數據達到 MSS,則需要發送
- 如果 SO_SNDBUF 中含有 FIN(表示需要連接關閉)這時將剩余數據發送,再關閉
- 如果 TCP_NODELAY = true,則需要發送
- 已發送的數據都收到 ack 時,則需要發送
- 上述條件不滿足,但發生超時(一般為 200ms)則需要發送
- 除上述情況,延遲發送
四、粘包半包代碼
粘包
public class HelloWorldServer {static final Logger log
= LoggerFactory.getLogger(HelloWorldServer.class);void start() {NioEventLoopGroup boss
= new NioEventLoopGroup(1);NioEventLoopGroup worker
= new NioEventLoopGroup();try {ServerBootstrap serverBootstrap
= new ServerBootstrap();serverBootstrap
.channel(NioServerSocketChannel.class);serverBootstrap
.group(boss
, worker
);serverBootstrap
.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch
) throws Exception {ch
.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG
));ch
.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx
) throws Exception {log
.debug("connected {}", ctx
.channel());super.channelActive(ctx
);}@Overridepublic void channelInactive(ChannelHandlerContext ctx
) throws Exception {log
.debug("disconnect {}", ctx
.channel());super.channelInactive(ctx
);}});}});ChannelFuture channelFuture
= serverBootstrap
.bind(8080);log
.debug("{} binding...", channelFuture
.channel());channelFuture
.sync();log
.debug("{} bound...", channelFuture
.channel());channelFuture
.channel().closeFuture().sync();} catch (InterruptedException e
) {log
.error("server error", e
);} finally {boss
.shutdownGracefully();worker
.shutdownGracefully();log
.debug("stoped");}}public static void main(String[] args
) {new HelloWorldServer().start();}
}
- client端:希望發送 10 個消息,每個消息是 16 字節
public class HelloWorldClient {static final Logger log
= LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args
) {NioEventLoopGroup worker
= new NioEventLoopGroup();try {Bootstrap bootstrap
= new Bootstrap();bootstrap
.channel(NioSocketChannel.class);bootstrap
.group(worker
);bootstrap
.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch
) throws Exception {log
.debug("connected...");ch
.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx
) throws Exception {log
.debug("sending...");for (int i
= 0; i
< 10; i
++) {ByteBuf buffer
= ctx
.alloc().buffer();buffer
.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});ctx
.writeAndFlush(buffer
);}}});}});ChannelFuture channelFuture
= bootstrap
.connect("127.0.0.1", 8080).sync();channelFuture
.channel().closeFuture().sync();} catch (InterruptedException e
) {log
.error("client error", e
);} finally {worker
.shutdownGracefully();}}
}
可以看到server一次就接收了 160 個字節,而非分 10 次接收。
半包
- client :客戶端代碼發送 1 個消息,這個消息是 160 字節
public class HelloWorldClient {static final Logger log
= LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args
) {NioEventLoopGroup worker
= new NioEventLoopGroup();try {Bootstrap bootstrap
= new Bootstrap();bootstrap
.channel(NioSocketChannel.class);bootstrap
.group(worker
);bootstrap
.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch
) throws Exception {log
.debug("connected...");ch
.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx
) throws Exception {log
.debug("sending...");ByteBuf buffer
= ctx
.alloc().buffer();for (int i
= 0; i
< 10; i
++) {buffer
.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});}ctx
.writeAndFlush(buffer
);}});}});ChannelFuture channelFuture
= bootstrap
.connect("127.0.0.1", 8080).sync();channelFuture
.channel().closeFuture().sync();} catch (InterruptedException e
) {log
.error("client error", e
);} finally {worker
.shutdownGracefully();}}
}
為現象明顯,服務端修改一下接收緩沖區,其它代碼不變
serverBootstrap
.option(ChannelOption.SO_RCVBUF
, 10);
可以看到接收的消息被分為兩節,第一次 20 字節,第二次 140 字節(半包)
五、解決方案
將 TCP 連接改成短連接,一個請求一個短連接。這樣的話,建立連接到釋放連接之間的消息即為傳輸的信息,消息也就產生了邊界。這樣的方法就是十分簡單,不需要在我們的應用中做過多修改。但缺點也就很明顯了,效率低下,TCP連接和斷開都會涉及三次握手以及四次握手,每個消息都會涉及這些過程,十分浪費性能。 因此,并不推介這種方式。
- 封裝成幀
封裝成幀(Framing),也就是原本發送消息的單位是緩沖大小,現在換成了幀,這樣我們就可以自定義邊界了。一般有4種方式:
固定長度 這種方式下,消息邊界也就是固定長度即可。 優點就是實現很簡單,缺點就是空間有極大的浪費,如果傳遞的消息中大部分都比較短,這樣就會有很多空間是浪費的。 因此,這種方式一般也是不推介的。分隔符 這種方式下,消息邊界也就是分隔符本身。優點是空間不再浪費,實現也比較簡單。缺點是當內容本身出現分割符時需要轉義,所以無論是發送還是接受,都需要進行整個內容的掃描。 因此,這種方式效率也不是很高,但可以嘗試使用。專門的 length 字段 這種方式,就有點類似 Http 請求中的 Content-Length,有一個專門的字段存儲消息的長度。作為服務端,接受消息時,先解析固定長度的字段(length字段)獲取消息總長度,然后讀取后續內容。優點是精確定位用戶數據,內容也不用轉義。缺點是長度理論上有限制,需要提前限制可能的最大長度從而定義長度占用字節數。 因此,十分推介用這種方式。
六、Netty 中的實現
Netty 支持上文所講的封裝成幀(Framing)中的前三種方式,簡單介紹下:
方式解碼編碼
| 固定長度 | FixedLengthFrameDecoder | 簡單(不內置) |
| 分割符 | DelimiterBasedFrameDecoder (需指定最大長度和自定義Bytebuf類型的分隔符) | 簡單(不內置) |
| 專門的 length 字段 | LengthFieldBasedFrameDecoder | LengthFieldPrepender |
@Slf4j
public class MyFixedLengthFrameClient {public static void main(String[] args
) {send();System.out
.println("finish");}public static byte[] fill10Bytes(char c
, int len
) {byte[] bytes
= new byte[10];Arrays.fill(bytes
, (byte) '_');for (int i
= 0; i
< len
; i
++) {bytes
[i
] = (byte) c
;}System.out
.println(new String(bytes
));return bytes
;}private static void send() {NioEventLoopGroup worker
= new NioEventLoopGroup();try {Bootstrap bootstrap
= new Bootstrap();bootstrap
.channel(NioSocketChannel.class);bootstrap
.group(worker
);bootstrap
.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch
) {ch
.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG
));ch
.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx
) {ByteBuf buf
= ctx
.alloc().buffer();char c
= '0';Random r
= new Random();for (int i
= 0; i
< 10; i
++) {byte[] bytes
= fill10Bytes(c
, r
.nextInt(10) + 1);c
++;buf
.writeBytes(bytes
);}ctx
.writeAndFlush(buf
);}});}});ChannelFuture channelFuture
= bootstrap
.connect("127.0.0.1", 8080).sync();channelFuture
.channel().closeFuture().sync();} catch (InterruptedException e
) {log
.error("client error", e
);} finally {worker
.shutdownGracefully();}}
}
@Slf4j
public class MyFixedLengthFrameServer {void start() {NioEventLoopGroup boss
= new NioEventLoopGroup();NioEventLoopGroup worker
= new NioEventLoopGroup();try {ServerBootstrap serverBootstrap
= new ServerBootstrap();serverBootstrap
.channel(NioServerSocketChannel.class);
serverBootstrap
.childOption(ChannelOption.RCVBUF_ALLOCATOR
, new AdaptiveRecvByteBufAllocator(16, 16, 16));serverBootstrap
.group(boss
, worker
);serverBootstrap
.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch
) throws Exception {ch
.pipeline().addLast(new FixedLengthFrameDecoder(10));ch
.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG
));}});ChannelFuture channelFuture
= serverBootstrap
.bind(8080).sync();channelFuture
.channel().closeFuture().sync();} catch (InterruptedException e
) {log
.error("server error", e
);} finally {boss
.shutdownGracefully();worker
.shutdownGracefully();}}public static void main(String[] args
) {new MyFixedLengthFrameServer().start();}
}
可以看到,客戶端一次性將10條信息100字節一次性發出去,這里已經發生了粘包。
但是可以看到,server端已經根據設置的framLength=10進行分包了,接收的時候是正常的。
- DelimiterBasedFrameDecoder
@Slf4j
public class MyDelimiterBasedFrameDecoderServer {void start() {NioEventLoopGroup boss
= new NioEventLoopGroup();NioEventLoopGroup worker
= new NioEventLoopGroup();try {ServerBootstrap serverBootstrap
= new ServerBootstrap();serverBootstrap
.channel(NioServerSocketChannel.class);
serverBootstrap
.childOption(ChannelOption.RCVBUF_ALLOCATOR
, new AdaptiveRecvByteBufAllocator(16, 16, 16));serverBootstrap
.group(boss
, worker
);serverBootstrap
.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch
) throws Exception {ch
.pipeline().addLast(new LineBasedFrameDecoder(1024));ch
.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG
));}});ChannelFuture channelFuture
= serverBootstrap
.bind(8080).sync();channelFuture
.channel().closeFuture().sync();} catch (InterruptedException e
) {log
.error("server error", e
);} finally {boss
.shutdownGracefully();worker
.shutdownGracefully();}}public static void main(String[] args
) {new MyDelimiterBasedFrameDecoderServer().start();}
}
@Slf4j
public class MyDelimiterBasedFrameDecoderClient {public static void main(String[] args
) {send();System.out
.println("finish");}public static StringBuilder makeString(char c
, int len
) {StringBuilder sb
= new StringBuilder(len
+ 2);for (int i
= 0; i
< len
; i
++) {sb
.append(c
);}sb
.append("\n");return sb
;}private static void send() {NioEventLoopGroup worker
= new NioEventLoopGroup();try {Bootstrap bootstrap
= new Bootstrap();bootstrap
.channel(NioSocketChannel.class);bootstrap
.group(worker
);bootstrap
.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch
) {ch
.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG
));ch
.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx
) {ByteBuf buf
= ctx
.alloc().buffer();char c
= '0';Random r
= new Random();for (int i
= 0; i
< 10; i
++) {StringBuilder sb
= makeString(c
, r
.nextInt(256) + 1);c
++;buf
.writeBytes(sb
.toString().getBytes());}ctx
.writeAndFlush(buf
);}});}});ChannelFuture channelFuture
= bootstrap
.connect("127.0.0.1", 8080).sync();channelFuture
.channel().closeFuture().sync();} catch (InterruptedException e
) {log
.error("client error", e
);} finally {worker
.shutdownGracefully();}}
}
- LengthFieldBasedFrameDecoder :基于長度字段的楨解碼器
public class MyLengthFieldDecoder {public static void main(String[] args
) {EmbeddedChannel channel
= new EmbeddedChannel(new LengthFieldBasedFrameDecoder(1024, 0, 4, 1,4),new LoggingHandler(LogLevel.DEBUG
));ByteBuf buffer
= ByteBufAllocator.DEFAULT
.buffer();send(buffer
, "Hello, world");send(buffer
, "Hi!");channel
.writeInbound(buffer
);}private static void send(ByteBuf buffer
, String content
) {byte[] bytes
= content
.getBytes(); int length
= bytes
.length
; buffer
.writeInt(length
);buffer
.writeByte(1);buffer
.writeBytes(bytes
);}
}
參考視頻
參考文章
總結
以上是生活随笔為你收集整理的三、Netty的粘包半包问题解决的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。