日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 人工智能 > ChatGpt >内容正文

ChatGpt

BIO-NIO-AIO

發(fā)布時間:2025/3/15 ChatGpt 45 豆豆
生活随笔 收集整理的這篇文章主要介紹了 BIO-NIO-AIO 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、同步和異步

同步和異步是針對應(yīng)用程序和操作系統(tǒng)的內(nèi)核交互而言的,同步指的是用戶進(jìn)程觸發(fā)IO操作并等待或者輪詢地區(qū)查看IO操作是否就緒,而異步是值用戶觸發(fā)IO操作以后便開始做自己地事情,而當(dāng)IO操作已經(jīng)完成地時候會得到IO完成地通知。

以銀行取款為例:
同步:自己親自持銀行卡到銀行取錢(使用同步IO時,JAVA自己處理IO讀寫)
異步:委托小弟到銀行取錢,我干其他事,然后給我(使用異步IO時,JAVA將IO讀寫委托給OS處理,需要將數(shù)據(jù)緩沖區(qū)地址和大小傳給OS(銀行卡和密碼),需要支持異步IO操作API)

二、阻塞和非阻塞

阻塞和非阻塞是針對于進(jìn)程在訪間數(shù)據(jù)的時候,根據(jù)IO操作的就緒狀態(tài)來采取的不同方式,說白了是一種讀取或者寫入操作方法的實現(xiàn)方式,阻塞方式下讀取或者寫入方法將一直等待,而非阻塞方式下,讀取或者寫入方法會立即返回一個狀態(tài)值。

以銀行取款為例:
阻塞:ATM排隊取款,你只能等待(使用阻塞舊時,JAVA調(diào)用會一直阻塞到讀寫完成才返回)
非阻塞:柜臺取款,取個號,然后坐在椅子上做其它事,等號廣播會通知你辦理,沒到號你就不能去,你可以不斷問大堂經(jīng)理排到了沒有,大堂經(jīng)理如果說還沒到你就不能去(使用非阻塞IO時,如果不能讀寫JAVA調(diào)用會馬上返回,當(dāng)IO事件分發(fā)器通知可讀寫時再繼續(xù)進(jìn)行讀寫,不斷循環(huán)直到讀寫完成)

各IO比較:

三、BIO

網(wǎng)絡(luò)編程的基本模型是C/S模型,即兩個進(jìn)程間的通信。服務(wù)端提供IP和監(jiān)聽端口,客戶端通過連接操作想服務(wù)端監(jiān)聽的地址發(fā)起連接請求,通過三次握手連接,如果連接成功建立,雙方就可以通過套接字進(jìn)行通信。傳統(tǒng)的同步阻塞模型開發(fā)中,ServerSocket負(fù)責(zé)綁定IP地址,啟動監(jiān)聽端口;Socket負(fù)責(zé)發(fā)起連接操作。連接成功后,雙方通過輸入和輸出流進(jìn)行同步阻塞式通信。

傳統(tǒng)的 IO 大致可以分為4種類型:

1.InputStream、OutputStream 基于字節(jié)操作的 IO 2.Writer、Reader 基于字符操作的 IO 3.File 基于磁盤操作的 IO 4.Socket 基于網(wǎng)絡(luò)操作的 IO

本文例子:客戶端發(fā)送一段算式的字符串到服務(wù)器,服務(wù)器計算后返回結(jié)果到客戶端。BIO 就是傳統(tǒng)的 java.io包,它是基于流模型實現(xiàn)的,交互的方式是同步、阻塞方式,也就是說在讀入輸入流或者輸出流時,在讀寫動作完成之前,線程會一直阻塞在那里,它們之間的調(diào)用時可靠的線性順序。它的有點就是代碼比較簡單、直觀;缺點就是IO 的效率和擴(kuò)展性很低,容易成為應(yīng)用性能瓶頸。該模型最大的問題就是缺乏彈性伸縮能力,當(dāng)客戶端并發(fā)訪問量增加后,服務(wù)端的線程個數(shù)和客戶端并發(fā)訪問數(shù)呈1:1的正比關(guān)系,Java中的線程也是比較寶貴的系統(tǒng)資源,線程數(shù)量快速膨脹后,系統(tǒng)的性能將急劇下降,隨著訪問量的繼續(xù)增大,系統(tǒng)最終就死掉了。
傳統(tǒng)的BIO模型(同步阻塞I/O):

BIO代碼思路:

Socket 網(wǎng)絡(luò)通信過程簡單來說分為下面 4 步:
1.建立服務(wù)端并且監(jiān)聽客戶端請求
2.客戶端請求,服務(wù)端和客戶端建立連接
3.兩端之間可以傳遞數(shù)據(jù)
4.關(guān)閉資源

  • 服務(wù)器端:

1.創(chuàng)建 ServerSocket 對象并且綁定地址(ip)和端口號(port):server.bind(new InetSocketAddress(host, port))
2.通過 accept()方法監(jiān)聽客戶端請求
3.連接建立后,通過輸入流讀取客戶端發(fā)送的請求信息
4.通過輸出流向客戶端發(fā)送響應(yīng)信息
5.關(guān)閉相關(guān)資源

  • 客戶端:

1.創(chuàng)建Socket 對象并且連接指定的服務(wù)器的地址(ip)和端口號(port):socket.connect(inetSocketAddress)
2.連接建立后,通過輸出流向服務(wù)器端發(fā)送請求信息
3.通過輸入流獲取服務(wù)器響應(yīng)的信息
4.關(guān)閉相關(guān)資源

同步阻塞式I/O創(chuàng)建的Server源碼:

import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; /*** BIO服務(wù)端源碼*/ final class ServerNormal {//默認(rèn)的端口號private static int DEFAULT_PORT = 12345;//單例的ServerSocketprivate static ServerSocket server;//根據(jù)傳入?yún)?shù)設(shè)置監(jiān)聽端口,如果沒有參數(shù)調(diào)用以下方法并使用默認(rèn)值public static void start() throws IOException{//使用默認(rèn)值start(DEFAULT_PORT);}//這個方法不會被大量并發(fā)訪問,不太需要考慮效率,直接進(jìn)行方法同步就可以public synchronized static void start(int port) throws IOException{if(server != null) {return;}try{//通過構(gòu)造函數(shù)創(chuàng)建ServerSocket//如果端口合法且空閑,服務(wù)端就監(jiān)聽成功server = new ServerSocket(port);System.out.println("服務(wù)器已啟動,端口號:" + port);//通過無線循環(huán)監(jiān)聽客戶端連接//如果沒有客戶端接入,將阻塞在accept操作上。while(true){Socket socket = server.accept();//當(dāng)有新的客戶端接入時,會執(zhí)行下面的代碼//然后創(chuàng)建一個新的線程處理這條Socket鏈路new Thread(new ServerHandler(socket)).start();}}finally{//一些必要的清理工作if(server != null){System.out.println("服務(wù)器已關(guān)閉。");server.close();server = null;}}} }

客戶端消息處理線程ServerHandler源碼:

import com.company.Calculator; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; /*** 客戶端線程* 用于處理一個客戶端的Socket鏈路*/ public class ServerHandler implements Runnable{private Socket socket;public ServerHandler(Socket socket) {this.socket = socket;}@Overridepublic void run() {BufferedReader in = null;PrintWriter out = null;try{in = new BufferedReader(new InputStreamReader(socket.getInputStream()));out = new PrintWriter(socket.getOutputStream(),true);String expression;String result;while(true){//通過BufferedReader讀取一行//如果已經(jīng)讀到輸入流尾部,返回null,退出循環(huán)//如果得到非空值,就嘗試計算結(jié)果并返回if((expression = in.readLine())==null) {break;}System.out.println("服務(wù)器收到消息:" + expression);try{Calculator calculator=new Calculator(expression);result = calculator.cal();}catch(Exception e){result = "計算出錯:" + e.getMessage();}out.println(result);}}catch(Exception e){e.printStackTrace();}finally{//一些必要的清理工作if(in != null){try {in.close();} catch (IOException e) {e.printStackTrace();}in = null;}if(out != null){out.close();out = null;}if(socket != null){try {socket.close();} catch (IOException e) {e.printStackTrace();}socket = null;}}} }

同步阻塞式I/O創(chuàng)建的Client源碼:

import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; /*** 阻塞式I/O創(chuàng)建的客戶端*/ public class Client {//默認(rèn)的端口號private static int DEFAULT_SERVER_PORT = 12345;private static String DEFAULT_SERVER_IP = "127.0.0.1";public static void send(String expression){send(DEFAULT_SERVER_PORT,expression);}public static void send(int port,String expression){System.out.println("算術(shù)表達(dá)式為:" + expression);Socket socket = null;BufferedReader in = null;PrintWriter out = null;try{socket = new Socket(DEFAULT_SERVER_IP,port);in = new BufferedReader(new InputStreamReader(socket.getInputStream()));out = new PrintWriter(socket.getOutputStream(),true);out.println(expression);System.out.println("___結(jié)果為:" + in.readLine());}catch(Exception e){e.printStackTrace();}finally{//一下必要的清理工作if(in != null){try {in.close();} catch (IOException e) {e.printStackTrace();}in = null;}if(out != null){out.close();out = null;}if(socket != null){try {socket.close();} catch (IOException e) {e.printStackTrace();}socket = null;}}} }

放到同一個程序(jvm)中運(yùn)行:

package com.company.BIO; import java.io.IOException; import java.util.Random; /*** 測試方法* @version 1.0*/ public class Test {//測試主方法public static void main(String[] args) throws InterruptedException {//運(yùn)行服務(wù)器new Thread(new Runnable() {@Overridepublic void run() {try {ServerNormal.start();} catch (IOException e) {e.printStackTrace();}}}).start();//避免客戶端先于服務(wù)器啟動前執(zhí)行代碼Thread.sleep(100);//運(yùn)行客戶端char operators[] = {'+','-','*','/'};Random random = new Random(System.currentTimeMillis());new Thread(new Runnable() {@SuppressWarnings("static-access")@Overridepublic void run() {while(true){//隨機(jī)產(chǎn)生算術(shù)表達(dá)式String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);Client.send(expression);try {Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}).start();} }

偽異步I/O編程
為了改進(jìn)這種一連接一線程的模型,我們可以使用線程池來管理這些線程(需要了解更多請參考前面提供的文章),實現(xiàn)1個或多個線程處理N個客戶端的模型(但是底層還是使用的同步阻塞I/O),通常被稱為“偽異步I/O模型“。
偽異步I/O模型圖:

只需要將新建線程的地方,交給線程池管理即可,只需要改動剛剛的Server代碼

import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /*** BIO服務(wù)端源碼__偽異步I/O*/final class ServerNormal {//默認(rèn)的端口號private static int DEFAULT_PORT = 12345;//單例的ServerSocketprivate static ServerSocket server;//線程池懶漢式的單例private static ExecutorService executorService = Executors.newFixedThreadPool(60);//根據(jù)傳入?yún)?shù)設(shè)置監(jiān)聽端口,如果沒有參數(shù)調(diào)用以下方法并使用默認(rèn)值public static void start() throws IOException{//使用默認(rèn)值start(DEFAULT_PORT);}//這個方法不會被大量并發(fā)訪問,不太需要考慮效率,直接進(jìn)行方法同步就行了public synchronized static void start(int port) throws IOException{if(server != null) {return;}try{//通過構(gòu)造函數(shù)創(chuàng)建ServerSocket//如果端口合法且空閑,服務(wù)端就監(jiān)聽成功server = new ServerSocket(port);System.out.println("服務(wù)器已啟動,端口號:" + port);//通過無線循環(huán)監(jiān)聽客戶端連接//如果沒有客戶端接入,將阻塞在accept操作上。while(true){Socket socket = server.accept();//當(dāng)有新的客戶端接入時,會執(zhí)行下面的代碼//然后創(chuàng)建一個新的線程處理這條Socket鏈路executorService.execute(new ServerHandler(socket));}}finally{//一些必要的清理工作if(server != null){System.out.println("服務(wù)器已關(guān)閉。");server.close();server = null;}}} }

測試運(yùn)行結(jié)果與上面是一樣的。
我們知道,如果使用CachedThreadPool線程池(不限制線程數(shù)量),其實除了能自動幫我們管理線程(復(fù)用),看起來也就像是1:1的客戶端:線程數(shù)模型,而FixedThreadPool我們就有效的控制了線程的最大數(shù)量,保證了系統(tǒng)有限的資源的控制,實現(xiàn)了N:M的偽異步I/O模型。但是,正因為限制了線程數(shù)量,如果發(fā)生大量并發(fā)請求,超過最大數(shù)量的線程就只能等待,直到線程池中的有空閑的線程可以被復(fù)用。而對Socket的輸入流就行讀取時,會一直阻塞,直到發(fā)生:

1.有數(shù)據(jù)可讀 2.可用數(shù)據(jù)以及讀取完畢 3.發(fā)生空指針或I/O異常 4.所以在讀取數(shù)據(jù)較慢時(比如數(shù)據(jù)量大、網(wǎng)絡(luò)傳輸慢等),大量并發(fā)的情況下,其他接入的消息,只能一直等待,這就是最大的弊端。

后面介紹的NIO,就能解決這個難題。

四、NIO

NIO 是 Java 1.4 引入的 java.nio 包,提供了 Channel(管道)、Selector(多路復(fù)用器)、Buffer(緩沖區(qū))等新的抽象,可以構(gòu)建多路復(fù)用的、同步非阻塞 IO 程序,同時提供了更接近操作系統(tǒng)底層高性能的數(shù)據(jù)操作方式。NIO的最重要的地方是當(dāng)一個連接創(chuàng)建后,不需要對應(yīng)一個線程,這個連接會注冊到多路復(fù)用器上面,所以所有的連接只需要一個線程就可以搞定,當(dāng)這個線程中的多路復(fù)用器進(jìn)行輪詢的時候,發(fā)現(xiàn)連接上有請求的話,才開啟一個線程進(jìn)行處理,也就是一個請求一個線程模式。在NIO的處理方式中,當(dāng)一個請求來的話,開啟線程進(jìn)行處理,可能會等待后端應(yīng)用的資源(JDBC鏈接等),其實這個線程就被阻塞了,當(dāng)并發(fā)上來的話,還是會有BIO一樣的問題。NIO方式適用于鏈接數(shù)目多且連接比較短(輕操作)的架構(gòu),比如聊天服務(wù)器。

工作原理圖:

主要參數(shù):
1.Channel:通道,連接客戶端和服務(wù)端的一個管道,通道不同于流的地方就是通道是雙向的,可以用于讀、寫和同時讀寫操作。底層的操作系統(tǒng)的通道一般都是全雙工的,所以全雙工的Channel比流能更好的映射底層操作系統(tǒng)的API。

Channel主要分兩大類:

1. SelectableChannel:用戶網(wǎng)絡(luò)讀寫 2. FileChannel:用于文件操作

Channel主要實現(xiàn)類:

1. SocketChannel:一個客戶端發(fā)起TCP連接的Channel 2. ServerSocketChannel:一個服務(wù)端監(jiān)聽新連接的TCP Channel,對于每一個新的Client連接,都會建立一個對應(yīng)的SocketChannel 3. FileChannel:從文件中讀寫數(shù)據(jù)

2.Selector:Selector是NIO中最為重要的組件之一,我們常常說的多路復(fù)用器就是指的Selector組件。Selector組件用于輪詢一個或多個NIO Channel的狀態(tài)是否處于可讀、可寫。通過輪詢的機(jī)制就可以管理多個Channel,也就是說可以管理多個網(wǎng)絡(luò)連接。可以想象成一個環(huán)狀傳送帶,上面可以接入很多管道,selector還可以對每個管道設(shè)置感興趣的顏色(連接(紅色),讀(黃色),寫(藍(lán)色),接收數(shù)據(jù))。當(dāng)Selector開始輪詢的時候Selector這個傳送帶就一直轉(zhuǎn)動,當(dāng)某個管道被傳送到感興趣事件檢查點的時候,selector會檢查改管道當(dāng)前顏色(即事件)之前是否被注冊成了感興趣顏色(事件),如果感興趣,那么Selector就可以對這個管道做處理了,比如把管道傳給別的線程,讓別的線程完成讀寫操作。 一個Selector可以同時輪詢多個Channel,因為JDK使用了epoll()代替?zhèn)鹘y(tǒng)的select實現(xiàn),所以沒有最大連接句柄1024/2048的限制。所以,只需要一個線程負(fù)責(zé)Selector的輪詢,就可以接入成千上萬的客戶端。

  • 輪詢機(jī)制

首先,需要將Channel注冊到Selector上,這樣Selector才知道需要管理哪些Channel,接著Selector會不斷輪詢其上注冊的Channel,如果某個Channel發(fā)生了讀或?qū)懙臅r間,這個Channel就會被Selector輪詢出來,然后通過SelectionKey可以獲取就緒的Channel集合,進(jìn)行后續(xù)的IO操作。

  • 屬性操作
  • 創(chuàng)建Selector
  • 通過open()方法,我們可以創(chuàng)建一個Selector對象。

    Selector selector = Selector.open();
  • 注冊Channel到Selector中
  • 我們需要將Channel注冊到Selector中,才能夠被Selector管理。

    channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

    某個Channel要注冊到Selector中,那么該Channel必須是非阻塞,所有上面代碼中有個configureBlocking()的配置操作。在register(Selector selector, int interestSet)方法的第二個參數(shù),標(biāo)識一個interest集合,意思是Selector對哪些事件感興趣,可以監(jiān)聽四種不同類型的事件:

    Selector可以監(jiān)聽的事件類型(可使用 SelectionKey 的四個常量表示):

  • 讀: SelectionKey.OP_READ (1)
  • 寫:SelectionKey.OP_WRITE(4)
  • 連接 : SelectionKey.OP_CONNECT(8)
  • 接收: SelectionKey.OP_ACCEPT (16)
  • Connect事件 :連接完成事件( TCP 連接 ),僅適用于客戶端,對應(yīng) SelectionKey.OP_CONNECT。
  • Accept事件 :接受新連接事件,僅適用于服務(wù)端,對應(yīng) SelectionKey.OP_ACCEPT 。
  • Read事件 :讀事件,適用于兩端,對應(yīng) SelectionKey.OP_READ ,表示 Buffer 可讀。
  • Write事件:寫時間,適用于兩端,對應(yīng) SelectionKey.OP_WRITE ,表示 Buffer 可寫。
  • Channel觸發(fā)了一個事件,表明該時間已經(jīng)準(zhǔn)備就緒:

  • 一個Client Channel成功連接到另一個服務(wù)器,成為“連接就緒”
  • 一個ServerSocket準(zhǔn)備好接收新進(jìn)入的接,稱為“接收就緒”
  • 一個有數(shù)據(jù)可讀的Channel,稱為“讀就緒”
  • 一個等待寫數(shù)據(jù)的Channel,稱為”寫就緒“
  • 當(dāng)然,Selector是可以同時對多個事件感興趣的,我們使用或運(yùn)算即可組合多個事件:

    int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

    Selector其他一些操作
    選擇Channel

    //當(dāng)Selector執(zhí)行select()方法就會產(chǎn)生阻塞, //等到注冊在其上的Channel準(zhǔn)備就緒就會立即返回,返回準(zhǔn)備就緒的數(shù)量。 public abstract int select() throws IOException; //select(long timeout)則是在select()的基礎(chǔ)上增加了超時機(jī)制。 public abstract int select(long timeout) throws IOException; //selectNow()立即返回,不產(chǎn)生阻塞。 public abstract int selectNow() throws IOException;

    有一點非常需要注意: select 方法返回的 int 值,表示有多少 Channel 已經(jīng)就緒。
    自上次調(diào)用select 方法后有多少 Channel 變成就緒狀態(tài)。如果調(diào)用 select 方法,因為有一個 Channel 變成就緒狀態(tài)則返回了 1 ;若再次調(diào)用 select 方法,如果另一個 Channel 就緒了,它會再次返回1。

    獲取可操作的Channel

    Set selectedKeys = selector.selectedKeys();

    當(dāng)有新增就緒的Channel,調(diào)用select()方法,就會將key添加到Set集合中。

    3.ByteBuffer:字節(jié)緩沖區(qū),本質(zhì)上是一個連續(xù)的字節(jié)數(shù)組,Selector感興趣的事件發(fā)生后對管道的讀操作所讀到的數(shù)據(jù)都存儲在ByteBuffer中,而對管道的寫操作也是以ByteBuffer為源頭,值得注意的是ByteBuffer可以有多個。想想看,我們使用的所有基本類型都可以轉(zhuǎn)換成byte字節(jié),比如Integer類型占4字節(jié),那么傳遞數(shù)字 1 ByteBuffer底層數(shù)組被占用了4個格子。

  • 屬性
    Buffer中有4個非常重要的屬性:
  • capacity、limit、position、mark

  • capacity屬性:容量,Buffer能夠容納的數(shù)據(jù)元素的最大值,在Buffer初始化創(chuàng)建的時候被賦值,而且不能被修改。

  • 上圖中,初始化Buffer的容量為8(圖中從0~7,共8個元素),所以capacity = 8

  • limit屬性:代表Buffer可讀可寫的上限。
    寫模式下:limit 代表能寫入數(shù)據(jù)的上限位置,這個時候limit = capacity
    讀模式下:在Buffer完成所有數(shù)據(jù)寫入后,通過調(diào)用flip()方法,切換到讀模式,此時limit等于Buffer中實際已經(jīng)寫入的數(shù)據(jù)大小。因為Buffer可能沒有被寫滿,所以limit<=capacity

  • position屬性:代表讀取或者寫入Buffer的位置。默認(rèn)為0。
    寫模式下:每往Buffer中寫入一個值,position就會自動加1,代表下一次寫入的位置。
    讀模式下:每往Buffer中讀取一個值,position就自動加1,代表下一次讀取的位置。


  • 從上圖就能很清晰看出,讀寫模式下capacity、limit、position的關(guān)系了。

  • mark屬性:
    代表標(biāo)記,通過mark()方法,記錄當(dāng)前position值,將position值賦值給mark,在后續(xù)的寫入或讀取過程中,可以通過reset()方法恢復(fù)當(dāng)前position為mark記錄的值。
  • 這幾個重要屬性講完,我們可以再來回顧下:

    0 <= mark <= position <= limit <= capacity

    現(xiàn)在應(yīng)該很清晰這幾個屬性的關(guān)系了~

  • Buffer常見操作
    • 創(chuàng)建Buffer
    allocate(int capacity) ByteBuffer buffer = ByteBuffer.allocate(1024); int count = channel.read(buffer);

    例子中創(chuàng)建的ByteBuffer是基于堆內(nèi)存的一個對象。

    wrap(array)

    wrap方法可以將數(shù)組包裝成一個Buffer對象:

    ByteBuffer buffer = ByteBuffer.wrap("hello world".getBytes()); channel.write(buffer); allocateDirect(int capacity)

    通過allocateDirect方法也可以快速實例化一個Buffer對象,和allocate很相似,這里區(qū)別的是allocateDirect創(chuàng)建的是基于堆外內(nèi)存的對象。堆外內(nèi)存不在JVM堆上,不受GC的管理。堆外內(nèi)存進(jìn)行一些底層系統(tǒng)的IO操作時,效率會更高。

    • Buffer寫操作
      Buffer寫入可以通過put()和channel.read(buffer)兩種方式寫入。
      通常我們NIO的讀操作的時候,都是從Channel中讀取數(shù)據(jù)寫入Buffer,這個對應(yīng)的是Buffer的寫操作。
    • Buffer讀操作
      Buffer讀取可以通過get()和channel.write(buffer)兩種方式讀入。
      還是同上,我們對Buffer的讀入操作,反過來說就是對Channel的寫操作。讀取Buffer中的數(shù)據(jù)然后寫入Channel中。
    • 其他常見方法
  • rewind():重置position位置為0,可以重新讀取和寫入buffer,一般該方法適用于讀操作,可以理解為對buffer的重復(fù)讀。
  • flip():很常用的一個方法,一般在寫模式切換到讀模式的時候會經(jīng)常用到。也會將position設(shè)置為0,然后設(shè)置limit等于原來寫入的position。
  • clear():重置buffer中的數(shù)據(jù),該方法主要是針對于寫模式,因為limit設(shè)置為了capacity,讀模式下會出問題。
  • clear():重置buffer中的數(shù)據(jù),該方法主要是針對于寫模式,因為limit設(shè)置為了capacity,讀模式下會出問題。
  • public final Buffer rewind() {position = 0;mark = -1;return this; } public final Buffer flip() {limit = position;position = 0;mark = -1;return this; } public final Buffer clear() {position = 0;limit = capacity;mark = -1;return this; } public final Buffer mark() {mark = position;return this; } public final Buffer reset() {int m = mark;if (m < 0)throw new InvalidMarkException();position = m;return this; }

    常用的讀寫方法可以用一張圖總結(jié)一下:

    NIO提供了與傳統(tǒng)BIO模型中的Socket和ServerSocket相對應(yīng)的SocketChannel和ServerSocketChannel兩種不同的套接字通道實現(xiàn)。新增的著兩種通道都支持阻塞和非阻塞兩種模式。
    示意圖:

    使用NIO步驟:(服務(wù)端)

    • 首先:創(chuàng)建一個傳送帶(selector)
    • 然后:創(chuàng)建一個管道(channel),設(shè)置管道為非阻塞,綁定端口
    • 然后:把管道放到傳送帶上
    • 再然后:啟動傳送帶 其次:傳送帶感興趣事件檢查點查獲一個感興趣管道,轉(zhuǎn)給其他線程對管道進(jìn)行非阻塞讀寫
    • 最后:全使用完,關(guān)閉管道

    NIO服務(wù)端通信序列圖:

    Nio客戶端通信時序圖:

    NIO代碼思路:

  • 將 Channel 注冊到 Selector 中。
  • 調(diào)用 Selector 的 select() 方法,這個方法會阻塞;
  • 到注冊在 Selector 中的某個 Channel 有新的 TCP 連接或者可讀寫事件的話,這個 Channel 就會處于就緒狀態(tài),會被 Selector 輪詢出來。
  • 然后通過 SelectionKey 可以獲取就緒 Channel 的集合,進(jìn)行后續(xù)的 I/O 操作。
  • NIO創(chuàng)建的Server源碼:

    package com.company.NIO; class Server {private static ServerHandle serverHandle;public static void start(){int DEFAULT_PORT = 12345;start(DEFAULT_PORT);}public static synchronized void start(int port){if(serverHandle!=null) {serverHandle.stop();}serverHandle = new ServerHandle(port);new Thread(serverHandle,"Server").start();}public static void main(String[] args){start();} }

    serverHandle:

    package com.company.NIO;import com.company.Calculator;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;/*** NIO服務(wù)端*/ class ServerHandle implements Runnable{private Selector selector;private ServerSocketChannel serverChannel;private volatile boolean started;/*** 構(gòu)造方法* @param port 指定要監(jiān)聽的端口號*/public ServerHandle(int port) {try{//創(chuàng)建選擇器selector = Selector.open();//打開監(jiān)聽通道serverChannel = ServerSocketChannel.open();//如果為 true,則此通道將被置于阻塞模式;如果為 false,則此通道將被置于非阻塞模式serverChannel.configureBlocking(false);//開啟非阻塞模式//綁定端口 backlog設(shè)為1024serverChannel.socket().bind(new InetSocketAddress(port),1024);//監(jiān)聽客戶端連接請求serverChannel.register(selector, SelectionKey.OP_ACCEPT);//標(biāo)記服務(wù)器已開啟started = true;System.out.println("服務(wù)器已啟動,端口號:" + port);}catch(IOException e){e.printStackTrace();System.exit(1);}}public void stop(){started = false;}@Overridepublic void run() {//循環(huán)遍歷selectorwhile(started){try{//無論是否有讀寫事件發(fā)生,selector每隔1s被喚醒一次selector.select(1000);//阻塞,只有當(dāng)至少一個注冊的事件發(fā)生的時候才會繼續(xù). // selector.select();Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();SelectionKey key = null;while(it.hasNext()){key = it.next();it.remove();try{handleInput(key);}catch(Exception e){if(key != null){key.cancel();if(key.channel() != null){key.channel().close();}}}}}catch(Throwable t){t.printStackTrace();}}//selector關(guān)閉后會自動釋放里面管理的資源if(selector != null) {try{selector.close();}catch (Exception e) {e.printStackTrace();}}}private void handleInput(SelectionKey key) throws IOException{if(key.isValid()){//處理新接入的請求消息if(key.isAcceptable()){ServerSocketChannel ssc = (ServerSocketChannel) key.channel();//通過ServerSocketChannel的accept創(chuàng)建SocketChannel實例//完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立SocketChannel sc = ssc.accept();//設(shè)置為非阻塞的sc.configureBlocking(false);//注冊為讀sc.register(selector, SelectionKey.OP_READ);}//讀消息if(key.isReadable()){SocketChannel sc = (SocketChannel) key.channel();//創(chuàng)建ByteBuffer,并開辟一個1M的緩沖區(qū)ByteBuffer buffer = ByteBuffer.allocate(1024);//讀取請求碼流,返回讀取到的字節(jié)數(shù)int readBytes = sc.read(buffer);//讀取到字節(jié),對字節(jié)進(jìn)行編解碼if(readBytes>0){//將緩沖區(qū)當(dāng)前的limit設(shè)置為position=0,用于后續(xù)對緩沖區(qū)的讀取操作buffer.flip();//根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組byte[] bytes = new byte[buffer.remaining()];//將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中buffer.get(bytes);String expression = new String(bytes,"UTF-8");System.out.println("服務(wù)器收到消息:" + expression);//處理數(shù)據(jù)String result = null;try{Calculator calculator=new Calculator(expression);result = calculator.cal().toString();}catch(Exception e){result = "計算錯誤:" + e.getMessage();}//發(fā)送應(yīng)答消息doWrite(sc,result);}//沒有讀取到字節(jié) 忽略 // else if(readBytes==0);//鏈路已經(jīng)關(guān)閉,釋放資源else if(readBytes<0){key.cancel();sc.close();}}}}//異步發(fā)送應(yīng)答消息private void doWrite(SocketChannel channel,String response) throws IOException{//將消息編碼為字節(jié)數(shù)組byte[] bytes = response.getBytes();//根據(jù)數(shù)組容量創(chuàng)建ByteBufferByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);//將字節(jié)數(shù)組復(fù)制到緩沖區(qū)writeBuffer.put(bytes);//flip操作writeBuffer.flip();//發(fā)送緩沖區(qū)的字節(jié)數(shù)組channel.write(writeBuffer);//****此處不含處理“寫半包”的代碼} }

    client:

    package com.company.NIO;class Client {private static String DEFAULT_HOST = "127.0.0.1";private static int DEFAULT_PORT = 12345;private static ClientHandle clientHandle;public static void start(){start(DEFAULT_HOST,DEFAULT_PORT);}public static synchronized void start(String ip,int port){if(clientHandle!=null) {clientHandle.stop();}clientHandle = new ClientHandle(ip,port);new Thread(clientHandle,"Server").start();}//向服務(wù)器發(fā)送消息public static boolean sendMsg(String msg) throws Exception{if("q".equals(msg)) {return false;}clientHandle.sendMsg(msg);return true;}public static void main(String[] args){start();} }

    clientHandle:

    package com.company.NIO;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /*** NIO客戶端*/ public class ClientHandle implements Runnable{private String host;private int port;private Selector selector;private SocketChannel socketChannel;private volatile boolean started;public ClientHandle(String ip,int port) {this.host = ip;this.port = port;try{//創(chuàng)建選擇器selector = Selector.open();//打開監(jiān)聽通道socketChannel = SocketChannel.open();//如果為 true,則此通道將被置于阻塞模式;如果為 false,則此通道將被置于非阻塞模式socketChannel.configureBlocking(false);//開啟非阻塞模式started = true;}catch(IOException e){e.printStackTrace();System.exit(1);}}public void stop(){started = false;}@Overridepublic void run() {try{doConnect();}catch(IOException e){e.printStackTrace();System.exit(1);}//循環(huán)遍歷selectorwhile(started){try{//無論是否有讀寫事件發(fā)生,selector每隔1s被喚醒一次selector.select(1000);//阻塞,只有當(dāng)至少一個注冊的事件發(fā)生的時候才會繼續(xù). // selector.select();Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();SelectionKey key = null;while(it.hasNext()){key = it.next();it.remove();try{handleInput(key);}catch(Exception e){if(key != null){key.cancel();if(key.channel() != null){key.channel().close();}}}}}catch(Exception e){e.printStackTrace();System.exit(1);}}//selector關(guān)閉后會自動釋放里面管理的資源if(selector != null) {try{selector.close();}catch (Exception e) {e.printStackTrace();}}}private void handleInput(SelectionKey key) throws IOException{if(key.isValid()){SocketChannel sc = (SocketChannel) key.channel();if(key.isConnectable()){if(sc.finishConnect()) {;} else {System.exit(1);}}//讀消息if(key.isReadable()){//創(chuàng)建ByteBuffer,并開辟一個1M的緩沖區(qū)ByteBuffer buffer = ByteBuffer.allocate(1024);//讀取請求碼流,返回讀取到的字節(jié)數(shù)int readBytes = sc.read(buffer);//讀取到字節(jié),對字節(jié)進(jìn)行編解碼if(readBytes>0){//將緩沖區(qū)當(dāng)前的limit設(shè)置為position=0,用于后續(xù)對緩沖區(qū)的讀取操作buffer.flip();//根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組byte[] bytes = new byte[buffer.remaining()];//將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中buffer.get(bytes);String result = new String(bytes,"UTF-8");System.out.println("客戶端收到消息:" + result);}//沒有讀取到字節(jié) 忽略 // else if(readBytes==0);//鏈路已經(jīng)關(guān)閉,釋放資源else if(readBytes<0){key.cancel();sc.close();}}}}//異步發(fā)送消息private void doWrite(SocketChannel channel,String request) throws IOException{//將消息編碼為字節(jié)數(shù)組byte[] bytes = request.getBytes();//根據(jù)數(shù)組容量創(chuàng)建ByteBufferByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);//將字節(jié)數(shù)組復(fù)制到緩沖區(qū)writeBuffer.put(bytes);//flip操作writeBuffer.flip();//發(fā)送緩沖區(qū)的字節(jié)數(shù)組channel.write(writeBuffer);//****此處不含處理“寫半包”的代碼}private void doConnect() throws IOException{if(socketChannel.connect(new InetSocketAddress(host,port))) {;} else {socketChannel.register(selector, SelectionKey.OP_CONNECT);}}public void sendMsg(String msg) throws Exception{socketChannel.register(selector, SelectionKey.OP_READ);doWrite(socketChannel, msg);} }

    NIO Test代碼:

    import org.junit.Test; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.Iterator; import java.util.Scanner;/*** 一、使用 NIO 完成網(wǎng)絡(luò)通信的三個核心:* * 1. 通道(Channel):負(fù)責(zé)連接* * java.nio.channels.Channel 接口:* |--SelectableChannel* |--SocketChannel* |--ServerSocketChannel* |--DatagramChannel* * |--Pipe.SinkChannel* |--Pipe.SourceChannel* * 2. 緩沖區(qū)(Buffer):負(fù)責(zé)數(shù)據(jù)的存取* * 3. 選擇器(Selector):是 SelectableChannel 的多路復(fù)用器。用于監(jiān)控 SelectableChannel 的 IO 狀況* */ public class TestNonBlockingNIO {//客戶端@Testpublic void client() throws IOException{//1. 獲取通道SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));//2. 切換非阻塞模式sChannel.configureBlocking(false);//3. 分配指定大小的緩沖區(qū)ByteBuffer buf = ByteBuffer.allocate(1024);//4. 發(fā)送數(shù)據(jù)給服務(wù)端Scanner scan = new Scanner(System.in);while(scan.hasNext()){String str = scan.next();buf.put((new Date().toString() + "\n" + str).getBytes());buf.flip();sChannel.write(buf);buf.clear();}//5. 關(guān)閉通道sChannel.close();}//服務(wù)端@Testpublic void server() throws IOException{//1. 獲取通道ServerSocketChannel ssChannel = ServerSocketChannel.open();//2. 切換非阻塞模式ssChannel.configureBlocking(false);//3. 綁定連接ssChannel.bind(new InetSocketAddress(9898));//4. 獲取選擇器Selector selector = Selector.open();//5. 將通道注冊到選擇器上, 并且指定“監(jiān)聽接收事件”ssChannel.register(selector, SelectionKey.OP_ACCEPT);//6. 輪詢式的獲取選擇器上已經(jīng)“準(zhǔn)備就緒”的事件while(selector.select() > 0){//說明至少有一個準(zhǔn)備就緒了//7. 獲取當(dāng)前選擇器中所有注冊的“選擇鍵(已就緒的監(jiān)聽事件)”Iterator<SelectionKey> it = selector.selectedKeys().iterator();while(it.hasNext()){//8. 獲取準(zhǔn)備“就緒”的是事件SelectionKey sk = it.next();//9. 判斷具體是什么事件準(zhǔn)備就緒if(sk.isAcceptable()){//10. 若“接收就緒”,獲取客戶端連接SocketChannel sChannel = ssChannel.accept();//11. 切換非阻塞模式sChannel.configureBlocking(false);//12. 將該通道注冊到選擇器上sChannel.register(selector, SelectionKey.OP_READ);}else if(sk.isReadable()){//13. 獲取當(dāng)前選擇器上“讀就緒”狀態(tài)的通道SocketChannel sChannel = (SocketChannel) sk.channel();/**14. 讀取數(shù)據(jù)*/ByteBuffer buf = ByteBuffer.allocate(1024);int len = 0;while((len = sChannel.read(buf)) > 0 ){buf.flip();System.out.println(new String(buf.array(), 0, len));buf.clear();}}//15. 取消選擇鍵 SelectionKeyit.remove();}}} }

    與傳統(tǒng)IO比較:

    • 傳統(tǒng)IO一般是一個線程等待連接,連接過來之后分配給processor線程,processor線程與通道連接后如果通道沒有數(shù)據(jù)過來就會阻塞(線程被動掛起)不能做別的事情。NIO則不同,首先:在Selector線程輪詢的過程中就已經(jīng)過濾掉了不感興趣的事件,其次:在processor處理感興趣事件的read和write都是非阻塞操作即直接返回的,線程沒有被掛起。
    • 傳統(tǒng)IO的管道是單向的,NIO的管道是雙向的
    • 兩者都是同步的,也就是Java程序親力親為的去讀寫數(shù)據(jù),不管傳統(tǒng)IO還是NIo都需要read和write方法,這些都是Java程序調(diào)用的而不是系統(tǒng)幫我們調(diào)用的。NIO2.0里這點得到了改觀,即使用異步非阻塞AsynchronousXXX四個類來處理。

    五、AIO

    AIO 是 Java 1.7 之后引入的包,是 NIO 的升級版本,提供了異步非堵塞的 IO 操作方式,所以人們叫它AIO(Asynchronous IO),異步 IO是基于事件和回調(diào)機(jī)制實現(xiàn)的,也就是應(yīng)用操作之后會直接返回,不會堵塞在那里,當(dāng)后臺處理完成,操作系統(tǒng)會通知相應(yīng)的線程進(jìn)行后續(xù)的操作。與NIO不同,當(dāng)進(jìn)行讀寫操作時,只須直接調(diào)用API的read或write方法即可·這兩種方法均為異步的,對于讀操作而言,當(dāng)有可讀取時,操作系統(tǒng)會將可讀的流傳入read方法的緩沖區(qū),通知應(yīng)用程序;對于寫操作而言,當(dāng)操作系統(tǒng)將write方法傳遞的寫入完畢時,操作系統(tǒng)主動涌知應(yīng)用程序。即可以理解為,read/write方法都是異步的,完成后會主動調(diào)用回調(diào)函數(shù)。在JDK1.7中,這部分內(nèi)容稱作NIO.2。異步非阻塞,服務(wù)器實現(xiàn)模式為一個有效請求一個線程,客戶端的I/O請求都是由OS先完成了在通知服務(wù)器應(yīng)用去啟動線程進(jìn)行處理,AIO方式適用于鏈接數(shù)目多且連接比較長的架構(gòu),比如相冊服務(wù)器,重復(fù)調(diào)用OS參與并發(fā)操作。 異步的套接字通道時真正的異步非阻塞I/O,對應(yīng)于UNIX網(wǎng)絡(luò)編程中的事件驅(qū)動I/O(AIO)。他不需要過多的Selector對注冊的通道進(jìn)行輪詢即可實現(xiàn)異步讀寫,從而簡化了NIO的編程模型。


    流程圖:

    服務(wù)端代碼:
    AsyncServerHandler:

    package com.company.AIO.server;import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.AsynchronousServerSocketChannel; import java.util.concurrent.CountDownLatch; public class AsyncServerHandler implements Runnable {public CountDownLatch latch;public AsynchronousServerSocketChannel channel;public AsyncServerHandler(int port) {try {//創(chuàng)建服務(wù)端通道channel = AsynchronousServerSocketChannel.open();//綁定端口channel.bind(new InetSocketAddress(port));System.out.println("服務(wù)器已啟動,端口號:" + port);} catch (IOException e) {e.printStackTrace();}}@Overridepublic void run() {//CountDownLatch初始化//它的作用:在完成一組正在執(zhí)行的操作之前,允許當(dāng)前的現(xiàn)場一直阻塞//此處,讓現(xiàn)場在此阻塞,防止服務(wù)端執(zhí)行完成后退出//也可以使用while(true)+sleep//生成環(huán)境就不需要擔(dān)心這個問題,以為服務(wù)端是不會退出的latch = new CountDownLatch(1);//用于接收客戶端的連接channel.accept(this,new AcceptHandler());try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}} }

    AcceptHandler:

    package com.company.AIO.server; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; //作為handler接收客戶端連接 public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {@Overridepublic void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) {//繼續(xù)接受其他客戶端的請求Server.clientCount++;System.out.println("連接的客戶端數(shù):" + Server.clientCount);serverHandler.channel.accept(serverHandler, this);//創(chuàng)建新的BufferByteBuffer buffer = ByteBuffer.allocate(1024);//異步讀 第三個參數(shù)為接收消息回調(diào)的業(yè)務(wù)Handlerchannel.read(buffer, buffer, new ReadHandler(channel));}@Overridepublic void failed(Throwable exc, AsyncServerHandler serverHandler) {exc.printStackTrace();serverHandler.latch.countDown();} }

    ReadHandler:

    package com.company.AIO.server; import com.company.Calculator; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {//用于讀取半包消息和發(fā)送應(yīng)答private AsynchronousSocketChannel channel;public ReadHandler(AsynchronousSocketChannel channel) {this.channel = channel;}//讀取到消息后的處理@Overridepublic void completed(Integer result, ByteBuffer attachment) {//flip操作attachment.flip();//根據(jù)byte[] message = new byte[attachment.remaining()];attachment.get(message);try {String expression = new String(message, "UTF-8");System.out.println("服務(wù)器收到消息: " + expression);String calrResult = null;try{Calculator calculator=new Calculator(expression);calrResult = calculator.cal().toString();}catch(Exception e){calrResult = "計算錯誤:" + e.getMessage();}//向客戶端發(fā)送消息doWrite(calrResult);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}//發(fā)送消息private void doWrite(String result) {byte[] bytes = result.getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);writeBuffer.flip();//異步寫數(shù)據(jù) 參數(shù)與前面的read一樣channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer buffer) {//如果沒有發(fā)送完,就繼續(xù)發(fā)送直到完成if (buffer.hasRemaining()) {channel.write(buffer, buffer, this);} else{//創(chuàng)建新的BufferByteBuffer readBuffer = ByteBuffer.allocate(1024);//異步讀 第三個參數(shù)為接收消息回調(diào)的業(yè)務(wù)Handlerchannel.read(readBuffer, readBuffer, new ReadHandler(channel));}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {try {channel.close();} catch (IOException e) {}}});}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {try {this.channel.close();} catch (IOException e) {e.printStackTrace();}} }

    Server:

    package com.company.AIO.server; /*** AIO服務(wù)端*/ public class Server {private static int DEFAULT_PORT = 12345;private static AsyncServerHandler serverHandle;public volatile static long clientCount = 0;public static void start(){start(DEFAULT_PORT);}public static synchronized void start(int port){if(serverHandle!=null) {return;}serverHandle = new AsyncServerHandler(port);new Thread(serverHandle,"Server").start();}public static void main(String[] args){Server.start();} }

    客戶端代碼:
    AsyncClientHandler:

    package com.company.AIO.client; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable {private AsynchronousSocketChannel clientChannel;private String host;private int port;private CountDownLatch latch;public AsyncClientHandler(String host, int port) {this.host = host;this.port = port;try {//創(chuàng)建異步的客戶端通道clientChannel = AsynchronousSocketChannel.open();} catch (IOException e) {e.printStackTrace();}}@Overridepublic void run() {//創(chuàng)建CountDownLatch等待latch = new CountDownLatch(1);//發(fā)起異步連接操作,回調(diào)參數(shù)就是這個類本身,如果連接成功會回調(diào)completed方法clientChannel.connect(new InetSocketAddress(host, port), this, this);try {latch.await();} catch (InterruptedException e1) {e1.printStackTrace();}try {clientChannel.close();} catch (IOException e) {e.printStackTrace();}}//連接服務(wù)器成功//意味著TCP三次握手完成@Overridepublic void completed(Void result, AsyncClientHandler attachment) {System.out.println("客戶端成功連接到服務(wù)器...");}//連接服務(wù)器失敗@Overridepublic void failed(Throwable exc, AsyncClientHandler attachment) {System.err.println("連接服務(wù)器失敗...");exc.printStackTrace();try {clientChannel.close();latch.countDown();} catch (IOException e) {e.printStackTrace();}}//向服務(wù)器發(fā)送消息public void sendMsg(String msg){byte[] req = msg.getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);writeBuffer.put(req);writeBuffer.flip();//異步寫clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch));} }

    ReadHandler:

    package com.company.AIO.client; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {private AsynchronousSocketChannel clientChannel;private CountDownLatch latch;public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {this.clientChannel = clientChannel;this.latch = latch;}@Overridepublic void completed(Integer result,ByteBuffer buffer) {buffer.flip();byte[] bytes = new byte[buffer.remaining()];buffer.get(bytes);String body;try {body = new String(bytes,"UTF-8");System.out.println("客戶端收到結(jié)果:"+ body);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}@Overridepublic void failed(Throwable exc,ByteBuffer attachment) {System.err.println("數(shù)據(jù)讀取失敗...");try {clientChannel.close();latch.countDown();} catch (IOException e) {}} }

    WriteHandler:

    package com.company.AIO.client; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {private AsynchronousSocketChannel clientChannel;private CountDownLatch latch;public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {this.clientChannel = clientChannel;this.latch = latch;}@Overridepublic void completed(Integer result, ByteBuffer buffer) {//完成全部數(shù)據(jù)的寫入if (buffer.hasRemaining()) {clientChannel.write(buffer, buffer, this);}else {//讀取數(shù)據(jù)ByteBuffer readBuffer = ByteBuffer.allocate(1024);clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch));}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {System.err.println("數(shù)據(jù)發(fā)送失敗...");try {clientChannel.close();latch.countDown();} catch (IOException e) {}} }

    Client:

    package com.company.AIO.client; import java.util.Scanner; public class Client {private static String DEFAULT_HOST = "127.0.0.1";private static int DEFAULT_PORT = 12345;private static AsyncClientHandler clientHandle;public static void start(){start(DEFAULT_HOST,DEFAULT_PORT);}public static synchronized void start(String ip,int port){if(clientHandle!=null) {return;}clientHandle = new AsyncClientHandler(ip,port);new Thread(clientHandle,"Client").start();}//向服務(wù)器發(fā)送消息public static boolean sendMsg(String msg) throws Exception{if("q".equals(msg)) {return false;}clientHandle.sendMsg(msg);return true;}@SuppressWarnings("resource")public static void main(String[] args) throws Exception{Client.start();System.out.println("請輸入請求消息:");Scanner scanner = new Scanner(System.in);while(Client.sendMsg(scanner.nextLine()));} }

    六、附一張總結(jié)圖

    參考文章1
    參考文章2
    參考文章3
    參考文章4

    總結(jié)

    以上是生活随笔為你收集整理的BIO-NIO-AIO的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。