On the Netty ByteBuf memory leak problem
The Donghua vehicle-management data collection platform I built a while back kept losing data. It did not happen often, but the cause was still worth tracking down, so today I raised Netty's log level and set out to find where the problem was. The code to raise the level:
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .option(ChannelOption.SO_BACKLOG, 2048)
     .handler(new LoggingHandler(LogLevel.DEBUG))
     .childHandler(new ChildChannelHandler());

Setting the LogLevel to DEBUG is all it takes.
Then I settled in to watch the logs:
Note this line:
    LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option '-Dio.netty.leakDetectionLevel=advanced' or call ResourceLeakDetector.setLevel() See http://netty.io/wiki/reference-counted-objects.html for more information.

This message tells us that adding

    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);

that is, raising the leak-detection level to ADVANCED, is enough to get a detailed report of where the leak occurred. With that set, I checked the log again:
    2017-01-19 10:35:59 [ nioEventLoopGroup-1-0:665092 ] - [ ERROR ] LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information. Recent access records: 5
    #5:
        io.netty.buffer.AdvancedLeakAwareByteBuf.readBytes(AdvancedLeakAwareByteBuf.java:435)
        com.dhcc.ObdServer.ObdServerHandler.channelRead(ObdServerHandler.java:31)
        io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
        io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
        io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:243)
        io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
        io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
        io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
        io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
        io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
        io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
        io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
        io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
        io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
    #4:
        Hint: 'ObdServerHandler#0' will handle the message from this point.
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:387)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:243)
        io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
        io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
        io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
        io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
        io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
        io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
        io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
        io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
        io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
    #3:
        io.netty.buffer.AdvancedLeakAwareByteBuf.release(AdvancedLeakAwareByteBuf.java:721)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:237)
        io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
        io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
        io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
        io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
        io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
        io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
        io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
        io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
        io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
    #2:
        io.netty.buffer.AdvancedLeakAwareByteBuf.retain(AdvancedLeakAwareByteBuf.java:693)
        io.netty.handler.codec.DelimiterBasedFrameDecoder.decode(DelimiterBasedFrameDecoder.java:277)
        io.netty.handler.codec.DelimiterBasedFrameDecoder.decode(DelimiterBasedFrameDecoder.java:216)
        io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:316)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:230)
        io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
        io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
        io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
        io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
        io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
        io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
        io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
        io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
        io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
    #1:
        io.netty.buffer.AdvancedLeakAwareByteBuf.skipBytes(AdvancedLeakAwareByteBuf.java:465)
        io.netty.handler.codec.DelimiterBasedFrameDecoder.decode(DelimiterBasedFrameDecoder.java:272)
        io.netty.handler.codec.DelimiterBasedFrameDecoder.decode(DelimiterBasedFrameDecoder.java:216)
        io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:316)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:230)
        io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
        io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
        io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
        io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
        io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
        io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
        io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
        io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
        io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)
    Created at:
        io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:250)
        io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
        io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
        io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
        io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:113)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351)
        io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
        io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
        io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
        io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
        io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
        io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)

The trace pinpoints this spot in my code:
    ByteBuf buff = (ByteBuf) msg;
    byte[] req = new byte[buff.readableBytes()];

So the problem was confirmed to be a ByteBuf memory leak, and I investigated from that angle. It turns out that Netty 5 allocates ByteBufs through PooledByteBufAllocator by default, so the buffers have to be released by hand; otherwise the pool leaks memory.
In other words, releasing the ByteBuf is all the fix requires.
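The original fixed snippet is not reproduced in this post, so here is a minimal sketch of what the corrected handler could look like. The class name ObdServerHandler comes from the stack trace above; the body, and the choice of extending Netty 4's ChannelInboundHandlerAdapter, are illustrative assumptions (the Netty 5 alphas used slightly different adapter classes):

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;

    public class ObdServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf buff = (ByteBuf) msg;
            try {
                byte[] req = new byte[buff.readableBytes()];
                buff.readBytes(req);              // copy the payload out of the pooled buffer
                // ... decode req and hand it off ...
            } finally {
                ReferenceCountUtil.release(msg);  // drop our reference so the pooled buffer is reclaimed
            }
        }
    }

The try/finally guarantees the buffer is released even if decoding throws. Alternatively, in Netty 4.x a handler can extend SimpleChannelInboundHandler, which releases the inbound message automatically after channelRead0 returns.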
Here is one community member's explanation of that release call:
ReferenceCountUtil.release() is really just a wrapper around ByteBuf.release() (inherited from the ReferenceCounted interface). ByteBuf in Netty 4 uses reference counting (Netty 4 implements an optional ByteBuf pool). Every newly allocated ByteBuf has a reference count of 1. Each time you add a reference to the ByteBuf you must call ByteBuf.retain(), and each time you drop a reference you must call ByteBuf.release(). When the reference count reaches 0, the object can be reclaimed. I am only using ByteBuf as the example here; other objects also implement the ReferenceCounted interface, and the same rules apply to them.
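Those rules are easy to verify in isolation. A small standalone sketch (not project code, just poking at the counter through Netty 4's pooled allocator):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.PooledByteBufAllocator;

    public class RefCountDemo {
        public static void main(String[] args) {
            ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(16); // freshly allocated: refCnt() == 1
            buf.retain();                                            // one extra reference: refCnt() == 2
            buf.release();                                           // refCnt() == 1, still usable
            boolean freed = buf.release();                           // refCnt() == 0: returned to the pool
            System.out.println(freed);                               // prints true
            // any further buf.release() or read would throw IllegalReferenceCountException
        }
    }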
While chasing the problem, I also wondered whether the data loss might be because my Netty service was using UDP, so here is how to tell whether a Netty program is using TCP or UDP:
About TCP and UDP
A socket can be based on TCP or on UDP. The difference is that UDP does not guarantee every packet is received correctly, so it performs better but is less reliable; TCP guarantees correct delivery, at some cost in performance.

UDP is basically only suited to things like live video streaming; for our needs it should be TCP.
So how do the two differ in code? Here is an explanation I found online (note that the class names it uses are from the old Netty 3.x API; a Netty 4-style sketch follows at the end of this passage):
For the ChannelFactory, UDP communication uses NioDatagramChannelFactory, while for TCP we chose NioServerSocketChannelFactory.

For the Bootstrap, UDP uses ConnectionlessBootstrap, while TCP uses ServerBootstrap.

For the decoder and encoder codecs, and for the ChannelPipelineFactory, UDP development is no different from TCP, so no details are given here.

The ChannelHandler is where UDP and TCP truly differ. UDP is connectionless: you can still obtain the current session's channel through the MessageEvent parameter's getChannel() method, but its isConnected() always returns false.
In UDP development, once the message-received callback has handed you the session's channel object, you can send data to the peer directly with channel.write(message, remoteAddress): the first argument is still the message object to send, and the second is the peer's SocketAddress.

The point that needs the most attention is that SocketAddress: in TCP communication we can obtain it from channel.getRemoteAddress(), but in UDP communication we must obtain the peer's SocketAddress by calling getRemoteAddress() on the MessageEvent.
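For completeness, here is roughly how the same distinction looks against the newer API, since the quote above describes Netty 3.x. A minimal Netty 4-style UDP echo sketch (the port, the "ack" payload, and the handler body are made up for illustration): UDP uses a plain Bootstrap with NioDatagramChannel rather than ServerBootstrap with NioServerSocketChannel, and the peer address travels with each DatagramPacket instead of living on the channel.

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.DatagramPacket;
    import io.netty.channel.socket.nio.NioDatagramChannel;
    import io.netty.util.CharsetUtil;

    public class UdpEchoServer {
        public static void main(String[] args) throws InterruptedException {
            NioEventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();          // plain Bootstrap, not ServerBootstrap
                b.group(group)
                 .channel(NioDatagramChannel.class)     // datagram channel, not NioServerSocketChannel
                 .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
                     @Override
                     protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) {
                         // the peer address comes from the packet, not from the channel
                         ctx.writeAndFlush(new DatagramPacket(
                                 Unpooled.copiedBuffer("ack", CharsetUtil.UTF_8),
                                 packet.sender()));
                     }
                 });
                b.bind(9999).sync().channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }

As a bonus, SimpleChannelInboundHandler releases the inbound DatagramPacket automatically after channelRead0 returns, which sidesteps exactly the kind of leak described earlier.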