NIO介绍与Netty通信简单入门
NIO同步阻塞與同步非阻塞
BIO與NIO
IO(BIO)和NIO區(qū)別:其本質(zhì)就是阻塞和非阻塞的區(qū)別
阻塞概念:應(yīng)用程序在獲取網(wǎng)絡(luò)數(shù)據(jù)的時(shí)候,如果網(wǎng)絡(luò)傳輸數(shù)據(jù)很慢,就會(huì)一直等待,直到傳輸完畢為止。
非阻塞概念:應(yīng)用程序直接可以獲取已經(jīng)準(zhǔn)備就緒好的數(shù)據(jù),無需等待。
IO為同步阻塞形式,NIO為同步非阻塞形式,NIO并沒有實(shí)現(xiàn)異步,在JDK1.7后升級(jí)NIO庫(kù)包,支持異步非阻塞
同學(xué)模型NIO2.0(AIO)
BIO:同步阻塞式IO,服務(wù)器實(shí)現(xiàn)模式為一個(gè)連接一個(gè)線程,即客戶端有連接請(qǐng)求時(shí)服務(wù)器端就需要啟動(dòng)一個(gè)線程進(jìn)行處理,如果這個(gè)連接不做任何事情會(huì)造成不必要的線程開銷,當(dāng)然可以通過線程池機(jī)制改善。?
NIO:同步非阻塞式IO,服務(wù)器實(shí)現(xiàn)模式為一個(gè)請(qǐng)求一個(gè)線程,即客戶端發(fā)送的連接請(qǐng)求都會(huì)注冊(cè)到多路復(fù)用器上,多路復(fù)用器輪詢到連接有I/O請(qǐng)求時(shí)才啟動(dòng)一個(gè)線程進(jìn)行處理。?
AIO(NIO.2):異步非阻塞式IO,服務(wù)器實(shí)現(xiàn)模式為一個(gè)有效請(qǐng)求一個(gè)線程,客戶端的I/O請(qǐng)求都是由OS先完成了再通知服務(wù)器應(yīng)用去啟動(dòng)線程進(jìn)行處理。?
?
同步時(shí),應(yīng)用程序會(huì)直接參與IO讀寫操作,并且我們的應(yīng)用程序會(huì)直接阻塞到某一個(gè)方法上,直到數(shù)據(jù)準(zhǔn)備就緒:
或者采用輪訓(xùn)的策略實(shí)時(shí)檢查數(shù)據(jù)的就緒狀態(tài),如果就緒則獲取數(shù)據(jù).
異步時(shí),則所有的IO讀寫操作交給操作系統(tǒng),與我們的應(yīng)用程序沒有直接關(guān)系,我們程序不需要關(guān)系IO讀寫,當(dāng)操作
系統(tǒng)完成了IO讀寫操作時(shí),會(huì)給我們應(yīng)用程序發(fā)送通知,我們的應(yīng)用程序直接拿走數(shù)據(jù)極即可。
偽異步
由于BIO一個(gè)客戶端需要一個(gè)線程去處理,因此我們進(jìn)行優(yōu)化,后端使用線程池來處理多個(gè)客戶端的請(qǐng)求接入,形成客戶端個(gè)數(shù)M:線程池最大的線程數(shù)N的比例關(guān)系,其中M可以遠(yuǎn)遠(yuǎn)大于N,通過線程池可以靈活的調(diào)配線程資源,設(shè)置線程的最大值,防止由于海量并發(fā)接入導(dǎo)致線程耗盡。
原理:
當(dāng)有新的客戶端接入時(shí),將客戶端的Socket封裝成一個(gè)Task(該Task任務(wù)實(shí)現(xiàn)了java的Runnable接口)投遞到后端的線程池中進(jìn)行處理,由于線程池可以設(shè)置消息隊(duì)列的大小以及線程池的最大值,因此,它的資源占用是可控的,無論多少個(gè)客戶端的并發(fā)訪問,都不會(huì)導(dǎo)致資源的耗盡或宕機(jī)。
??
使用多線程支持多個(gè)請(qǐng)求
服務(wù)器實(shí)現(xiàn)模式為一個(gè)連接一個(gè)線程,即客戶端有連接請(qǐng)求時(shí)服務(wù)器端就需要啟動(dòng)一個(gè)線程進(jìn)行處理,如果這個(gè)連接不做任何事情會(huì)造成不必要的線程開銷,當(dāng)然可以通過線程池機(jī)制改善
//tcp服務(wù)器端...class?TcpServer {public?static?void?main(String[]?args)?throws?IOException {System.out.println("socket tcp服務(wù)器端啟動(dòng)....");ServerSocket?serverSocket?=?new?ServerSocket(8080);//?等待客戶端請(qǐng)求try?{while?(true) {Socket?accept?=?serverSocket.accept();new?Thread(new?Runnable() {@Overridepublic?void?run() {try?{InputStream?inputStream?=?accept.getInputStream();//?轉(zhuǎn)換成string類型byte[]?buf?=?new?byte[1024];int?len?=?inputStream.read(buf);String?str?=?new?String(buf, 0,?len);System.out.println("服務(wù)器接受客戶端內(nèi)容:"?+?str);}?catch?(Exception?e) {//?TODO: handle exception}}}).start();}}?catch?(Exception?e) {e.printStackTrace();}?finally?{serverSocket.close();}}}public?class?TcpClient {public?static?void?main(String[]?args)?throws?UnknownHostException, IOException {System.out.println("socket tcp?客戶端啟動(dòng)....");Socket?socket?=?new?Socket("127.0.0.1", 8080);OutputStream?outputStream?=?socket.getOutputStream();outputStream.write("我是螞蟻課堂".getBytes());socket.close();}}?
使用線程池管理線程
//tcp服務(wù)器端...class?TcpServer {public?static?void?main(String[]?args)?throws?IOException {ExecutorService?newCachedThreadPool?= Executors.newCachedThreadPool();System.out.println("socket tcp服務(wù)器端啟動(dòng)....");ServerSocket?serverSocket?=?new?ServerSocket(8080);//?等待客戶端請(qǐng)求try?{while?(true) {Socket?accept?=?serverSocket.accept();//使用線程newCachedThreadPool.execute(new?Runnable() {@Overridepublic?void?run() {try?{InputStream?inputStream?=?accept.getInputStream();//?轉(zhuǎn)換成string類型byte[]?buf?=?new?byte[1024];int?len?=?inputStream.read(buf);String?str?=?new?String(buf, 0,?len);System.out.println("服務(wù)器接受客戶端內(nèi)容:"?+?str);}?catch?(Exception?e) {//?TODO: handle exception}}});}}?catch?(Exception?e) {e.printStackTrace();}?finally?{serverSocket.close();}}}public?class?TcpClient {public?static?void?main(String[]?args)?throws?UnknownHostException, IOException {System.out.println("socket tcp?客戶端啟動(dòng)....");Socket?socket?=?new?Socket("127.0.0.1", 8080);OutputStream?outputStream?=?socket.getOutputStream();outputStream.write("我是螞蟻課堂".getBytes());socket.close();}}??
IO模型關(guān)系
什么是阻塞
阻塞概念:應(yīng)用程序在獲取網(wǎng)絡(luò)數(shù)據(jù)的時(shí)候,如果網(wǎng)絡(luò)傳輸很慢,那么程序就一直等著,直接到傳輸完畢。
什么是非阻塞
應(yīng)用程序直接可以獲取已經(jīng)準(zhǔn)備好的數(shù)據(jù),無需等待.
IO為同步阻塞形式,NIO為同步非阻塞形式。NIO沒有實(shí)現(xiàn)異步,在JDK1.7之后,升級(jí)了NIO庫(kù)包
,支持異步費(fèi)阻塞通訊模型NIO2.0(AIO)
?
Netty快速入門
什么是Netty
?Netty?是一個(gè)基于?JAVA NIO?類庫(kù)的異步通信框架,它的架構(gòu)特點(diǎn)是:異步非阻塞、基于事件驅(qū)動(dòng)、高性能、高可靠性和高可定制性。
Netty應(yīng)用場(chǎng)景
1.分布式開源框架中dubbo、Zookeeper,RocketMQ底層rpc通訊使用就是netty。
2.游戲開發(fā)中,底層使用netty通訊。
為什么選擇netty
在本小節(jié),我們總結(jié)下為什么不建議開發(fā)者直接使用JDK的NIO類庫(kù)進(jìn)行開發(fā)的原因:
1)????? NIO的類庫(kù)和API繁雜,使用麻煩,你需要熟練掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等;
2)??????需要具備其它的額外技能做鋪墊,例如熟悉Java多線程編程,因?yàn)镹IO編程涉及到Reactor模式,你必須對(duì)多線程和網(wǎng)路編程非常熟悉,才能編寫出高質(zhì)量的NIO程序;
3)??????可靠性能力補(bǔ)齊,工作量和難度都非常大。例如客戶端面臨斷連重連、網(wǎng)絡(luò)閃斷、半包讀寫、失敗緩存、網(wǎng)絡(luò)擁塞和異常碼流的處理等等,NIO編程的特點(diǎn)是功能開發(fā)相對(duì)容易,但是可靠性能力補(bǔ)齊工作量和難度都非常大;
4)????? JDK NIO的BUG,例如臭名昭著的epoll bug,它會(huì)導(dǎo)致Selector空輪詢,最終導(dǎo)致CPU 100%。官方聲稱在JDK1.6版本的update18修復(fù)了該問題,但是直到JDK1.7版本該問題仍舊存在,只不過該bug發(fā)生概率降低了一些而已,它并沒有被根本解決。
?
Netty通信實(shí)踐
?
Netty服務(wù)器端?
package com.xiaofeng.netty.server;import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel;/*** @author xiaofeng* @version V1.0* @title: NettyServer* @package: com.xiaofeng.netty.server* @description: netty server* @date 2019/12/17 11:24*/ public class NettyServer {private static class SingletionNettyServer {static final NettyServer instance = new NettyServer();}public static NettyServer getInstance() {return SingletionNettyServer.instance;}private EventLoopGroup mainGroup;private EventLoopGroup subGroup;private ServerBootstrap server;private ChannelFuture future;public NettyServer() {//主線程組,用于接受客戶端的連接,但是不做任何處理,跟老板一樣,不做事mainGroup = new NioEventLoopGroup();//從線程組, 老板線程組會(huì)把任務(wù)丟給他,讓手下線程組去做任務(wù)subGroup = new NioEventLoopGroup();//server啟動(dòng)類server = new ServerBootstrap();//建立聯(lián)系server.group(mainGroup, subGroup).channel(NioServerSocketChannel.class).childHandler(new NettyServerInitialzer());}public void start() {this.future = server.bind(9999);System.err.println("netty server 啟動(dòng)完畢...");}public static void main(String[] args) {NettyServer.getInstance().start();} } package com.xiaofeng.netty.server;import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder;/*** @author xiaofeng* @version V1.0* @title: NettyServerInitialzer* @package: com.xiaofeng.netty.server* @description: 服務(wù)端初始化器* @date 2019/12/17 11:30*/ public class NettyServerInitialzer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("stringD", new StringDecoder());pipeline.addLast("stringC", new StringEncoder());pipeline.addLast("http", new HttpClientCodec());// 自定義的handlerpipeline.addLast(new ServerChatHandler());}} package com.xiaofeng.netty.server;import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor;/*** @author xiaofeng* @version V1.0* @title: ServerChatHandler* @package: com.xiaofeng.netty.server* @description: 處理消息的handler* @date 2019/12/17 11:38*/ public class ServerChatHandler extends SimpleChannelInboundHandler<String> {// 用于記錄和管理所有客戶端的channlepublic static ChannelGroup users =new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String s)throws Exception {Channel channel = ctx.channel();for (Channel ch : users) {if (ch == channel) {System.out.println("收到來自客戶端[" + channel.id().asShortText() + "]的消息:" + s);ctx.writeAndFlush("[你說]:" + s + "\n");} else {ctx.writeAndFlush("[" + channel.remoteAddress() + "]" + s + "\n");}}}/*** 當(dāng)客戶端連接服務(wù)端之后(打開連接)* 獲取客戶端的channle,并且放到ChannelGroup中去進(jìn)行管理*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {String channelId = ctx.channel().id().asShortText();System.out.println("客戶端加入,channelId為:" + channelId);users.add(ctx.channel());}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {String channelId = ctx.channel().id().asShortText();System.out.println("客戶端被移除,channelId為:" + channelId);// 當(dāng)觸發(fā)handlerRemoved,ChannelGroup會(huì)自動(dòng)移除對(duì)應(yīng)客戶端的channelusers.remove(ctx.channel());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();// 發(fā)生異常之后關(guān)閉連接(關(guān)閉channel),隨后從ChannelGroup中移除ctx.channel().close();users.remove(ctx.channel());} }?
Netty客戶端
?
package com.xiaofeng.netty.client;import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel;import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader;/*** @author xiaofeng* @version V1.0* @title: NettyClient* @package: com.xiaofeng.netty.client* @description: netty client* @date 2019/12/17 11:14*/ public class NettyClient {private static class SingletionNettyClient {static final NettyClient instance = new NettyClient();}public static NettyClient getInstance() {return SingletionNettyClient.instance;}private EventLoopGroup eventGroup;private Bootstrap client;public NettyClient() {eventGroup = new NioEventLoopGroup();//創(chuàng)建客戶端啟動(dòng)類client = new Bootstrap();client.group(eventGroup).channel(NioSocketChannel.class).handler(new NettyClientInitialzer());}public void start() throws InterruptedException, IOException {Channel channel = client.connect("127.0.0.1", 9999).sync().channel();while (true) {BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));String input = reader.readLine();if (input != null) {if ("quit".equals(input)) {System.exit(1);}channel.writeAndFlush(input);}}}public static void main(String[] args) throws InterruptedException, IOException {NettyClient.getInstance().start();} }?
package com.xiaofeng.netty.client;import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder;/*** @author xiaofeng* @version V1.0* @title: NettyClientInitialzer* @package: com.xiaofeng.netty.client* @description: 客戶端初始化器* @date 2019/12/17 11:29*/ public class NettyClientInitialzer extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel channel) throws Exception {//獲取管道ChannelPipeline pipeline = channel.pipeline();//添加編解碼器pipeline.addLast("stringD", new StringDecoder());pipeline.addLast("stringC", new StringEncoder());pipeline.addLast("http", new HttpClientCodec());// 自定義的handlerpipeline.addLast(new ClientChatHandler());} } package com.xiaofeng.netty.client;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler;/*** @author xiaofeng* @version V1.0* @title: ClientChatHandler* @package: com.xiaofeng.netty.client* @description: 處理消息的handler* @date 2019/12/17 11:38*/ public class ClientChatHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String s)throws Exception {System.out.println(s);}}TCP粘包、拆包問題解決方案
什么是粘包/拆包
???一個(gè)完整的業(yè)務(wù)可能會(huì)被TCP拆分成多個(gè)包進(jìn)行發(fā)送,也有可能把多個(gè)小的包封裝成一個(gè)大的數(shù)據(jù)包發(fā)送,這個(gè)就是TCP的拆包和封包問題。
下面可以看一張圖,是客戶端向服務(wù)端發(fā)送包:
1.?第一種情況,Data1和Data2都分開發(fā)送到了Server端,沒有產(chǎn)生粘包和拆包的情況。
2.?第二種情況,Data1和Data2數(shù)據(jù)粘在了一起,打成了一個(gè)大的包發(fā)送到Server端,這個(gè)情況就是粘包。
3.?第三種情況,Data2被分離成Data2_1和Data2_2,并且Data2_1在Data1之前到達(dá)了服務(wù)端,這種情況就產(chǎn)生了拆包。
由于網(wǎng)絡(luò)的復(fù)雜性,可能數(shù)據(jù)會(huì)被分離成N多個(gè)復(fù)雜的拆包/粘包的情況,所以在做TCP服務(wù)器的時(shí)候就需要首先解決拆包/
解決辦法
? ? ?1.消息定長(zhǎng),報(bào)文大小固定長(zhǎng)度,不夠空格補(bǔ)全,發(fā)送和接收方遵循相同的約定,這樣即使粘包了通過接收方編程實(shí)現(xiàn)獲取定長(zhǎng)報(bào)文也能區(qū)分。
sc.pipeline().addLast(new FixedLengthFrameDecoder(10));2.包尾添加特殊分隔符,例如每條報(bào)文結(jié)束都添加回車換行符(例如FTP協(xié)議)或者指定特殊字符作為報(bào)文分隔符,接收方通過特殊分隔符切分報(bào)文區(qū)分。
ByteBuf?buf?= Unpooled.copiedBuffer("_mayi".getBytes());sc.pipeline().addLast(new?DelimiterBasedFrameDecoder(1024,?buf));將消息分為消息頭和消息體,消息頭中包含表示信息的總長(zhǎng)度(或者消息體長(zhǎng)度)的字段?
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的NIO介绍与Netty通信简单入门的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 消息称华为车 BU 王军被停职,余承东独
- 下一篇: java垃圾回收机制算法分析