[转]Java AIO学习
java AIO學習
轉載? http://blog.csdn.net/zhongweijian/article/details/8005444
系統I/O 可分為阻塞型, 非阻塞同步型以及非阻塞異步型[1,?2]. 阻塞型I/O意味著控制權只到調用操作結束了才會回到調用者手里. 結果調用者被阻塞了, 這段時間了做不了任何其它事情. 更郁悶的是,在等待IO結果的時間里,調用者所在線程此時無法騰出手來去響應其它的請求,這真是太浪費資源了。拿read()操作來說吧, 調用此函數的代碼會一直僵在此處直至它所讀的socket緩存中有數據到來.
相比之下,非阻塞同步是會立即返回控制權給調用者的。調用者不需要等等,它從調用的函數獲取兩種結果:要么此次調用成功進行了;要么系統返回錯誤標識告訴調用者當前資源不可用,你再等等或者再試度看吧。比如read()操作, 如果當前socket無數據可讀,則立即返回EWOULBLOCK/EAGAIN,告訴調用read()者"數據還沒準備好,你稍后再試".
在非阻塞異步調用中,稍有不同。調用函數在立即返回時,還告訴調用者,這次請求已經開始了。系統會使用另外的資源或者線程來完成這次調用操作,并在完成的時候知會調用者(比如通過回調函數)。拿Windows的ReadFile()或者POSIX的aio_read()來說,調用它之后,函數立即返回,操作系統在后臺同時開始讀操作。
在以上三種IO形式中,非阻塞異步是性能最高、伸縮性最好的。
這篇文章探討不同的I/O利用機制并提供一種跨平臺的設計模式(解決方案). 希望此文可以給于TCP高性能服務器開發者一些幫助,選擇最佳的設計方案。下面我們會比較 Java, c#, C++各自對探討方案的實現以及性能. 我們在文章的后面就不再提及阻塞式的方案了,因為阻塞式I/O實在是缺少可伸縮性,性能也達不到高性能服務器的要求。
兩種IO多路復用方案: Reactor andProactor
一般情況下,I/O 復用機制需要事件分享器(event demultiplexor [1,?3]). 事件分享器的作用,即將那些讀寫事件源分發給各讀寫事件的處理者,就像送快遞的在樓下喊: 誰的什么東西送了, 快來拿吧。開發人員在開始的時候需要在分享器那里注冊感興趣的事件,并提供相應的處理者(event handlers),或者是回調函數; 事件分享器在適當的時候會將請求的事件分發給這些handler或者回調函數.
涉及到事件分享器的兩種模式稱為:Reactor and Proactor [1]. Reactor模式是基于同步I/O的,而Proactor模式是和異步I/O相關的. 在Reactor模式中,事件分離者等待某個事件或者可應用或個操作的狀態發生(比如文件描述符可讀寫,或者是socket可讀寫),事件分離者就把這個事件傳給事先注冊的事件處理函數或者回調函數,由后者來做實際的讀寫操作。
而在Proactor模式中,事件處理者(或者代由事件分離者發起)直接發起一個異步讀寫操作(相當于請求),而實際的工作是由操作系統來完成的。發起時,需要提供的參數包括用于存放讀到數據的緩存區,讀的數據大小,或者用于存放外發數據的緩存區,以及這個請求完后的回調函數等信息。事件分離者得知了這個請求,它默默等待這個請求的完成,然后轉發完成事件給相應的事件處理者或者回調。舉例來說,在Windows上事件處理者投遞了一個異步IO操作(稱有overlapped的技術),事件分離者等IOCompletion事件完成[1]. 這種異步模式的典型實現是基于操作系統底層異步API的,所以我們可稱之為“系統級別”的或者“真正意義上”的異步,因為具體的讀寫是由操作系統代勞的。
舉另外個例子來更好地理解Reactor與Proactor兩種模式的區別。這里我們只關注read操作,因為write操作也是差不多的。下面是Reactor的做法:
- 某個事件處理者宣稱它對某個socket上的讀事件很感興趣;
- 事件分離者等著這個事件的發生;
- 當事件發生了,事件分離器被喚醒,這負責通知先前那個事件處理者;
- 事件處理者收到消息,于是去那個socket上讀數據了. 如果需要,它再次宣稱對這個socket上的讀事件感興趣,一直重復上面的步驟;
下面再來看看真正意義的異步模式Proactor是如何做的:
- 事件處理者直接投遞發一個寫操作(當然,操作系統必須支持這個異步操作). 這個時候,事件處理者根本不關心讀事件,它只管發這么個請求,它魂牽夢縈的是這個寫操作的完成事件。這個處理者很拽,發個命令就不管具體的事情了,只等著別人(系統)幫他搞定的時候給他回個話。
- 事件分離者等著這個讀事件的完成(比較下與Reactor的不同);
- 當事件分離者默默等待完成事情到來的同時,操作系統已經在一邊開始干活了,它從目標讀取數據,放入用戶提供的緩存區中,最后通知事件分離者,這個事情我搞完了;
- 事件分享者通知之前的事件處理者: 你吩咐的事情搞定了;
- 事件處理者這時會發現想要讀的數據已經乖乖地放在他提供的緩存區中,想怎么處理都行了。如果有需要,事件處理者還像之前一樣發起另外一個寫操作,和上面的幾個步驟一樣。
jdk 7中已經內置了AIO的實現,可以參考http://www.iteye.com/topic/472333?這個對aio的分析
我們試試jdk7的AIO的示例
服務器端程序 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class EchoAioServer { private final int port; public static void main(String args[]) { int port = 8000; new EchoAioServer(port); } public EchoAioServer(int port) { this.port = port; listen(); } private void listen() { try { ExecutorService executorService = Executors.newCachedThreadPool(); AsynchronousChannelGroup threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1); try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(threadGroup)) { server.bind(new InetSocketAddress(port)); System.out.println("Echo listen on " + port); server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { final ByteBuffer echoBuffer = ByteBuffer.allocateDirect(1024); public void completed(AsynchronousSocketChannel result, Object attachment) { System.out.println("waiting ...."); try { echoBuffer.clear(); result.read(echoBuffer).get(); echoBuffer.flip(); // echo data result.write(echoBuffer); echoBuffer.flip(); // System.out.println("Echoed '" + new String(echoBuffer.array()) + "' to " + result); } catch (InterruptedException | ExecutionException e) { System.out.println(e.toString()); } finally { try { result.close(); server.accept(null, this); } catch (Exception e) { System.out.println(e.toString()); } } System.out.println("done..."); } @Override public void failed(Throwable exc, Object attachment) { System.out.println("server failed: " + exc); } }); try { // Wait for ever Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException ex) { System.out.println(ex); } } } catch (IOException e) { System.out.println(e); } } }
客戶端程序
import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutionException; public class EchoAioClient { private final AsynchronousSocketChannel client ; public EchoAioClient() throws Exception{ client = AsynchronousSocketChannel.open(); } public void start()throws Exception{ client.connect(new InetSocketAddress("127.0.0.1",8000),null,new CompletionHandler<Void,Void>() { @Override public void completed(Void result, Void attachment) { try { client.write(ByteBuffer.wrap("this is a test".getBytes())).get(); System.out.println("send data to server"); } catch (Exception ex) { ex.printStackTrace(); } } @Override public void failed(Throwable exc, Void attachment) { exc.printStackTrace(); } }); final ByteBuffer bb = ByteBuffer.allocate(1024); client.read(bb, null, new CompletionHandler<Integer,Object>(){ @Override public void completed(Integer result, Object attachment) { System.out.println(result); System.out.println(new String(bb.array())); } @Override public void failed(Throwable exc, Object attachment) { exc.printStackTrace(); } } ); try { // Wait for ever Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException ex) { System.out.println(ex); } } public static void main(String args[])throws Exception{ new EchoAioClient().start(); } }
執行server跟client后輸出如下:
server輸出:
Echo listen on 8000 waiting .... done...
?
client輸出:
send data to server 14 this is a test
總結
以上是生活随笔為你收集整理的[转]Java AIO学习的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 联想新款小新Pro笔记本2月发布:支持6
- 下一篇: servlet版本及容器和Java版本