NIO详解(四):NIO编程
1. NIO類庫簡介
1.1 緩沖區(qū)Buffer
Buffer是一個對象,它包含了一些要寫入或者要讀出的數(shù)據。在NIO類庫中加入Buffer對象,體現(xiàn)了新庫和原來I/O的一個重要區(qū)別。在NIO庫中,所有的數(shù)據都是用緩沖區(qū)處理的。在讀取數(shù)據時,它是直接讀取到緩沖區(qū)中的;在寫入到緩沖區(qū)時。任何時候訪問NIO中的數(shù)據,都是通過緩沖區(qū)進行操作。
緩沖區(qū)實質上是一個數(shù)組。通常它是一個字節(jié)數(shù)據(ByteBuffer),也可以使用其他種類的數(shù)組。但是一個緩沖區(qū)不僅僅是一個數(shù)組,緩沖區(qū)還提供了對數(shù)據的結構化訪問以及維護讀寫位置(limit)等信息。
- ByteBuffer:字節(jié)緩沖區(qū)
- CharBuffer:字符緩沖區(qū)
- ShortBuffer:短整形緩沖區(qū)
- IntBuffer:整形緩沖區(qū)
- LongBuffer:長整形緩沖區(qū)
- FloatBuffer:浮點型緩沖區(qū)
- DoubleBuffer:雙精度浮點型緩沖區(qū)
緩沖區(qū)的繼承關系如下:
1.2 通道Channel
Channel是一個通道,它就像自來水管道一樣,網絡數(shù)據通過Channel讀取和寫入。通道與流不同之處在于通道它是雙向的,流只是在一個方向上移動(一個流必須是InputStream或者OutputStream的子類),而通道可以用于讀、寫或者二者同時進行。因為Channel是全雙工的,所以它可以比流更加映射底層操作系統(tǒng)地API。從類圖中可以看出,實際上Channel可以分為兩大類:用于網絡讀寫的SelectabaleChannel和用于文件操作的FileChannel。ServerSocketChannel是一個可以監(jiān)聽新進來的TCP連接的通道,就像標準IO中的ServerSocket一樣。
1.3 多路復用器Selector
Select會不斷地輪詢注冊在其上的Channel,如果某個Channel上面發(fā)生讀或者寫事件,這個Channel就處于就緒狀態(tài),會被Selector輪詢出來,然后通過SelectionKey可以獲取就緒Channel集合,進行后續(xù)的I/O操作。一個多路復用器Selector可以同時輪詢多個Channel,由于JDK使用了epoll()代替?zhèn)鹘y(tǒng)的select實現(xiàn),所以它并沒有最大連接句柄1024/2048的輪詢,就可以接入成千上萬的客戶端。
2. NIO服務端序列圖
一。 打開ServerSocketChannel,用于監(jiān)聽客戶端的連接。
ServerSocketChannel servChannel=ServerSocketChannel.open();二。綁定監(jiān)聽端口,設置連接為非阻塞狀態(tài)。
servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port), 1024);三。創(chuàng)建Reactor線程,創(chuàng)建多路復用器并啟動線程。
Selector selector = Selector.open();四。將ServerSocketChannel注冊到Reactor線程的多路復用器Selector上,監(jiān)聽ACCEPT事件
servChannel.register(selector, SelectionKey.OP_ACCEPT);五。多路復用器在線程run方法的無線循環(huán)體內輪詢準備就緒的Key。
while (!stop) {try {selector.select(1000);Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectedKeys.iterator();SelectionKey key = null;while (it.hasNext()) {key = it.next();it.remove();try {handleInput(key);} catch (Exception e) {if (key != null) {key.cancel();if (key.channel() != null)key.channel().close();}}}} catch (Throwable t) {t.printStackTrace();}}六。多路復用器監(jiān)聽到有新的客戶端接入,處理新的接入請求,完成TCP三次握手,建立物理鏈路。
if (key.isAcceptable()) {// Accept the new connectionServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel sc = ssc.accept(); }七。設置客戶端鏈路為非阻塞模式
sc.configureBlocking(false);八。將接入的客戶端連接注冊到Reactor線程的多路復用器上,監(jiān)聽讀操作,讀取客戶端發(fā)送的網絡消息。
sc.register(selector, SelectionKey.OP_READ);九。異步讀取客戶端請求消息到緩沖區(qū)。
if (key.isReadable()) {// Read the dataSocketChannel sc = (SocketChannel) key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);int readBytes = sc.read(readBuffer);.....}十。對ByteBuffer進行編碼解碼,如果有半包消息指針reset,繼續(xù)讀取后續(xù)的報文,將解碼成功的消息封裝成Task,投遞到業(yè)務線程池中,進行業(yè)務邏輯編排。
Object message=null; while(buffer.hasRemain()){bytebuffer.mark();Object message=decode(byteBuffer);if(message==null){byteBufer.reset();break;}messageList.add(message); } if(!bytebuffer.hasRemain()){byteBuffer.clear(); }elsebyteBuffer.compact();if(messageList!=null & !messageList.isEmpty()){for(Obbject messageE:messagList){handlerTask(messageE)} }十一。將POJO對象encode成ByteBuffer,調用SocketChannel的異步write接口,將消息異步發(fā)送給客戶端。
socketChannel.write(buffer).3. NIO客戶端序列圖
一。打開SocketChannel,綁定客戶端本地地址。
SocketChannel clientChannel = SocketChannel.open();二。設置SocketChannel為非阻塞模式,同時設置客戶端連接的TCP參數(shù)。
socketChannel.configureBlocking(false); socketChannel.socket().setReuseAddress(true);三。異步連接服務器。判斷是否連接成功,如果連接成功,則直接注冊讀取狀態(tài)到多路復用器中,如果當前沒有連接成功,則向Reactor的多路復用器注冊OP_CONNECT狀態(tài)為,監(jiān)聽服務器端的TCP ACK應答。
// 如果直接連接成功,則注冊到多路復用器上,發(fā)送請求消息,讀應答if (socketChannel.connect(new InetSocketAddress(host, port))) {socketChannel.register(selector, SelectionKey.OP_READ);doWrite(socketChannel);} elsesocketChannel.register(selector, SelectionKey.OP_CONNECT);四。創(chuàng)建Reactor線程,創(chuàng)建多路復用器并啟動線程。
Selector selector = Selector.open();五。多路復用器在線程run方法的無線循環(huán)體內輪詢準備就緒的Key。
while (!stop) {try {selector.select(1000);Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectedKeys.iterator();SelectionKey key = null;while (it.hasNext()) {key = it.next();it.remove();try {handleInput(key);} catch (Exception e) {if (key != null) {key.cancel();if (key.channel() != null)key.channel().close();}}}} catch (Throwable t) {t.printStackTrace();}}六。接受connect事件處理。判斷連接結果,如果連接成功,注冊連接事件到多路復用器。注冊讀事件到多路復用器中。 ```java if (key.isConnectable()) {if (sc.finishConnect()) {sc.register(selector, SelectionKey.OP_READ);doWrite(sc);} elseSystem.exit(1);// 連接失敗,進程退出}七。異步讀取客戶端請求消息到緩沖區(qū)。
if (key.isReadable()) {// Read the dataSocketChannel sc = (SocketChannel) key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);int readBytes = sc.read(readBuffer);.....}八。對ByteBuffer進行編碼解碼,如果有半包消息指針reset,繼續(xù)讀取后續(xù)的報文,將解碼成功的消息封裝成Task,投遞到業(yè)務線程池中,進行業(yè)務邏輯編排。
Object message=null; while(buffer.hasRemain()){bytebuffer.mark();Object message=decode(byteBuffer);if(message==null){byteBufer.reset();break;}messageList.add(message); } if(!bytebuffer.hasRemain()){byteBuffer.clear(); }elsebyteBuffer.compact();if(messageList!=null & !messageList.isEmpty()){for(Obbject messageE:messagList){handlerTask(messageE)} }九。將POJO對象encode成ByteBuffer,調用SocketChannel的異步write接口,將消息異步發(fā)送給客戶端。
socketChannel.write(buffer).4. 總結
通過源碼分析,我們發(fā)現(xiàn)NIO編程的難度確實比同步阻塞BIO的大很多,我們的NIO程序中還沒有考慮“半包讀”和“半包寫”,如果加上這些,代碼會更加復雜。使用NIO編程的優(yōu)點如下:
- 客戶端發(fā)起連接的操作是異步的,可以通過多路復用器注冊OP_CONNECT等待后續(xù)結果,不需要像之前的客戶端那樣被同步阻塞。
- SocketChannel的讀寫操作是異步的,如果沒有可讀寫的數(shù)據它不會等待,直接返回,這樣I/O通信線程就可以處理其他鏈路,不需要同步等待這個鏈路可用。
- 線程模型的優(yōu)化:由于JDK的Selector在Linux等主流操作系統(tǒng)上通過epoll實現(xiàn),它沒有連接句柄的限制。
總結
以上是生活随笔為你收集整理的NIO详解(四):NIO编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Netty详解(二)Linux 网络IO
- 下一篇: NIO详解(五):Buffer详解