【Java NIO】一文了解NIO
【Java NIO】一文了解NIO
Java NIO
1 背景介紹
在上一篇文章中我們介紹了Java基本IO,也就是阻塞式IO(BIO),在JDK1.4版本后推出了新的IO系統(tǒng)(NIO),也可以理解為非阻塞IO(Non-Blocking IO)。引用《Java NIO》中的一段話來解釋一下NIO出現(xiàn)的原因:
操作系統(tǒng)與 Java 基于流的 I/O模型有些不匹配。操作系統(tǒng)要移動的是大塊數(shù)據(jù)(緩沖區(qū)),這往往是在硬件直接存儲器存取( DMA)的協(xié)助下完成的。而 JVM 的 I/O 類喜歡操作小塊數(shù)據(jù)——單個字節(jié)、幾行文本。結(jié)果,操作系統(tǒng)送來整緩沖區(qū)的數(shù)據(jù), java.io 的流數(shù)據(jù)類再花大量時間把它們拆成小塊,往往拷貝一個小塊就要往返于幾層對象。操作系統(tǒng)喜歡整卡車地運來數(shù)據(jù), java.io 類則喜歡一鏟子一鏟子地加工數(shù)據(jù)。有了 NIO,就可以輕松地把一卡車數(shù)據(jù)備份到您能直接使用的地方( ByteBuffer 對象)。但是Java里的RandomAccessFile類是比較接近操作系統(tǒng)的方式。
可以看出Java原生的IO模型之所以慢,是因為與操作系統(tǒng)的操作方式不匹配造成的,那么NIO之所以比BIO快主要就是用到了緩沖區(qū)相關(guān)的技術(shù),接下來慢慢介紹這些技術(shù)點。
1.1 緩沖區(qū)操作
下圖描述了操作系統(tǒng)中數(shù)據(jù)是如何從外部存儲向運行中的進(jìn)程內(nèi)存區(qū)域移動的過程:進(jìn)程使用read()系統(tǒng)調(diào)用要求緩沖區(qū)被填充滿。內(nèi)核隨即向磁盤控制器發(fā)出指令,要求其從磁盤讀取數(shù)據(jù)。磁盤控制器通過DMA直接把磁盤上的數(shù)據(jù)寫入緩沖區(qū),這一步不需要CPU的參與。當(dāng)緩沖區(qū)填滿時,內(nèi)核將數(shù)據(jù)從臨時緩沖區(qū)拷貝到進(jìn)程執(zhí)行read()調(diào)用時指定的緩沖區(qū)。
這里需要主要為什么要執(zhí)行系統(tǒng)調(diào)用這樣一個中間步驟而不是直接DMA到進(jìn)程的緩沖區(qū),是因為用戶空間是無法直接操作硬件的,另外磁盤這種塊存儲設(shè)備操作的是固定大小的數(shù)據(jù)塊,而用戶請求的則是非規(guī)則大小的數(shù)據(jù),內(nèi)核空間在這里的作用就是分解、重組的作用。
2 基本組件
Java NIO主要依賴的組件有三個:緩沖區(qū)Buffer、通道Channel和選擇器Selector。
2.1 緩沖區(qū)(Buffer)
Buffer家族主要有這么些個成員,根據(jù)類名也大概能猜到它們的用處,用的最多的是ByteBuffer,在下面的例子中也會主要用到它。
在這里就不仔細(xì)講Buffer類的API了,因為需要用的時候可以去查Java Doc,而以幾個常用的操作來講述一下怎么使用Buffer。
2.1.1 緩沖區(qū)屬性
容量(capacity):緩沖區(qū)的最大大小
上界(limit):緩沖區(qū)當(dāng)前的大小
位置(position):下一個要讀寫的位置,由get()和put()更新
標(biāo)記(mark):備忘位置,由mark()來指定mark = position,由reset()來指定position=mark
它們之間的大小關(guān)系:
0 <= mark <= position <= limit <= capacity
2.1.2 創(chuàng)建緩沖區(qū)
一種最常用的方式是:
ByteBuffer buffer = ByteBuffer.allocate(1024);這種方法是創(chuàng)建一個1024字節(jié)大小的緩沖區(qū)。也可以用下面這種方式來包裝自己創(chuàng)建的字節(jié)數(shù)組。
byte[] bytes = new byte[1024];
ByteBuffer buffer = ByteBuffer.wrap(bytes);
2.1.3 緩沖區(qū)翻轉(zhuǎn)
Buffer在填充完畢后需要傳遞到一個通道中,這時如果直接讀取Buffer,其實是什么都讀不到的。因為Buffer的設(shè)計中是有一個指針概念的,指向當(dāng)前的位置,當(dāng)一個Buffer填充完畢時指針是指向末尾的,因此在讀取時應(yīng)該將指針指向Buffer的頭部,簡單的方法就是使用下面這個方法:
Buffer.flip();flip的實現(xiàn)如下:
public final Buffer flip() {limit = position;position = 0; mark = -1; return this; }可以看出flip其實是把當(dāng)前的limit從capacity變成了position,又把position放到了緩沖區(qū)的起點,并取消了mark。
2.1.4 緩沖區(qū)清空
Buffer.clear();clear的實現(xiàn)如下:
public final Buffer clear() {position = 0;limit = capacity;mark = -1; return this; }clear函數(shù)就是將position放到起點,并重置limiti為capacity,以及取消mark。
2.1.5 另外一種翻轉(zhuǎn)
Buffer.rewind();rewind的實現(xiàn)如下:
public final Buffer rewind() { position = 0; mark = -1; return this; }rewind和flip的區(qū)別在于沒有改變limit的值。
2.1.6 緩沖區(qū)壓縮
Buffer.compact()2.2 通道(Channel)
開始我不是很理解Channel這個東西為什么要存在,看了書才慢慢明白,緩沖區(qū)為我們裝載了數(shù)據(jù),但是數(shù)據(jù)的寫入和讀取并不能直接進(jìn)行read()和write()這樣的系統(tǒng)調(diào)用,而是JVM為我們提供了一層對系統(tǒng)調(diào)用的封裝。而Channel可以用最小的開銷來訪問操作系統(tǒng)本身的IO服務(wù),這就是為什么要有Channel的原因。
下面來講講常用的幾個Channel類及其常用的方法。
2.2.1 常見Channel分類
I/O從廣義上可以分為File I/O和Stream I/O,對應(yīng)到通道來說就有文件通道和socket通道,具體的說是FileChannle類和SocketChannel、ServerSocketChannel和DatagramChannel類。
它們之間的區(qū)別還是很大的,從繼承關(guān)系上來看:
public abstract class FileChannel extends AbstractInterruptibleChannel implements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannelFileChannel主要是繼承了可中斷接口,而對于socket相關(guān)的Channel類都繼承AbstractSelectableChannel,這是選擇器(Selector)相關(guān)的通道,在下一節(jié)中具體講解。
public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel2.2.2 文件通道
2.2.2.1 打開
FileChannel只能通過工廠方法來實例化,那就是調(diào)用RandomAccessFile、FileInputStream和FileOutputStream的getChannel()方法。如:
RandomAccessFile file = new RandomAccessFile("a.txt", "r"); FileChannel fc = file.getChannel();2.2.2.2 使用
先看看FileChannel提供的方法句柄:
public abstract int read(ByteBuffer dst) throws IOException;//把通道中數(shù)據(jù)傳到目的緩沖區(qū)中,dst是destination的縮寫 public abstract int write(ByteBuffer src) throws IOException;//把源緩沖區(qū)中的內(nèi)容寫到指定的通道中去從句柄可以看出FileChannel是既可以讀又可以寫的,是全雙工的。下面這個例子用來展示FileChannel是如何進(jìn)行讀和寫的。
public class FileChannelTest {public static void readFile(String path) throws IOException { FileChannel fc = new FileInputStream(path).getChannel(); ByteBuffer buffer = ByteBuffer.allocate(128); StringBuilder sb = new StringBuilder(); while ((fc.read(buffer)) >= 0) { //翻轉(zhuǎn)指針 buffer.flip(); //remaining = limit - position byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String string = new String(bytes, "UTF-8"); sb.append(string); //清空buffer buffer.clear(); } System.out.println(sb.toString()); } public static void writeFile(String path, String string) throws IOException { FileChannel fc = new FileOutputStream(path).getChannel(); ByteBuffer buffer = ByteBuffer.allocate(10); int current = 0; int len = string.getBytes().length; while (current < len) { for (int i=0;i<10;i++) { if (current+i>=len) break; buffer.put(string.getBytes()[current+i]); } current += buffer.position(); buffer.flip(); fc.write(buffer); buffer.clear(); } } public static void main(String[] args) throws IOException { String in = "D:/in.txt"; String out = "D:/out.txt"; readFile(in); writeFile(out, "hello world"); readFile(out); } }分析一下上面這段代碼,在readFile()函數(shù)中,通過FileInputStream.getChannel()得到FileChannel對象,并創(chuàng)建ByteBuffer對象,接著利用FileChannel的read方法填充buffer,得到填充完的buffer之后我們將buffer的當(dāng)前指針翻轉(zhuǎn)一下接著利用buffer的get方法把數(shù)據(jù)放到byte數(shù)組中,接著就可以讀取數(shù)據(jù)了。
讀取文件的整個過程相比原生的I/O方法還是略顯麻煩,但是我們?nèi)绻褦?shù)據(jù)看成一堆煤礦,把ByteBuffer看成裝煤的礦車,而FileChannel看成是運煤的礦道,那么上面的過程就演變成了:先打通一條礦道,然后把煤礦裝在小車?yán)镞\出來。形象的記憶更利于理解這個過程。
而writeFile()函數(shù)也是類似,為了更好的理解Buffer的屬性,我特意將buffer的大小設(shè)置為10,為要寫入的字符串長度為11個字節(jié)。首先還是通過FileOutputStream.getChannel()方法得到FileChannel對象,并創(chuàng)建一個10字節(jié)大小的緩沖區(qū),接著定義一個整型變量current指向要寫入的字符串的當(dāng)前下標(biāo),每次向buffer中put10個字節(jié),并更新current,通過buffer.position()方法可以得到buffer被填充之后指針的位置,也就是緩沖區(qū)里的字節(jié)個數(shù),然后翻轉(zhuǎn)指針,最后通過FileChannel.write(buffer)方法將buffer寫入到文件中。
同樣考慮一下形象化的過程:我們首先把煤礦裝入小車(buffer.put()),并打開一條通往礦山的礦道(FileOutputStream.getChannel()),接著把煤礦運輸進(jìn)去(FileChannel.write(buffer))。還是很容易理解的吧!
2.2.3 Socket通道
在另一篇博客中介紹了阻塞式TCP的使用,接下來會介紹一下非阻塞式的TCP使用。
Socket通道與文件通道有著不同的特征,最顯著的就是可以運行非阻塞模式并且是可以選擇的。在2.2.1節(jié)中我們講到Socket通道都繼承自AbstractSelectableChannel類,而文件通道沒有,而這個類就是Socket通道擁有非阻塞和可選擇特點的關(guān)鍵。下面是SelectableChannel的幾個方法句柄:
public abstract boolean isBlocking(); public abstract SelectableChannel configureBlocking(boolean block) throws IOException;從這兩個方法句柄可以看到,設(shè)置一個socket通道的非阻塞模式只需要:
socketChannel.configureBlocking(false)即可。而有條件的選擇(readiness selection)是一種可以用來查詢通道的機(jī)制,該查詢可以判斷通道是否準(zhǔn)備好執(zhí)行一個目標(biāo)操作,比如read、write或accept。這個特性是在SelectableChannel類和SelectionKey類中進(jìn)行了定義。
public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4;SelectionKey中的四個常量定義了socket通道的四種狀態(tài),而SelectableChannel的register方法正好返回了SelectionKey對象。
public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;2.2.3.1 創(chuàng)建
socket通道與文件通道不同,并不是通過socket.getChannel()來創(chuàng)建對象(盡管socket對象有這個方法),而是通過SocketChannel.open()這樣的靜態(tài)工廠方法去創(chuàng)建對象。每一個socket通道有與之關(guān)聯(lián)的一個socket對象,卻并不是所有的socket對象都有一個關(guān)聯(lián)的通道,如果用傳統(tǒng)的方法創(chuàng)建了一個socket對象,則它不會有一個關(guān)聯(lián)的通道并且getChannel()方法總是返回null。
SocketChannel sc = SocketChannel.open(); sc.configureBlocking(false);這樣就創(chuàng)建了一個非阻塞的socket通道。
2.2.3.2 ServerSocketChannel
public abstract class ServerSocketChannel extends AbstractSelectableChannel { public static ServerSocketChannel open( ) throws IOException; public abstract ServerSocket socket( ); public abstract ServerSocket accept( ) throws IOException; public final int validOps( ); }ServerSocketChannel與SocketChannel和DatagramChannel不同,它本身是不傳輸數(shù)據(jù)的,提供的接口非常簡單,如果要進(jìn)行數(shù)據(jù)讀寫,需要通過ServerSocketChannel.socket()方法返回一個與之關(guān)聯(lián)的ServerSocket對象來進(jìn)行。
ServerSocketChannel ssc = ServerSocketChannel.open(); ServerSocket ss = ssc.socket(); ss.bind(new InetSocketAddress(port));ServerSocketChannel同ServerSocket一樣也有accept()方法,當(dāng)調(diào)用ServerSocket的accept()函數(shù)時只能是阻塞式的,而調(diào)用ServerSocketChannel的accept()函數(shù)卻可以是非阻塞式。
下面這個例子展示了ServerSocketChannel的用法:
public class Server {static int port = 20001; public static void main(String[] args) throws IOException, InterruptedException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(port)); ssc.configureBlocking(false); String string = "hello client"; ByteBuffer buffer = ByteBuffer.wrap(string.getBytes()); ByteBuffer in = ByteBuffer.allocate(1024); System.out.println("Server wait for connection..."); while (true) { SocketChannel sc = ssc.accept(); if (sc == null) { TimeUnit.SECONDS.sleep(1); }else { //rewind只是將position調(diào)到0,不會改變Limit的值,而flip是將limit調(diào)整成position,再把position改成0 System.out.println("Server get a connection..."); sc.read(in); in.flip(); buffer.rewind(); sc.write(buffer); System.out.println("From client:" + new String(in.array(), "UTF-8")); } } } }2.3 選擇器(Selector)
選擇器其實是一種多路復(fù)用機(jī)制在Java語言中的應(yīng)用,在學(xué)習(xí)Selector之前有必要學(xué)習(xí)一下I/O多路復(fù)用的概念。
2.3.1 多路復(fù)用
在之前的文章中我們已經(jīng)看到對于每個客戶端請求都分配一個線程的設(shè)計,或者是利用線程池來處理客戶端請求,但是這樣的設(shè)計對于處理客戶端有大量請求的情況都束手無策。原因在于首先線程非常消耗系統(tǒng)資源,其次阻塞式的設(shè)計在某一個請求發(fā)送的數(shù)據(jù)很大時會使其他請求等待很久。那么究竟有沒有其他方法來解決這個問題呢?早在上世紀(jì)80年代在Unix系統(tǒng)中就已經(jīng)提出select模型來解決這個問題,在之后對select進(jìn)行優(yōu)化又提出了poll模型和epoll模型(Linux專有)。
select/poll/epoll其實都是一種多路復(fù)用模型,什么是多路復(fù)用,開始聽見這個名詞我也是一臉懵逼,覺得好像很高大上很難理解的樣子。后面通過看書和看知乎上的形象化描述,慢慢理解了其實多路復(fù)用也沒有想象著那么難。我們?nèi)绻衙總€客戶端請求看成一個電路,如下圖,那么是否有必要為每條電路都分配一條專有的線路呢?還是當(dāng)電流來了進(jìn)行開關(guān)切換?很明顯,后者只需要一個開關(guān)就可以節(jié)省大量的不必要開銷。select模型其實就是這樣做的,監(jiān)控所有的socket請求,當(dāng)某個socket準(zhǔn)備好(read/write/accept)時就進(jìn)行處。但是如何做到監(jiān)控所有socket的狀態(tài)呢,select做的很簡單,也許你也想到了,就是去輪詢所有socket的狀態(tài),這樣很明顯當(dāng)socket數(shù)量比較大時效率非常低。并且select對于監(jiān)控的socket數(shù)量有限制,默認(rèn)是1024個。poll模型進(jìn)行了一些改進(jìn),但是并沒有本質(zhì)的改變。到了epoll模型,就有了非常大的改觀。假象另一個場景,如果你是一個監(jiān)考老師,考試結(jié)束時要去收卷子,你按照常理一個一個的收著,一旦有一個學(xué)生還沒寫完,于是你就會卡(阻塞)在那,并且整個輪詢一遍下來非常慢。所以你想到了嗎?讓那些已經(jīng)做完的學(xué)生舉手告知你他已經(jīng)做完了,你再過去收一下卷子即可。這樣很明顯阻塞會大幅度減少。這就是epoll,讓那些已經(jīng)準(zhǔn)備好的socket發(fā)出通知,然后來處理它。
如果還是不理解,可以看看知乎上的一些回答。
2.3.2 NIO多路復(fù)用
好了,廢話這么多,已經(jīng)是可以理解多路復(fù)用是什么了。Java語言直到JDK1.4版本才有多路復(fù)用這個概念,很大原因也是因為沒人用Java去寫服務(wù)器,例如著名的Apache和Nginx都是用C/C++寫的。接下來對NIO中多路復(fù)用的實現(xiàn)進(jìn)行介紹。
NIO處理多路復(fù)用請求只需要三個組件:可選擇的通道(SelectableChannels)、選擇器(Selector)和選擇鍵(SelectionKey),他們之間的關(guān)系如下圖所示:
可選擇的通道可以主動注冊到一個選擇器上,并指定對哪些動作是感興趣的。這個注冊行為會返回一個選擇鍵,選擇鍵封裝了該通道和選擇器之間的注冊關(guān)系,包含兩個比特集:指示該注冊關(guān)系所關(guān)心的通道操作;通道已經(jīng)準(zhǔn)備好的操作。選擇器是核心組件,它管理著注冊在其上的通道集合的信息和它們的就緒狀態(tài)。值得注意的是,通道在注冊到一個選擇器之前,必須設(shè)置為非阻塞模式。原因在這里。
2.3.3 常用操作
2.3.3.1 創(chuàng)建選擇器
通過靜態(tài)工廠方法創(chuàng)建一個選擇器。
Selector selector = Selector.open();2.3.3.2 通道注冊到選擇器上
這是通道擁有的方法,先看看方法句柄:
public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException; public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException { return register(sel, ops, null); }值得注意的是第二個參數(shù)ops,這個參數(shù)表示了該通道感興趣的操作,所有的操作包括讀(read)、寫(write)、連接(connect)和接受(accept)。并不是所有通道都支持這些操作,比如SocketChannel就沒有accept這個操作,因為這是專屬于ServerSocketChannel的操作。可以通過調(diào)用Channel.validOps()來查詢支持的操作。
第三個參數(shù)是用來傳遞一個對象的引用,在調(diào)用新生成的選擇鍵的attach()方法時會返回該對象的引用。
2.3.3.3 選擇過程
選擇器的核心功能是選擇過程,選擇器實際上是對select()、poll()等本地系統(tǒng)調(diào)用的一個封裝。每一個選擇器會維護(hù)三個鍵集合:已注冊的鍵集合、已選擇的鍵集合和已取消的鍵集合。通過執(zhí)行Selector.select()、Selector.select(int timeout)或Selector.selectNow(),選擇過程被調(diào)用,這時會執(zhí)行以下步驟:
- 首先會檢查被取消的鍵的集合。因為在任何時候選擇鍵(通道和選擇器的綁定關(guān)系)都可能被取消,所以在正式選擇之前需要先檢查一下被取消的鍵。如果這個集合非空,則其中的鍵會從另外兩個鍵集合中去除。
- 已注冊的鍵集合中的鍵的interest集合將被檢查。這個過程會調(diào)用底層的系統(tǒng)調(diào)用(具體調(diào)用依賴于特定的操作系統(tǒng)),如果沒有通道準(zhǔn)備好,則線程會阻塞在這里。這個操作會更新那些準(zhǔn)備好interest集合中至少一種操作的通道的ready集合。這一句非常的拗口,也比較難理解,說的簡單一點,就是每個通道有一個感興趣操作集合,底層的系統(tǒng)調(diào)用可以去檢查這些操作是否就緒,如果就緒就會更新該通道綁定的選擇鍵里的相關(guān)值。所以你只需要去檢查選擇鍵里的相關(guān)值就可以知道該操作是不是準(zhǔn)備好了。
- 完成步驟2可能很耗時。完成后還需要再進(jìn)行步驟1,因為這個過程中某些選擇鍵也可能被取消,這樣做是為了提高程序的健壯性(robust)。
- 最后select()操作會返回此次選擇過程中ready()集合被改變的鍵的數(shù)量,而不是所有的ready()集合中的鍵的數(shù)量。這非常合理,因為你可以知道這次選擇過程到底有幾個通道準(zhǔn)備就緒。通過判斷select()返回值是否大于0,就可以知道要不要去操作了。
好不容易才寫完上面這段,因為我在看原書的時候看了2-3遍才看懂,過程還是比較復(fù)雜的,我覺得是時候去看看Unix中的select()是怎么做的,也許這樣更利于理解這個選擇過程。
2.3.3.4 綜合使用這三個組件
說了這么多原理,不知道你暈沒暈,反正我是快暈了。這時候來一段實戰(zhàn)代碼,告訴你了解了這么多,到底該怎么用!
通常的做法如下:在選擇器上調(diào)用一次select操作(這會更新已選擇鍵的集合),然后遍歷selectedKeys返回的鍵的集合。接著鍵將從已選擇的鍵的集合中被移除(通過Iterator.remove()方法),然后檢測下一個鍵。完成后,繼續(xù)下一次select操作。
服務(wù)端程序演示:
public class SelectorTest {public static void main(String[] args) throws IOException { new SelectorTest().select(); } public void select() throws IOException { //創(chuàng)建選擇器 Selector selector = Selector.open(); //創(chuàng)建serverChannel ServerSocketChannel ssc = ServerSocketChannel.open(); //設(shè)置為非阻塞模式 ssc.configureBlocking(false); //綁定監(jiān)聽的地址 ssc.socket().bind(new InetSocketAddress(20000), 1024); //將serverChannel注冊到選擇器上,監(jiān)聽accept事件,返回選擇鍵 ssc.register(selector, SelectionKey.OP_ACCEPT); while (true) { //此次選擇過程準(zhǔn)備就緒的通道數(shù)量 int num = selector.select(); if (num == 0) { //若沒有準(zhǔn)備好的就繼續(xù)循環(huán) continue; } //返回已就緒的鍵集合 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); handle(selector, key); //因為已經(jīng)處理了該鍵,所以把當(dāng)前的key從已選擇的集合中去除 it.remove(); } } } public void handle(Selector selector, SelectionKey key) throws IOException { if (key.isValid()) { //當(dāng)一個ServerChannel為accept狀態(tài)時,注冊這個ServerChannel的SocketChannel為可讀取狀態(tài) if (key.isAcceptable()) { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); SocketChannel channel = serverChannel.accept(); //把通道注冊到選擇器之前要設(shè)置為非阻塞,否則會報異常 channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); } //如果channel是可讀取狀態(tài),則讀取其中的數(shù)據(jù) if (key.isReadable()) { //只有SocketChannel才能讀寫數(shù)據(jù),所以如果是可讀取狀態(tài),只能是SocketChannel SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer in = ByteBuffer.allocate(1024); //將socketChannel中的數(shù)據(jù)讀入到buffer中,返回當(dāng)前字節(jié)的位置 int readBytes = sc.read(in); if (readBytes > 0) { //把buffer的position指針指向buffer的開頭 in.flip(); byte[] bytes = new byte[in.remaining()]; in.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The server receive : " + body); //把response輸出到socket中 doWrite(sc, "Hello client"); } else if (readBytes < 0) { key.cancel(); sc.close(); } } } } private void doWrite(SocketChannel sc, String response) throws IOException { //把服務(wù)器端返回的數(shù)據(jù)寫到socketChannel中 if (response == null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); sc.write(writeBuffer); } } }代碼相較于阻塞式TCP服務(wù)端程序復(fù)雜了太多倍,但是基本思路跟我上面那段話寫的是一樣的,而且基本每一段代碼都寫了注釋,耐心看下去肯定看的懂。我就不再解釋這段代碼啦。
客戶端演示:
public class Client {public static final int PORT = 20000; public static final String HOST = "127.0.0.1"; private volatile boolean stop = false; public static void main(String[] args) throws IOException { new Client().select(); } public void select() throws IOException { // 創(chuàng)建選擇器 Selector selector = Selector.open(); // 創(chuàng)建SocketChannel SocketChannel sc = SocketChannel.open(); // 設(shè)置為非阻塞模式 sc.configureBlocking(false); try { doConnect(selector, sc); } catch (Exception e) { e.printStackTrace(); System.exit(1); } while (!stop) { int num = selector.select(); if (num == 0) { continue; } Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); try { handleKeys(selector, key); } catch (Exception e) { e.printStackTrace(); if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } // 因為已經(jīng)處理了該鍵,所以把當(dāng)前的key從已選擇的集合中去除 it.remove(); } } if (selector != null) { selector.close(); } } private void doWrite(SocketChannel sc, String response) throws IOException { if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); sc.write(writeBuffer); if (!writeBuffer.hasRemaining()) { System.out.println("Send msg successfully"); } } } private void handleKeys(Selector selector, SelectionKey key) throws IOException { if (key.isValid()) { SocketChannel sc = (SocketChannel) key.channel(); // 判斷是否連接成功 if (key.isConnectable()) { if (sc.finishConnect()) { sc.register(selector, SelectionKey.OP_READ); doWrite(sc, "Hello Server"); } else { System.exit(1); } } if (key.isReadable()) { ByteBuffer in = ByteBuffer.allocate(1024); // 將socketChannel中的數(shù)據(jù)讀入到buffer中,返回當(dāng)前字節(jié)的位置 int readBytes = sc.read(in); if (readBytes > 0) { // 把buffer的position指針指向buffer的開頭 in.flip(); byte[] bytes = new byte[in.remaining()]; in.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The Client receive : " + body); this.stop = true; } else if (readBytes < 0) { // 對端鏈路關(guān)閉 key.cancel(); sc.close(); } else { // 讀到0字節(jié),忽略 } } } } private void doConnect(Selector selector, SocketChannel sc) throws IOException { if (sc.connect(new InetSocketAddress(HOST, PORT))) { System.out.println("Client connect successfully..."); // 如果直接連接成功,則注冊讀操作 sc.register(selector, SelectionKey.OP_READ); doWrite(sc, "Hello server!"); } else { // 如果沒有連接成功,則注冊連接操作 sc.register(selector, SelectionKey.OP_CONNECT); } } }客戶端跟服務(wù)端很相似,唯一不同的是服務(wù)端需要監(jiān)測的socket行為是OP_ACCEPT和OP_READ,而客戶端需要監(jiān)控的是OP_CONNECT和OP_READ,其他的區(qū)別不是很大。
依次運行服務(wù)器端和客戶端,結(jié)果如下:
代碼在我的github repo上也可以找到。
3 總結(jié)
花了大概三天的時間,把《Java NIO》這本書看了一遍并記錄了下來學(xué)習(xí)過程,并且結(jié)合《Netty權(quán)威指南》中的例子去實踐了一下,慢慢感覺到NIO的魅力。反思一下學(xué)習(xí)的比較慢的原因,應(yīng)該是對Unix上的I/O模型不熟悉導(dǎo)致的,所以覺得接下來好好學(xué)習(xí)一下select、poll、epoll,加深對多路復(fù)用的理解。
本文中可能存在理解有偏差的地方,也請多多指正。
參考文獻(xiàn)
1.《Java NIO》
2.《Netty權(quán)威指南》
轉(zhuǎn)載于:https://www.cnblogs.com/handsome1013/p/7542143.html
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結(jié)
以上是生活随笔為你收集整理的【Java NIO】一文了解NIO的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: CSS3:FlexBox的详解
- 下一篇: Java 获取Web项目相对webapp