日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

Java AIO初探(异步网络IO)

發布時間:2023/12/15 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java AIO初探(异步网络IO) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Java AIO初探(異步網絡IO)

? 原文: http://www.blogjava.net/killme2008/archive/2009/09/20/295743.html

按照《Unix網絡編程》的劃分,IO模型可以分為:阻塞IO、非阻塞IO、IO復用、信號驅動IO和異步IO,按照POSIX標準來劃分只分為兩類:同步IO和異步IO。如何區分呢?首先一個IO操作其實分成了兩個步驟:發起IO請求和實際的IO操作,同步IO和異步IO的區別就在于第二個步驟是否阻塞,如果實際的IO讀寫阻塞請求進程,那么就是同步IO,因此阻塞IO、非阻塞IO、IO服用、信號驅動IO都是同步IO,如果不阻塞,而是操作系統幫你做完IO操作再將結果返回給你,那么就是異步IO。阻塞IO和非阻塞IO的區別在于第一步,發起IO請求是否會被阻塞,如果阻塞直到完成那么就是傳統的阻塞IO,如果不阻塞,那么就是非阻塞IO。

?? Java nio 2.0的主要改進就是引入了異步IO(包括文件和網絡),這里主要介紹下異步網絡IO API的使用以及框架的設計,以TCP服務端為例。首先看下為了支持AIO引入的新的類和接口:

?java.nio.channels.AsynchronousChannel
?????? 標記一個channel支持異步IO操作。

?java.nio.channels.AsynchronousServerSocketChannel
?????? ServerSocket的aio版本,創建TCP服務端,綁定地址,監聽端口等。

?java.nio.channels.AsynchronousSocketChannel
?????? 面向流的異步socket channel,表示一個連接。

?java.nio.channels.AsynchronousChannelGroup
?????? 異步channel的分組管理,目的是為了資源共享。一個AsynchronousChannelGroup綁定一個線程池,這個線程池執行兩個任務:處理IO事件和派發CompletionHandler。AsynchronousServerSocketChannel創建的時候可以傳入一個AsynchronousChannelGroup,那么通過AsynchronousServerSocketChannel創建的AsynchronousSocketChannel將同屬于一個組,共享資源

?java.nio.channels.CompletionHandler
?????? 異步IO操作結果的回調接口,用于定義在IO操作完成后所作的回調工作。AIO的API允許兩種方式來處理異步操作的結果:返回的Future模式或者注冊CompletionHandler,我更推薦用CompletionHandler的方式,這些handler的調用是由AsynchronousChannelGroup的線程池派發的。顯然,線程池的大小是性能的關鍵因素。AsynchronousChannelGroup允許綁定不同的線程池,通過三個靜態方法來創建:

?public?static?AsynchronousChannelGroup?withFixedThreadPool(int?nThreads,
???????????????????????????????????????????????????????????????ThreadFactory?threadFactory)
????????
throws?IOException

?
public?static?AsynchronousChannelGroup?withCachedThreadPool(ExecutorService?executor,
????????????????????????????????????????????????????????????????
int?initialSize)

?
public?static?AsynchronousChannelGroup?withThreadPool(ExecutorService?executor)
????????
throws?IOException
????需要根據具體應用相應調整,從框架角度出發,需要暴露這樣的配置選項給用戶。

???? 在介紹完了aio引入的TCP的主要接口和類之后,我們來設想下一個aio框架應該怎么設計。參考非阻塞nio框架的設計,一般都是采用Reactor模式,Reacot負責事件的注冊、select、事件的派發;相應地,異步IO有個Proactor模式,Proactor負責CompletionHandler的派發,查看一個典型的IO寫操作的流程來看兩者的區別:

???? Reactor:? send(msg) -> 消息隊列是否為空,如果為空? -> 向Reactor注冊OP_WRITE,然后返回 -> Reactor select -> 觸發Writable,通知用戶線程去處理 ->先注銷Writable(很多人遇到的cpu 100%的問題就在于沒有注銷),處理Writeable,如果沒有完全寫入,繼續注冊OP_WRITE。注意到,寫入的工作還是用戶線程在處理。
???? Proactor: send(msg) -> 消息隊列是否為空,如果為空,發起read異步調用,并注冊CompletionHandler,然后返回。 -> 操作系統負責將你的消息寫入,并返回結果(寫入的字節數)給Proactor -> Proactor派發CompletionHandler。可見,寫入的工作是操作系統在處理,無需用戶線程參與。事實上在aio的API中,AsynchronousChannelGroup就扮演了Proactor的角色。

??? CompletionHandler有三個方法,分別對應于處理成功、失敗、被取消(通過返回的Future)情況下的回調處理:

public?interface?CompletionHandler<V,A>?{

?????
void?completed(V?result,?A?attachment);

????
void?failed(Throwable?exc,?A?attachment);

???
????
void?cancelled(A?attachment);
}

???其中的泛型參數V表示IO調用的結果,而A是發起調用時傳入的attchment。

??? 在初步介紹完aio引入的類和接口后,我們看看一個典型的tcp服務端是怎么啟動的,怎么接受連接并處理讀和寫,這里引用的代碼都是yanf4j 的aio分支中的代碼,可以從svn checkout,svn地址: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio

??? 第一步,創建一個AsynchronousServerSocketChannel,創建之前先創建一個AsynchronousChannelGroup,上文提到AsynchronousServerSocketChannel可以綁定一個AsynchronousChannelGroup,那么通過這個AsynchronousServerSocketChannel建立的連接都將同屬于一個AsynchronousChannelGroup并共享資源:
this.asynchronousChannelGroup?=?AsynchronousChannelGroup
????????????????????.withCachedThreadPool(Executors.newCachedThreadPool(),
????????????????????????????
this.threadPoolSize);
??? 然后初始化一個AsynchronousServerSocketChannel,通過open方法:
this.serverSocketChannel?=?AsynchronousServerSocketChannel
????????????????.open(
this.asynchronousChannelGroup);
???通過nio 2.0引入的SocketOption類設置一些TCP選項:
this.serverSocketChannel
????????????????????.setOption(
????????????????????????????StandardSocketOption.SO_REUSEADDR,
true);
this.serverSocketChannel
????????????????????.setOption(
????????????????????????????StandardSocketOption.SO_RCVBUF,
16*1024);
???綁定本地地址:

this.serverSocketChannel
????????????????????.bind(
new?InetSocketAddress("localhost",8080),?100); ???
??? 其中的100用于指定等待連接的隊列大小(backlog)。完了嗎?還沒有,最重要的 監聽工作還沒開始,監聽端口是為了等待連接上來以便accept產生一個AsynchronousSocketChannel來表示一個新建立的連接,因此需要發起一個accept調用,調用是異步的,操作系統將在連接建立后,將最后的結果——AsynchronousSocketChannel返回給你:

public?void?pendingAccept()?{
????????
if?(this.started?&&?this.serverSocketChannel.isOpen())?{
????????????
this.acceptFuture?=?this.serverSocketChannel.accept(null,
????????????????????
new?AcceptCompletionHandler());

????????}?
else?{
????????????
throw?new?IllegalStateException("Controller?has?been?closed");
????????}
????}
?? 注意,重復的accept調用將會拋出PendingAcceptException,后文提到的read和write也是如此。accept方法的第一個參數是你想傳給CompletionHandler的attchment,第二個參數就是注冊的用于回調的CompletionHandler,最后返回結果Future<AsynchronousSocketChannel>。你可以對future做處理,這里采用更推薦的方式就是注冊一個CompletionHandler。那么accept的CompletionHandler中做些什么工作呢?顯然一個赤裸裸的AsynchronousSocketChannel是不夠的,我們需要將它封裝成session,一個session表示一個連接(mina里就叫IoSession了),里面帶了一個緩沖的消息隊列以及一些其他資源等。在連接建立后,除非你的服務器只準備接受一個連接,不然你需要在后面 繼續調用pendingAccept來發起另一個accept請求
private?final?class?AcceptCompletionHandler?implements
????????????CompletionHandler
<AsynchronousSocketChannel,?Object>?{

????????@Override
????????
public?void?cancelled(Object?attachment)?{
????????????logger.warn(
"Accept?operation?was?canceled");
????????}

????????@Override
????????
public?void?completed(AsynchronousSocketChannel?socketChannel,
????????????????Object?attachment)?{
????????????
try?{
????????????????logger.debug(
"Accept?connection?from?"
????????????????????????
+?socketChannel.getRemoteAddress());
????????????????configureChannel(socketChannel);
????????????????AioSessionConfig?sessionConfig?
=?buildSessionConfig(socketChannel);
????????????????Session?session?
=?new?AioTCPSession(sessionConfig,
????????????????????????AioTCPController.
this.configuration
????????????????????????????????.getSessionReadBufferSize(),
????????????????????????AioTCPController.
this.sessionTimeout);
????????????????session.start();
????????????????registerSession(session);
????????????}?
catch?(Exception?e)?{
????????????????e.printStackTrace();
????????????????logger.error(
"Accept?error",?e);
????????????????notifyException(e);
????????????}?
finally?{
????????????????pendingAccept();
????????????}
????????}

????????@Override
????????
public?void?failed(Throwable?exc,?Object?attachment)?{
????????????logger.error(
"Accept?error",?exc);
????????????
try?{
????????????????notifyException(exc);
????????????}?
finally?{
????????????????pendingAccept();
????????????}
????????}
????}
???
??? 注意到了吧,我們在failed和 completed方法中在最后都調用了pendingAccept來繼續發起accept調用,等待新的連接上來。有的同學可能要說了,這樣搞是不是遞歸調用,會不會堆棧溢出?實際上不會,因為發起accept調用的線程與CompletionHandler回調的線程并非同一個,不是一個上下文中,兩者之間沒有耦合關系。要注意到,CompletionHandler的回調共用的是AsynchronousChannelGroup綁定的線程池,因此 千萬別在回調方法中調用阻塞或者長時間的操作,例如sleep,回調方法最好能支持超時,防止線程池耗盡。

??? 連接建立后,怎么讀和寫呢?回憶下在nonblocking nio框架中,連接建立后的第一件事是干什么?注冊OP_READ事件等待socket可讀。異步IO也同樣如此,連接建立后馬上發起一個異步read調用,等待socket可讀,這個是Session.start方法中所做的事情:

public?class?AioTCPSession?{
????
protected?void?start0()?{
????????pendingRead();
????}

????
protected?final?void?pendingRead()?{
????????
if?(!isClosed()?&&?this.asynchronousSocketChannel.isOpen())?{
????????????
if?(!this.readBuffer.hasRemaining())?{
????????????????
this.readBuffer?=?ByteBufferUtils
????????????????????????.increaseBufferCapatity(
this.readBuffer);
????????????}
????????????
this.readFuture?=?this.asynchronousSocketChannel.read(
????????????????????
this.readBuffer,?this,?this.readCompletionHandler);
????????}?
else?{
????????????
throw?new?IllegalStateException(
????????????????????
"Session?Or?Channel?has?been?closed");
????????}
????}
???
}

???? AsynchronousSocketChannel的read調用與AsynchronousServerSocketChannel的accept調用類似,同樣是非阻塞的,返回結果也是一個Future,但是寫的結果是整數,表示寫入了多少字節,因此read調用返回的是Future<Integer>,方法的第一個參數是讀的緩沖區,操作系統將IO讀到數據拷貝到這個緩沖區,第二個參數是傳遞給CompletionHandler的attchment,第三個參數就是注冊的用于回調的CompletionHandler。這里保存了read的結果Future,這是為了在關閉連接的時候能夠主動取消調用,accept也是如此。現在可以看看read的CompletionHandler的實現:
public?final?class?ReadCompletionHandler?implements
????????CompletionHandler
<Integer,?AbstractAioSession>?{

????
private?static?final?Logger?log?=?LoggerFactory
????????????.getLogger(ReadCompletionHandler.
class);
????
protected?final?AioTCPController?controller;

????
public?ReadCompletionHandler(AioTCPController?controller)?{
????????
this.controller?=?controller;
????}

????@Override
????
public?void?cancelled(AbstractAioSession?session)?{
????????log.warn(
"Session("?+?session.getRemoteSocketAddress()
????????????????
+?")?read?operation?was?canceled");
????}

????@Override
????
public?void?completed(Integer?result,?AbstractAioSession?session)?{
????????
if?(log.isDebugEnabled())
????????????log.debug(
"Session("?+?session.getRemoteSocketAddress()
????????????????????
+?")?read?+"?+?result?+?"?bytes");
????????
if?(result?<?0)?{
????????????session.close();
????????????
return;
????????}
????????
try?{
????????????
if?(result?>?0)?{
????????????????session.updateTimeStamp();
????????????????session.getReadBuffer().flip();
????????????????session.decode();
????????????????session.getReadBuffer().compact();
????????????}
????????}?
finally?{
????????????
try?{
????????????????session.pendingRead();
????????????}?
catch?(IOException?e)?{
????????????????session.onException(e);
????????????????session.close();
????????????}
????????}
????????controller.checkSessionTimeout();
????}

????@Override
????
public?void?failed(Throwable?exc,?AbstractAioSession?session)?{
????????log.error(
"Session?read?error",?exc);
????????session.onException(exc);
????????session.close();
????}

}

?? 如果IO讀失敗,會返回失敗產生的異常,這種情況下我們就主動關閉連接,通過session.close()方法,這個方法干了兩件事情:關閉channel和取消read調用:
if?(null?!=?this.readFuture)?{
????????????
this.readFuture.cancel(true);
????????}
this.asynchronousSocketChannel.close(); ?? 在讀成功的情況下,我們還需要判斷結果result是否小于0, 如果小于0就表示對端關閉了,這種情況下我們也主動關閉連接并返回。如果讀到一定字節,也就是result大于0的情況下,我們就嘗試從讀緩沖區中decode出消息,并派發給業務處理器的回調方法,最終 通過pendingRead繼續發起read調用等待socket的下一次可讀。可見,我們并不需要自己去調用channel來進行IO讀,而是操作系統幫你直接讀到了緩沖區,然后給你一個結果表示讀入了多少字節,你處理這個結果即可。而nonblocking IO框架中,是reactor通知用戶線程socket可讀了,然后用戶線程自己去調用read進行實際讀操作。 這里還有個需要注意的地方,就是decode出來的消息的派發給業務處理器工作最好交給一個線程池來處理,避免阻塞group綁定的線程池。
??
?? IO寫的操作與此類似,不過通常寫的話我們會在session中關聯一個緩沖隊列來處理,沒有完全寫入或者等待寫入的消息都存放在隊列中,隊列為空的情況下發起write調用:


????
protected?void?write0(WriteMessage?message)?{
????????
boolean?needWrite?=?false;
????????
synchronized?(this.writeQueue)?{
????????????needWrite?
=?this.writeQueue.isEmpty();
????????????
this.writeQueue.offer(message);
????????}
????????
if?(needWrite)?{
????????????pendingWrite(message);
????????}
????}

????
protected?final?void?pendingWrite(WriteMessage?message)?{
????????message?
=?preprocessWriteMessage(message);
????????
if?(!isClosed()?&&?this.asynchronousSocketChannel.isOpen())?{
????????????
this.asynchronousSocketChannel.write(message.getWriteBuffer(),
????????????????????
this,?this.writeCompletionHandler);
????????}?
else?{
????????????
throw?new?IllegalStateException(
????????????????????
"Session?Or?Channel?has?been?closed");
????????}
????}


??? write調用返回的結果與read一樣是一個Future<Integer>,而write的CompletionHandler處理的核心邏輯大概是這樣:
@Override
????
public?void?completed(Integer?result,?AbstractAioSession?session)?{
????????
if?(log.isDebugEnabled())
????????????log.debug(
"Session("?+?session.getRemoteSocketAddress()
????????????????????
+?")?writen?"?+?result?+?"?bytes");
????????????????
????????WriteMessage?writeMessage;
????????Queue
<WriteMessage>?writeQueue?=?session.getWriteQueue();
????????
synchronized?(writeQueue)?{
????????????writeMessage?
=?writeQueue.peek();
????????????
if?(writeMessage.getWriteBuffer()?==?null
????????????????????
||?!writeMessage.getWriteBuffer().hasRemaining())?{
????????????????writeQueue.remove();
????????????????
if?(writeMessage.getWriteFuture()?!=?null)?{
????????????????????writeMessage.getWriteFuture().setResult(Boolean.TRUE);
????????????????}
????????????????
try?{
????????????????????session.getHandler().onMessageSent(session,
????????????????????????????writeMessage.getMessage());
????????????????}?
catch?(Exception?e)?{
????????????????????session.onException(e);
????????????????}
????????????????writeMessage?
=?writeQueue.peek();
????????????}
????????}
????????
if?(writeMessage?!=?null)?{
????????????
try?{
????????????????session.pendingWrite(writeMessage);
????????????}?
catch?(IOException?e)?{
????????????????session.onException(e);
????????????????session.close();
????????????}
????????}
??? }

?? compete方法中的result就是實際寫入的字節數,然后我們判斷消息的緩沖區是否還有剩余,如果沒有就將消息從隊列中移除,如果隊列中還有消息,那么繼續發起write調用。

?? 重復一下,這里引用的代碼都是yanf4j aio分支中的源碼,感興趣的朋友可以直接check out出來看看: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio。
?? 在引入了aio之后,java對于網絡層的支持已經非常完善,該有的都有了,java也已經成為服務器開發的首選語言之一。java的弱項在于對內存的管理上,由于這一切都交給了GC,因此在高性能的網絡服務器上還是Cpp的天下。java這種單一堆模型比之erlang的進程內堆模型還是有差距,很難做到高效的垃圾回收和細粒度的內存管理。

?? 這里僅僅是介紹了aio開發的核心流程,對于一個網絡框架來說,還需要考慮超時的處理、緩沖buffer的處理、業務層和網絡層的切分、可擴展性、性能的可調性以及一定的通用性要求。

總結

以上是生活随笔為你收集整理的Java AIO初探(异步网络IO)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。