生活随笔
收集整理的這篇文章主要介紹了
三、Netty的粘包半包问题解决
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
一、定義
TCP 傳輸中,客戶端發(fā)送數(shù)據(jù),實際是把數(shù)據(jù)寫入到了 TCP 的緩存中,粘包和半包也就會在此時產(chǎn)生。客戶端給服務(wù)端發(fā)送了兩條消息ABC和DEF,服務(wù)端這邊的接收會有多少種情況呢?有可能是一次性收到了所有的消息ABCDEF,有可能是收到了三條消息AB、CD、EF。
粘包
上面所說的一次性收到了所有的消息ABCDEF,類似于粘包。如果客戶端發(fā)送的包的大小比 TCP 的緩存容量小,并且 TCP 緩存可以存放多個包,那么客戶端和服務(wù)端的一次通信就可能傳遞了多個包,這時候服務(wù)端從 TCP 緩存就可能一下讀取了多個包,這種現(xiàn)象就叫粘包。
半包
上面說的后面那種收到了三條消息AB、CD、EF,類似于半包。如果客戶端發(fā)送的包的大小比 TCP 的緩存容量大,那么這個數(shù)據(jù)包就會被分成多個包,通過 Socket 多次發(fā)送到服務(wù)端,服務(wù)端第一次從接受緩存里面獲取的數(shù)據(jù),實際是整個包的一部分,這時候就產(chǎn)生了半包(半包不是說只收到了全包的一半,是說收到了全包的一部分)。
二、產(chǎn)生原因
其實從上面的定義,我們就可以大概知道產(chǎn)生的原因了。
粘包的主要原因:
- 發(fā)送方每次寫入數(shù)據(jù) < 套接字(Socket)緩沖區(qū)大小
- 接收方讀取套接字(Socket)緩沖區(qū)數(shù)據(jù)不夠及時
半包的主要原因:
- 發(fā)送方每次寫入數(shù)據(jù) > 套接字(Socket)緩沖區(qū)大小
- 發(fā)送的數(shù)據(jù)大于協(xié)議的 MTU (Maximum Transmission Unit,最大傳輸單元),因此必須拆包
其實我們可以換個角度看待問題:
- 從收發(fā)的角度看,便是一個發(fā)送可能被多次接收,多個發(fā)送可能被一次接收。
- 從傳輸?shù)慕嵌瓤?#xff0c;便是一個發(fā)送可能占用多個傳輸包,多個發(fā)送可能共用一個傳輸包。
根本原因,其實是
- TCP 是流式協(xié)議,消息無邊界。
- (PS : UDP雖然也可以一次傳輸多個包或者多次傳輸一個包,但每個消息都是有邊界的,因此不會有粘包和半包問題。)
三、現(xiàn)象分析
TCP 以一個段(segment)為單位,每發(fā)送一個段就需要進(jìn)行一次確認(rèn)應(yīng)答(ack)處理,但如果這么做,缺點是包的往返時間越長性能就越差
為了解決此問題,引入了窗口概念,窗口大小即決定了無需等待應(yīng)答而可以繼續(xù)發(fā)送的數(shù)據(jù)最大值
- 窗口實際就起到一個緩沖區(qū)的作用,同時也能起到流量控制的作用
- 圖中深色的部分即要發(fā)送的數(shù)據(jù),高亮的部分即窗口
- 窗口內(nèi)的數(shù)據(jù)才允許被發(fā)送,當(dāng)應(yīng)答未到達(dá)前,窗口必須停止滑動
- 如果 1001~2000 這個段的數(shù)據(jù) ack 回來了,窗口就可以向前滑動
- 接收方也會維護一個窗口,只有落在窗口內(nèi)的數(shù)據(jù)才能允許接收
- 鏈路層對一次能夠發(fā)送的最大數(shù)據(jù)有限制,這個限制稱之為 MTU(maximum transmission unit),不同的鏈路設(shè)備的 MTU 值也有所不同,例如
- 以太網(wǎng)的 MTU 是 1500
- FDDI(光纖分布式數(shù)據(jù)接口)的 MTU 是 4352
- 本地回環(huán)地址的 MTU 是 65535 - 本地測試不走網(wǎng)卡
- MSS 是最大段長度(maximum segment size),它是 MTU 刨去 tcp 頭和 ip 頭后剩余能夠作為數(shù)據(jù)傳輸?shù)淖止?jié)數(shù)
- ipv4 tcp 頭占用 20 bytes,ip 頭占用 20 bytes,因此以太網(wǎng) MSS 的值為 1500 - 40 = 1460
- TCP 在傳遞大量數(shù)據(jù)時,會按照 MSS 大小將數(shù)據(jù)進(jìn)行分割發(fā)送
- MSS 的值在三次握手時通知對方自己 MSS 的值,然后在兩者之間選擇一個小值作為 MSS
- 即使發(fā)送一個字節(jié),也需要加入 tcp 頭和 ip 頭,也就是總字節(jié)數(shù)會使用 41 bytes,非常不經(jīng)濟。因此為了提高網(wǎng)絡(luò)利用率,tcp 希望盡可能發(fā)送足夠大的數(shù)據(jù),這就是 Nagle 算法產(chǎn)生的緣由
- 該算法是指發(fā)送端即使還有應(yīng)該發(fā)送的數(shù)據(jù),但如果這部分?jǐn)?shù)據(jù)很少的話,則進(jìn)行延遲發(fā)送
- 如果 SO_SNDBUF 的數(shù)據(jù)達(dá)到 MSS,則需要發(fā)送
- 如果 SO_SNDBUF 中含有 FIN(表示需要連接關(guān)閉)這時將剩余數(shù)據(jù)發(fā)送,再關(guān)閉
- 如果 TCP_NODELAY = true,則需要發(fā)送
- 已發(fā)送的數(shù)據(jù)都收到 ack 時,則需要發(fā)送
- 上述條件不滿足,但發(fā)生超時(一般為 200ms)則需要發(fā)送
- 除上述情況,延遲發(fā)送
四、粘包半包代碼
粘包
public class HelloWorldServer {static final Logger log
= LoggerFactory.getLogger(HelloWorldServer.class);void start() {NioEventLoopGroup boss
= new NioEventLoopGroup(1);NioEventLoopGroup worker
= new NioEventLoopGroup();try {ServerBootstrap serverBootstrap
= new ServerBootstrap();serverBootstrap
.channel(NioServerSocketChannel.class);serverBootstrap
.group(boss
, worker
);serverBootstrap
.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch
) throws Exception {ch
.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG
));ch
.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx
) throws Exception {log
.debug("connected {}", ctx
.channel());super.channelActive(ctx
);}@Overridepublic void channelInactive(ChannelHandlerContext ctx
) throws Exception {log
.debug("disconnect {}", ctx
.channel());super.channelInactive(ctx
);}});}});ChannelFuture channelFuture
= serverBootstrap
.bind(8080);log
.debug("{} binding...", channelFuture
.channel());channelFuture
.sync();log
.debug("{} bound...", channelFuture
.channel());channelFuture
.channel().closeFuture().sync();} catch (InterruptedException e
) {log
.error("server error", e
);} finally {boss
.shutdownGracefully();worker
.shutdownGracefully();log
.debug("stoped");}}public static void main(String[] args
) {new HelloWorldServer().start();}
}
- client端:希望發(fā)送 10 個消息,每個消息是 16 字節(jié)
public class HelloWorldClient {static final Logger log
= LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args
) {NioEventLoopGroup worker
= new NioEventLoopGroup();try {Bootstrap bootstrap
= new Bootstrap();bootstrap
.channel(NioSocketChannel.class);bootstrap
.group(worker
);bootstrap
.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch
) throws Exception {log
.debug("connected...");ch
.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx
) throws Exception {log
.debug("sending...");for (int i
= 0; i
< 10; i
++) {ByteBuf buffer
= ctx
.alloc().buffer();buffer
.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});ctx
.writeAndFlush(buffer
);}}});}});ChannelFuture channelFuture
= bootstrap
.connect("127.0.0.1", 8080).sync();channelFuture
.channel().closeFuture().sync();} catch (InterruptedException e
) {log
.error("client error", e
);} finally {worker
.shutdownGracefully();}}
}
可以看到server一次就接收了 160 個字節(jié),而非分 10 次接收。
半包
- client :客戶端代碼發(fā)送 1 個消息,這個消息是 160 字節(jié)
public class HelloWorldClient {static final Logger log
= LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args
) {NioEventLoopGroup worker
= new NioEventLoopGroup();try {Bootstrap bootstrap
= new Bootstrap();bootstrap
.channel(NioSocketChannel.class);bootstrap
.group(worker
);bootstrap
.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch
) throws Exception {log
.debug("connected...");ch
.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx
) throws Exception {log
.debug("sending...");ByteBuf buffer
= ctx
.alloc().buffer();for (int i
= 0; i
< 10; i
++) {buffer
.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});}ctx
.writeAndFlush(buffer
);}});}});ChannelFuture channelFuture
= bootstrap
.connect("127.0.0.1", 8080).sync();channelFuture
.channel().closeFuture().sync();} catch (InterruptedException e
) {log
.error("client error", e
);} finally {worker
.shutdownGracefully();}}
}
為現(xiàn)象明顯,服務(wù)端修改一下接收緩沖區(qū),其它代碼不變
serverBootstrap
.option(ChannelOption.SO_RCVBUF
, 10);
可以看到接收的消息被分為兩節(jié),第一次 20 字節(jié),第二次 140 字節(jié)(半包)
五、解決方案
將 TCP 連接改成短連接,一個請求一個短連接。這樣的話,建立連接到釋放連接之間的消息即為傳輸?shù)男畔?#xff0c;消息也就產(chǎn)生了邊界。這樣的方法就是十分簡單,不需要在我們的應(yīng)用中做過多修改。但缺點也就很明顯了,效率低下,TCP連接和斷開都會涉及三次握手以及四次握手,每個消息都會涉及這些過程,十分浪費性能。 因此,并不推介這種方式。
- 封裝成幀
封裝成幀(Framing),也就是原本發(fā)送消息的單位是緩沖大小,現(xiàn)在換成了幀,這樣我們就可以自定義邊界了。一般有4種方式:
固定長度 這種方式下,消息邊界也就是固定長度即可。 優(yōu)點就是實現(xiàn)很簡單,缺點就是空間有極大的浪費,如果傳遞的消息中大部分都比較短,這樣就會有很多空間是浪費的。 因此,這種方式一般也是不推介的。分隔符 這種方式下,消息邊界也就是分隔符本身。優(yōu)點是空間不再浪費,實現(xiàn)也比較簡單。缺點是當(dāng)內(nèi)容本身出現(xiàn)分割符時需要轉(zhuǎn)義,所以無論是發(fā)送還是接受,都需要進(jìn)行整個內(nèi)容的掃描。 因此,這種方式效率也不是很高,但可以嘗試使用。專門的 length 字段 這種方式,就有點類似 Http 請求中的 Content-Length,有一個專門的字段存儲消息的長度。作為服務(wù)端,接受消息時,先解析固定長度的字段(length字段)獲取消息總長度,然后讀取后續(xù)內(nèi)容。優(yōu)點是精確定位用戶數(shù)據(jù),內(nèi)容也不用轉(zhuǎn)義。缺點是長度理論上有限制,需要提前限制可能的最大長度從而定義長度占用字節(jié)數(shù)。 因此,十分推介用這種方式。
六、Netty 中的實現(xiàn)
Netty 支持上文所講的封裝成幀(Framing)中的前三種方式,簡單介紹下:
方式解碼編碼
| 固定長度 | FixedLengthFrameDecoder | 簡單(不內(nèi)置) |
| 分割符 | DelimiterBasedFrameDecoder (需指定最大長度和自定義Bytebuf類型的分隔符) | 簡單(不內(nèi)置) |
| 專門的 length 字段 | LengthFieldBasedFrameDecoder | LengthFieldPrepender |
@Slf4j
public class MyFixedLengthFrameClient {public static void main(String[] args
) {send();System.out
.println("finish");}public static byte[] fill10Bytes(char c
, int len
) {byte[] bytes
= new byte[10];Arrays.fill(bytes
, (byte) '_');for (int i
= 0; i
< len
; i
++) {bytes
[i
] = (byte) c
;}System.out
.println(new String(bytes
));return bytes
;}private static void send() {NioEventLoopGroup worker
= new NioEventLoopGroup();try {Bootstrap bootstrap
= new Bootstrap();bootstrap
.channel(NioSocketChannel.class);bootstrap
.group(worker
);bootstrap
.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch
) {ch
.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG
));ch
.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx
) {ByteBuf buf
= ctx
.alloc().buffer();char c
= '0';Random r
= new Random();for (int i
= 0; i
< 10; i
++) {byte[] bytes
= fill10Bytes(c
, r
.nextInt(10) + 1);c
++;buf
.writeBytes(bytes
);}ctx
.writeAndFlush(buf
);}});}});ChannelFuture channelFuture
= bootstrap
.connect("127.0.0.1", 8080).sync();channelFuture
.channel().closeFuture().sync();} catch (InterruptedException e
) {log
.error("client error", e
);} finally {worker
.shutdownGracefully();}}
}
@Slf4j
public class MyFixedLengthFrameServer {void start() {NioEventLoopGroup boss
= new NioEventLoopGroup();NioEventLoopGroup worker
= new NioEventLoopGroup();try {ServerBootstrap serverBootstrap
= new ServerBootstrap();serverBootstrap
.channel(NioServerSocketChannel.class);
serverBootstrap
.childOption(ChannelOption.RCVBUF_ALLOCATOR
, new AdaptiveRecvByteBufAllocator(16, 16, 16));serverBootstrap
.group(boss
, worker
);serverBootstrap
.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch
) throws Exception {ch
.pipeline().addLast(new FixedLengthFrameDecoder(10));ch
.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG
));}});ChannelFuture channelFuture
= serverBootstrap
.bind(8080).sync();channelFuture
.channel().closeFuture().sync();} catch (InterruptedException e
) {log
.error("server error", e
);} finally {boss
.shutdownGracefully();worker
.shutdownGracefully();}}public static void main(String[] args
) {new MyFixedLengthFrameServer().start();}
}
可以看到,客戶端一次性將10條信息100字節(jié)一次性發(fā)出去,這里已經(jīng)發(fā)生了粘包。
但是可以看到,server端已經(jīng)根據(jù)設(shè)置的framLength=10進(jìn)行分包了,接收的時候是正常的。
- DelimiterBasedFrameDecoder
@Slf4j
public class MyDelimiterBasedFrameDecoderServer {void start() {NioEventLoopGroup boss
= new NioEventLoopGroup();NioEventLoopGroup worker
= new NioEventLoopGroup();try {ServerBootstrap serverBootstrap
= new ServerBootstrap();serverBootstrap
.channel(NioServerSocketChannel.class);
serverBootstrap
.childOption(ChannelOption.RCVBUF_ALLOCATOR
, new AdaptiveRecvByteBufAllocator(16, 16, 16));serverBootstrap
.group(boss
, worker
);serverBootstrap
.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch
) throws Exception {ch
.pipeline().addLast(new LineBasedFrameDecoder(1024));ch
.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG
));}});ChannelFuture channelFuture
= serverBootstrap
.bind(8080).sync();channelFuture
.channel().closeFuture().sync();} catch (InterruptedException e
) {log
.error("server error", e
);} finally {boss
.shutdownGracefully();worker
.shutdownGracefully();}}public static void main(String[] args
) {new MyDelimiterBasedFrameDecoderServer().start();}
}
@Slf4j
public class MyDelimiterBasedFrameDecoderClient {public static void main(String[] args
) {send();System.out
.println("finish");}public static StringBuilder makeString(char c
, int len
) {StringBuilder sb
= new StringBuilder(len
+ 2);for (int i
= 0; i
< len
; i
++) {sb
.append(c
);}sb
.append("\n");return sb
;}private static void send() {NioEventLoopGroup worker
= new NioEventLoopGroup();try {Bootstrap bootstrap
= new Bootstrap();bootstrap
.channel(NioSocketChannel.class);bootstrap
.group(worker
);bootstrap
.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch
) {ch
.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG
));ch
.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx
) {ByteBuf buf
= ctx
.alloc().buffer();char c
= '0';Random r
= new Random();for (int i
= 0; i
< 10; i
++) {StringBuilder sb
= makeString(c
, r
.nextInt(256) + 1);c
++;buf
.writeBytes(sb
.toString().getBytes());}ctx
.writeAndFlush(buf
);}});}});ChannelFuture channelFuture
= bootstrap
.connect("127.0.0.1", 8080).sync();channelFuture
.channel().closeFuture().sync();} catch (InterruptedException e
) {log
.error("client error", e
);} finally {worker
.shutdownGracefully();}}
}
- LengthFieldBasedFrameDecoder :基于長度字段的楨解碼器
public class MyLengthFieldDecoder {public static void main(String[] args
) {EmbeddedChannel channel
= new EmbeddedChannel(new LengthFieldBasedFrameDecoder(1024, 0, 4, 1,4),new LoggingHandler(LogLevel.DEBUG
));ByteBuf buffer
= ByteBufAllocator.DEFAULT
.buffer();send(buffer
, "Hello, world");send(buffer
, "Hi!");channel
.writeInbound(buffer
);}private static void send(ByteBuf buffer
, String content
) {byte[] bytes
= content
.getBytes(); int length
= bytes
.length
; buffer
.writeInt(length
);buffer
.writeByte(1);buffer
.writeBytes(bytes
);}
}
參考視頻
參考文章
總結(jié)
以上是生活随笔為你收集整理的三、Netty的粘包半包问题解决的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。