Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
本文會從傳統(tǒng)的BIO到NIO再到AIO自淺至深介紹,并附上完整的代碼講解。
????下面代碼中會使用這樣一個例子:客戶端發(fā)送一段算式的字符串到服務(wù)器,服務(wù)器計算后返回結(jié)果到客戶端。
????代碼的所有說明,都直接作為注釋,嵌入到代碼中,看代碼時就能更容易理解,代碼中會用到一個計算結(jié)果的工具類,見文章代碼部分。
????相關(guān)的基礎(chǔ)知識文章推薦:
????Linux 網(wǎng)絡(luò) I/O 模型簡介(圖文)
????Java 并發(fā)(多線程)????
1、BIO編程
????1.1、傳統(tǒng)的BIO編程
????網(wǎng)絡(luò)編程的基本模型是C/S模型,即兩個進(jìn)程間的通信。
????服務(wù)端提供IP和監(jiān)聽端口,客戶端通過連接操作想服務(wù)端監(jiān)聽的地址發(fā)起連接請求,通過三次握手連接,如果連接成功建立,雙方就可以通過套接字進(jìn)行通信。
????傳統(tǒng)的同步阻塞模型開發(fā)中,ServerSocket負(fù)責(zé)綁定IP地址,啟動監(jiān)聽端口;Socket負(fù)責(zé)發(fā)起連接操作。連接成功后,雙方通過輸入和輸出流進(jìn)行同步阻塞式通信。?
????簡單的描述一下BIO的服務(wù)端通信模型:采用BIO通信模型的服務(wù)端,通常由一個獨(dú)立的Acceptor線程負(fù)責(zé)監(jiān)聽客戶端的連接,它接收到客戶端連接請求之后為每個客戶端創(chuàng)建一個新的線程進(jìn)行鏈路處理沒處理完成后,通過輸出流返回應(yīng)答給客戶端,線程銷毀。即典型的一請求一應(yīng)答通宵模型。
????傳統(tǒng)BIO通信模型圖:
????
????該模型最大的問題就是缺乏彈性伸縮能力,當(dāng)客戶端并發(fā)訪問量增加后,服務(wù)端的線程個數(shù)和客戶端并發(fā)訪問數(shù)呈1:1的正比關(guān)系,Java中的線程也是比較寶貴的系統(tǒng)資源,線程數(shù)量快速膨脹后,系統(tǒng)的性能將急劇下降,隨著訪問量的繼續(xù)增大,系統(tǒng)最終就死-掉-了。
????同步阻塞式I/O創(chuàng)建的Server源碼:
package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
?* BIO服務(wù)端源碼
?* @author yangtao__anxpp.com
?* @version 1.0
?*/
public final class ServerNormal {
?? ?//默認(rèn)的端口號
?? ?private static int DEFAULT_PORT = 12345;
?? ?//單例的ServerSocket
?? ?private static ServerSocket server;
?? ?//根據(jù)傳入?yún)?shù)設(shè)置監(jiān)聽端口,如果沒有參數(shù)調(diào)用以下方法并使用默認(rèn)值
?? ?public static void start() throws IOException{
?? ??? ?//使用默認(rèn)值
?? ??? ?start(DEFAULT_PORT);
?? ?}
?? ?//這個方法不會被大量并發(fā)訪問,不太需要考慮效率,直接進(jìn)行方法同步就行了
?? ?public synchronized static void start(int port) throws IOException{
?? ??? ?if(server != null) return;
?? ??? ?try{
?? ??? ??? ?//通過構(gòu)造函數(shù)創(chuàng)建ServerSocket
?? ??? ??? ?//如果端口合法且空閑,服務(wù)端就監(jiān)聽成功
?? ??? ??? ?server = new ServerSocket(port);
?? ??? ??? ?System.out.println("服務(wù)器已啟動,端口號:" + port);
?? ??? ??? ?//通過無線循環(huán)監(jiān)聽客戶端連接
?? ??? ??? ?//如果沒有客戶端接入,將阻塞在accept操作上。
?? ??? ??? ?while(true){
?? ??? ??? ??? ?Socket socket = server.accept();
?? ??? ??? ??? ?//當(dāng)有新的客戶端接入時,會執(zhí)行下面的代碼
?? ??? ??? ??? ?//然后創(chuàng)建一個新的線程處理這條Socket鏈路
?? ??? ??? ??? ?new Thread(new ServerHandler(socket)).start();
?? ??? ??? ?}
?? ??? ?}finally{
?? ??? ??? ?//一些必要的清理工作
?? ??? ??? ?if(server != null){
?? ??? ??? ??? ?System.out.println("服務(wù)器已關(guān)閉。");
?? ??? ??? ??? ?server.close();
?? ??? ??? ??? ?server = null;
?? ??? ??? ?}
?? ??? ?}
?? ?}
}
????客戶端消息處理線程ServerHandler源碼:
package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
?
import com.anxpp.io.utils.Calculator;
/**
?* 客戶端線程
?* @author yangtao__anxpp.com
?* 用于處理一個客戶端的Socket鏈路
?*/
public class ServerHandler implements Runnable{
?? ?private Socket socket;
?? ?public ServerHandler(Socket socket) {
?? ??? ?this.socket = socket;
?? ?}
?? ?@Override
?? ?public void run() {
?? ??? ?BufferedReader in = null;
?? ??? ?PrintWriter out = null;
?? ??? ?try{
?? ??? ??? ?in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
?? ??? ??? ?out = new PrintWriter(socket.getOutputStream(),true);
?? ??? ??? ?String expression;
?? ??? ??? ?String result;
?? ??? ??? ?while(true){
?? ??? ??? ??? ?//通過BufferedReader讀取一行
?? ??? ??? ??? ?//如果已經(jīng)讀到輸入流尾部,返回null,退出循環(huán)
?? ??? ??? ??? ?//如果得到非空值,就嘗試計算結(jié)果并返回
?? ??? ??? ??? ?if((expression = in.readLine())==null) break;
?? ??? ??? ??? ?System.out.println("服務(wù)器收到消息:" + expression);
?? ??? ??? ??? ?try{
?? ??? ??? ??? ??? ?result = Calculator.cal(expression).toString();
?? ??? ??? ??? ?}catch(Exception e){
?? ??? ??? ??? ??? ?result = "計算錯誤:" + e.getMessage();
?? ??? ??? ??? ?}
?? ??? ??? ??? ?out.println(result);
?? ??? ??? ?}
?? ??? ?}catch(Exception e){
?? ??? ??? ?e.printStackTrace();
?? ??? ?}finally{
?? ??? ??? ?//一些必要的清理工作
?? ??? ??? ?if(in != null){
?? ??? ??? ??? ?try {
?? ??? ??? ??? ??? ?in.close();
?? ??? ??? ??? ?} catch (IOException e) {
?? ??? ??? ??? ??? ?e.printStackTrace();
?? ??? ??? ??? ?}
?? ??? ??? ??? ?in = null;
?? ??? ??? ?}
?? ??? ??? ?if(out != null){
?? ??? ??? ??? ?out.close();
?? ??? ??? ??? ?out = null;
?? ??? ??? ?}
?? ??? ??? ?if(socket != null){
?? ??? ??? ??? ?try {
?? ??? ??? ??? ??? ?socket.close();
?? ??? ??? ??? ?} catch (IOException e) {
?? ??? ??? ??? ??? ?e.printStackTrace();
?? ??? ??? ??? ?}
?? ??? ??? ??? ?socket = null;
?? ??? ??? ?}
?? ??? ?}
?? ?}
}
????同步阻塞式I/O創(chuàng)建的Client源碼:
package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
?* 阻塞式I/O創(chuàng)建的客戶端
?* @author yangtao__anxpp.com
?* @version 1.0
?*/
public class Client {
?? ?//默認(rèn)的端口號
?? ?private static int DEFAULT_SERVER_PORT = 12345;
?? ?private static String DEFAULT_SERVER_IP = "127.0.0.1";
?? ?public static void send(String expression){
?? ??? ?send(DEFAULT_SERVER_PORT,expression);
?? ?}
?? ?public static void send(int port,String expression){
?? ??? ?System.out.println("算術(shù)表達(dá)式為:" + expression);
?? ??? ?Socket socket = null;
?? ??? ?BufferedReader in = null;
?? ??? ?PrintWriter out = null;
?? ??? ?try{
?? ??? ??? ?socket = new Socket(DEFAULT_SERVER_IP,port);
?? ??? ??? ?in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
?? ??? ??? ?out = new PrintWriter(socket.getOutputStream(),true);
?? ??? ??? ?out.println(expression);
?? ??? ??? ?System.out.println("___結(jié)果為:" + in.readLine());
?? ??? ?}catch(Exception e){
?? ??? ??? ?e.printStackTrace();
?? ??? ?}finally{
?? ??? ??? ?//一下必要的清理工作
?? ??? ??? ?if(in != null){
?? ??? ??? ??? ?try {
?? ??? ??? ??? ??? ?in.close();
?? ??? ??? ??? ?} catch (IOException e) {
?? ??? ??? ??? ??? ?e.printStackTrace();
?? ??? ??? ??? ?}
?? ??? ??? ??? ?in = null;
?? ??? ??? ?}
?? ??? ??? ?if(out != null){
?? ??? ??? ??? ?out.close();
?? ??? ??? ??? ?out = null;
?? ??? ??? ?}
?? ??? ??? ?if(socket != null){
?? ??? ??? ??? ?try {
?? ??? ??? ??? ??? ?socket.close();
?? ??? ??? ??? ?} catch (IOException e) {
?? ??? ??? ??? ??? ?e.printStackTrace();
?? ??? ??? ??? ?}
?? ??? ??? ??? ?socket = null;
?? ??? ??? ?}
?? ??? ?}
?? ?}
}
????測試代碼,為了方便在控制臺看輸出結(jié)果,放到同一個程序(jvm)中運(yùn)行:
package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.util.Random;
/**
?* 測試方法
?* @author yangtao__anxpp.com
?* @version 1.0
?*/
public class Test {
?? ?//測試主方法
?? ?public static void main(String[] args) throws InterruptedException {
?? ??? ?//運(yùn)行服務(wù)器
?? ??? ?new Thread(new Runnable() {
?? ??? ??? ?@Override
?? ??? ??? ?public void run() {
?? ??? ??? ??? ?try {
?? ??? ??? ??? ??? ?ServerBetter.start();
?? ??? ??? ??? ?} catch (IOException e) {
?? ??? ??? ??? ??? ?e.printStackTrace();
?? ??? ??? ??? ?}
?? ??? ??? ?}
?? ??? ?}).start();
?? ??? ?//避免客戶端先于服務(wù)器啟動前執(zhí)行代碼
?? ??? ?Thread.sleep(100);
?? ??? ?//運(yùn)行客戶端?
?? ??? ?char operators[] = {'+','-','*','/'};
?? ??? ?Random random = new Random(System.currentTimeMillis());
?? ??? ?new Thread(new Runnable() {
?? ??? ??? ?@SuppressWarnings("static-access")
?? ??? ??? ?@Override
?? ??? ??? ?public void run() {
?? ??? ??? ??? ?while(true){
?? ??? ??? ??? ??? ?//隨機(jī)產(chǎn)生算術(shù)表達(dá)式
?? ??? ??? ??? ??? ?String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);
?? ??? ??? ??? ??? ?Client.send(expression);
?? ??? ??? ??? ??? ?try {
?? ??? ??? ??? ??? ??? ?Thread.currentThread().sleep(random.nextInt(1000));
?? ??? ??? ??? ??? ?} catch (InterruptedException e) {
?? ??? ??? ??? ??? ??? ?e.printStackTrace();
?? ??? ??? ??? ??? ?}
?? ??? ??? ??? ?}
?? ??? ??? ?}
?? ??? ?}).start();
?? ?}
}
????其中一次的運(yùn)行結(jié)果:
服務(wù)器已啟動,端口號:12345
算術(shù)表達(dá)式為:4-2
服務(wù)器收到消息:4-2
___結(jié)果為:2
算術(shù)表達(dá)式為:5-10
服務(wù)器收到消息:5-10
___結(jié)果為:-5
算術(shù)表達(dá)式為:0-9
服務(wù)器收到消息:0-9
___結(jié)果為:-9
算術(shù)表達(dá)式為:0+6
服務(wù)器收到消息:0+6
___結(jié)果為:6
算術(shù)表達(dá)式為:1/6
服務(wù)器收到消息:1/6
___結(jié)果為:0.16666666666666666
...
????從以上代碼,很容易看出,BIO主要的問題在于每當(dāng)有一個新的客戶端請求接入時,服務(wù)端必須創(chuàng)建一個新的線程來處理這條鏈路,在需要滿足高性能、高并發(fā)的場景是沒法應(yīng)用的(大量創(chuàng)建新的線程會嚴(yán)重影響服務(wù)器性能,甚至罷工)。
????1.2、偽異步I/O編程
????為了改進(jìn)這種一連接一線程的模型,我們可以使用線程池來管理這些線程(需要了解更多請參考前面提供的文章),實(shí)現(xiàn)1個或多個線程處理N個客戶端的模型(但是底層還是使用的同步阻塞I/O),通常被稱為“偽異步I/O模型“。
????偽異步I/O模型圖:
????
????實(shí)現(xiàn)很簡單,我們只需要將新建線程的地方,交給線程池管理即可,只需要改動剛剛的Server代碼即可:
package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
?* BIO服務(wù)端源碼__偽異步I/O
?* @author yangtao__anxpp.com
?* @version 1.0
?*/
public final class ServerBetter {
?? ?//默認(rèn)的端口號
?? ?private static int DEFAULT_PORT = 12345;
?? ?//單例的ServerSocket
?? ?private static ServerSocket server;
?? ?//線程池 懶漢式的單例
?? ?private static ExecutorService executorService = Executors.newFixedThreadPool(60);
?? ?//根據(jù)傳入?yún)?shù)設(shè)置監(jiān)聽端口,如果沒有參數(shù)調(diào)用以下方法并使用默認(rèn)值
?? ?public static void start() throws IOException{
?? ??? ?//使用默認(rèn)值
?? ??? ?start(DEFAULT_PORT);
?? ?}
?? ?//這個方法不會被大量并發(fā)訪問,不太需要考慮效率,直接進(jìn)行方法同步就行了
?? ?public synchronized static void start(int port) throws IOException{
?? ??? ?if(server != null) return;
?? ??? ?try{
?? ??? ??? ?//通過構(gòu)造函數(shù)創(chuàng)建ServerSocket
?? ??? ??? ?//如果端口合法且空閑,服務(wù)端就監(jiān)聽成功
?? ??? ??? ?server = new ServerSocket(port);
?? ??? ??? ?System.out.println("服務(wù)器已啟動,端口號:" + port);
?? ??? ??? ?//通過無線循環(huán)監(jiān)聽客戶端連接
?? ??? ??? ?//如果沒有客戶端接入,將阻塞在accept操作上。
?? ??? ??? ?while(true){
?? ??? ??? ??? ?Socket socket = server.accept();
?? ??? ??? ??? ?//當(dāng)有新的客戶端接入時,會執(zhí)行下面的代碼
?? ??? ??? ??? ?//然后創(chuàng)建一個新的線程處理這條Socket鏈路
?? ??? ??? ??? ?executorService.execute(new ServerHandler(socket));
?? ??? ??? ?}
?? ??? ?}finally{
?? ??? ??? ?//一些必要的清理工作
?? ??? ??? ?if(server != null){
?? ??? ??? ??? ?System.out.println("服務(wù)器已關(guān)閉。");
?? ??? ??? ??? ?server.close();
?? ??? ??? ??? ?server = null;
?? ??? ??? ?}
?? ??? ?}
?? ?}
}
????測試運(yùn)行結(jié)果是一樣的。
????我們知道,如果使用CachedThreadPool線程池(不限制線程數(shù)量,如果不清楚請參考文首提供的文章),其實(shí)除了能自動幫我們管理線程(復(fù)用),看起來也就像是1:1的客戶端:線程數(shù)模型,而使用FixedThreadPool我們就有效的控制了線程的最大數(shù)量,保證了系統(tǒng)有限的資源的控制,實(shí)現(xiàn)了N:M的偽異步I/O模型。
????但是,正因?yàn)橄拗屏司€程數(shù)量,如果發(fā)生大量并發(fā)請求,超過最大數(shù)量的線程就只能等待,直到線程池中的有空閑的線程可以被復(fù)用。而對Socket的輸入流就行讀取時,會一直阻塞,直到發(fā)生:
????有數(shù)據(jù)可讀
????可用數(shù)據(jù)以及讀取完畢
????發(fā)生空指針或I/O異常
????所以在讀取數(shù)據(jù)較慢時(比如數(shù)據(jù)量大、網(wǎng)絡(luò)傳輸慢等),大量并發(fā)的情況下,其他接入的消息,只能一直等待,這就是最大的弊端。
????而后面即將介紹的NIO,就能解決這個難題。
2、NIO 編程
????JDK 1.4中的java.nio.*包中引入新的Java I/O庫,其目的是提高速度。實(shí)際上,“舊”的I/O包已經(jīng)使用NIO重新實(shí)現(xiàn)過,即使我們不顯式的使用NIO編程,也能從中受益。速度的提高在文件I/O和網(wǎng)絡(luò)I/O中都可能會發(fā)生,但本文只討論后者。
????2.1、簡介
????NIO我們一般認(rèn)為是New I/O(也是官方的叫法),因?yàn)樗窍鄬τ诶系腎/O類庫新增的(其實(shí)在JDK 1.4中就已經(jīng)被引入了,但這個名詞還會繼續(xù)用很久,即使它們在現(xiàn)在看來已經(jīng)是“舊”的了,所以也提示我們在命名時,需要好好考慮),做了很大的改變。但民間跟多人稱之為Non-block I/O,即非阻塞I/O,因?yàn)檫@樣叫,更能體現(xiàn)它的特點(diǎn)。而下文中的NIO,不是指整個新的I/O庫,而是非阻塞I/O。
????NIO提供了與傳統(tǒng)BIO模型中的Socket和ServerSocket相對應(yīng)的SocketChannel和ServerSocketChannel兩種不同的套接字通道實(shí)現(xiàn)。
????新增的著兩種通道都支持阻塞和非阻塞兩種模式。
????阻塞模式使用就像傳統(tǒng)中的支持一樣,比較簡單,但是性能和可靠性都不好;非阻塞模式正好與之相反。
????對于低負(fù)載、低并發(fā)的應(yīng)用程序,可以使用同步阻塞I/O來提升開發(fā)速率和更好的維護(hù)性;對于高負(fù)載、高并發(fā)的(網(wǎng)絡(luò))應(yīng)用,應(yīng)使用NIO的非阻塞模式來開發(fā)。
????下面會先對基礎(chǔ)知識進(jìn)行介紹。
????2.2、緩沖區(qū) Buffer
????Buffer是一個對象,包含一些要寫入或者讀出的數(shù)據(jù)。
????在NIO庫中,所有數(shù)據(jù)都是用緩沖區(qū)處理的。在讀取數(shù)據(jù)時,它是直接讀到緩沖區(qū)中的;在寫入數(shù)據(jù)時,也是寫入到緩沖區(qū)中。任何時候訪問NIO中的數(shù)據(jù),都是通過緩沖區(qū)進(jìn)行操作。
????緩沖區(qū)實(shí)際上是一個數(shù)組,并提供了對數(shù)據(jù)結(jié)構(gòu)化訪問以及維護(hù)讀寫位置等信息。
????具體的緩存區(qū)有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他們實(shí)現(xiàn)了相同的接口:Buffer。
????2.3、通道 Channel
????我們對數(shù)據(jù)的讀取和寫入要通過Channel,它就像水管一樣,是一個通道。通道不同于流的地方就是通道是雙向的,可以用于讀、寫和同時讀寫操作。
????底層的操作系統(tǒng)的通道一般都是全雙工的,所以全雙工的Channel比流能更好的映射底層操作系統(tǒng)的API。
????Channel主要分兩大類:
????SelectableChannel:用戶網(wǎng)絡(luò)讀寫
????FileChannel:用于文件操作
????后面代碼會涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子類。
????2.4、多路復(fù)用器 Selector
????Selector是Java ?NIO 編程的基礎(chǔ)。
????Selector提供選擇已經(jīng)就緒的任務(wù)的能力:Selector會不斷輪詢注冊在其上的Channel,如果某個Channel上面發(fā)生讀或者寫事件,這個Channel就處于就緒狀態(tài),會被Selector輪詢出來,然后通過SelectionKey可以獲取就緒Channel的集合,進(jìn)行后續(xù)的I/O操作。
????一個Selector可以同時輪詢多個Channel,因?yàn)镴DK使用了epoll()代替?zhèn)鹘y(tǒng)的select實(shí)現(xiàn),所以沒有最大連接句柄1024/2048的限制。所以,只需要一個線程負(fù)責(zé)Selector的輪詢,就可以接入成千上萬的客戶端。
????2.5、NIO服務(wù)端
????代碼比傳統(tǒng)的Socket編程看起來要復(fù)雜不少。
????直接貼代碼吧,以注釋的形式給出代碼說明。
????NIO創(chuàng)建的Server源碼:
package com.anxpp.io.calculator.nio;
public class Server {
?? ?private static int DEFAULT_PORT = 12345;
?? ?private static ServerHandle serverHandle;
?? ?public static void start(){
?? ??? ?start(DEFAULT_PORT);
?? ?}
?? ?public static synchronized void start(int port){
?? ??? ?if(serverHandle!=null)
?? ??? ??? ?serverHandle.stop();
?? ??? ?serverHandle = new ServerHandle(port);
?? ??? ?new Thread(serverHandle,"Server").start();
?? ?}
?? ?public static void main(String[] args){
?? ??? ?start();
?? ?}
}
????ServerHandle:
package com.anxpp.io.calculator.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
?
import com.anxpp.io.utils.Calculator;
/**
?* NIO服務(wù)端
?* @author yangtao__anxpp.com
?* @version 1.0
?*/
public class ServerHandle implements Runnable{
?? ?private Selector selector;
?? ?private ServerSocketChannel serverChannel;
?? ?private volatile boolean started;
?? ?/**
?? ? * 構(gòu)造方法
?? ? * @param port 指定要監(jiān)聽的端口號
?? ? */
?? ?public ServerHandle(int port) {
?? ??? ?try{
?? ??? ??? ?//創(chuàng)建選擇器
?? ??? ??? ?selector = Selector.open();
?? ??? ??? ?//打開監(jiān)聽通道
?? ??? ??? ?serverChannel = ServerSocketChannel.open();
?? ??? ??? ?//如果為 true,則此通道將被置于阻塞模式;如果為 false,則此通道將被置于非阻塞模式
?? ??? ??? ?serverChannel.configureBlocking(false);//開啟非阻塞模式
?? ??? ??? ?//綁定端口 backlog設(shè)為1024
?? ??? ??? ?serverChannel.socket().bind(new InetSocketAddress(port),1024);
?? ??? ??? ?//監(jiān)聽客戶端連接請求
?? ??? ??? ?serverChannel.register(selector, SelectionKey.OP_ACCEPT);
?? ??? ??? ?//標(biāo)記服務(wù)器已開啟
?? ??? ??? ?started = true;
?? ??? ??? ?System.out.println("服務(wù)器已啟動,端口號:" + port);
?? ??? ?}catch(IOException e){
?? ??? ??? ?e.printStackTrace();
?? ??? ??? ?System.exit(1);
?? ??? ?}
?? ?}
?? ?public void stop(){
?? ??? ?started = false;
?? ?}
?? ?@Override
?? ?public void run() {
?? ??? ?//循環(huán)遍歷selector
?? ??? ?while(started){
?? ??? ??? ?try{
?? ??? ??? ??? ?//無論是否有讀寫事件發(fā)生,selector每隔1s被喚醒一次
?? ??? ??? ??? ?selector.select(1000);
?? ??? ??? ??? ?//阻塞,只有當(dāng)至少一個注冊的事件發(fā)生的時候才會繼續(xù).
//?? ??? ??? ??? ?selector.select();
?? ??? ??? ??? ?Set<SelectionKey> keys = selector.selectedKeys();
?? ??? ??? ??? ?Iterator<SelectionKey> it = keys.iterator();
?? ??? ??? ??? ?SelectionKey key = null;
?? ??? ??? ??? ?while(it.hasNext()){
?? ??? ??? ??? ??? ?key = it.next();
?? ??? ??? ??? ??? ?it.remove();
?? ??? ??? ??? ??? ?try{
?? ??? ??? ??? ??? ??? ?handleInput(key);
?? ??? ??? ??? ??? ?}catch(Exception e){
?? ??? ??? ??? ??? ??? ?if(key != null){
?? ??? ??? ??? ??? ??? ??? ?key.cancel();
?? ??? ??? ??? ??? ??? ??? ?if(key.channel() != null){
?? ??? ??? ??? ??? ??? ??? ??? ?key.channel().close();
?? ??? ??? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ?}
?? ??? ??? ??? ?}
?? ??? ??? ?}catch(Throwable t){
?? ??? ??? ??? ?t.printStackTrace();
?? ??? ??? ?}
?? ??? ?}
?? ??? ?//selector關(guān)閉后會自動釋放里面管理的資源
?? ??? ?if(selector != null)
?? ??? ??? ?try{
?? ??? ??? ??? ?selector.close();
?? ??? ??? ?}catch (Exception e) {
?? ??? ??? ??? ?e.printStackTrace();
?? ??? ??? ?}
?? ?}
?? ?private void handleInput(SelectionKey key) throws IOException{
?? ??? ?if(key.isValid()){
?? ??? ??? ?//處理新接入的請求消息
?? ??? ??? ?if(key.isAcceptable()){
?? ??? ??? ??? ?ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
?? ??? ??? ??? ?//通過ServerSocketChannel的accept創(chuàng)建SocketChannel實(shí)例
?? ??? ??? ??? ?//完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立
?? ??? ??? ??? ?SocketChannel sc = ssc.accept();
?? ??? ??? ??? ?//設(shè)置為非阻塞的
?? ??? ??? ??? ?sc.configureBlocking(false);
?? ??? ??? ??? ?//注冊為讀
?? ??? ??? ??? ?sc.register(selector, SelectionKey.OP_READ);
?? ??? ??? ?}
?? ??? ??? ?//讀消息
?? ??? ??? ?if(key.isReadable()){
?? ??? ??? ??? ?SocketChannel sc = (SocketChannel) key.channel();
?? ??? ??? ??? ?//創(chuàng)建ByteBuffer,并開辟一個1M的緩沖區(qū)
?? ??? ??? ??? ?ByteBuffer buffer = ByteBuffer.allocate(1024);
?? ??? ??? ??? ?//讀取請求碼流,返回讀取到的字節(jié)數(shù)
?? ??? ??? ??? ?int readBytes = sc.read(buffer);
?? ??? ??? ??? ?//讀取到字節(jié),對字節(jié)進(jìn)行編解碼
?? ??? ??? ??? ?if(readBytes>0){
?? ??? ??? ??? ??? ?//將緩沖區(qū)當(dāng)前的limit設(shè)置為position=0,用于后續(xù)對緩沖區(qū)的讀取操作
?? ??? ??? ??? ??? ?buffer.flip();
?? ??? ??? ??? ??? ?//根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組
?? ??? ??? ??? ??? ?byte[] bytes = new byte[buffer.remaining()];
?? ??? ??? ??? ??? ?//將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中
?? ??? ??? ??? ??? ?buffer.get(bytes);
?? ??? ??? ??? ??? ?String expression = new String(bytes,"UTF-8");
?? ??? ??? ??? ??? ?System.out.println("服務(wù)器收到消息:" + expression);
?? ??? ??? ??? ??? ?//處理數(shù)據(jù)
?? ??? ??? ??? ??? ?String result = null;
?? ??? ??? ??? ??? ?try{
?? ??? ??? ??? ??? ??? ?result = Calculator.cal(expression).toString();
?? ??? ??? ??? ??? ?}catch(Exception e){
?? ??? ??? ??? ??? ??? ?result = "計算錯誤:" + e.getMessage();
?? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ?//發(fā)送應(yīng)答消息
?? ??? ??? ??? ??? ?doWrite(sc,result);
?? ??? ??? ??? ?}
?? ??? ??? ??? ?//沒有讀取到字節(jié) 忽略
//?? ??? ??? ??? ?else if(readBytes==0);
?? ??? ??? ??? ?//鏈路已經(jīng)關(guān)閉,釋放資源
?? ??? ??? ??? ?else if(readBytes<0){
?? ??? ??? ??? ??? ?key.cancel();
?? ??? ??? ??? ??? ?sc.close();
?? ??? ??? ??? ?}
?? ??? ??? ?}
?? ??? ?}
?? ?}
?? ?//異步發(fā)送應(yīng)答消息
?? ?private void doWrite(SocketChannel channel,String response) throws IOException{
?? ??? ?//將消息編碼為字節(jié)數(shù)組
?? ??? ?byte[] bytes = response.getBytes();
?? ??? ?//根據(jù)數(shù)組容量創(chuàng)建ByteBuffer
?? ??? ?ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
?? ??? ?//將字節(jié)數(shù)組復(fù)制到緩沖區(qū)
?? ??? ?writeBuffer.put(bytes);
?? ??? ?//flip操作
?? ??? ?writeBuffer.flip();
?? ??? ?//發(fā)送緩沖區(qū)的字節(jié)數(shù)組
?? ??? ?channel.write(writeBuffer);
?? ??? ?//****此處不含處理“寫半包”的代碼
?? ?}
}
????可以看到,創(chuàng)建NIO服務(wù)端的主要步驟如下:
????打開ServerSocketChannel,監(jiān)聽客戶端連接
????綁定監(jiān)聽端口,設(shè)置連接為非阻塞模式
????創(chuàng)建Reactor線程,創(chuàng)建多路復(fù)用器并啟動線程
????將ServerSocketChannel注冊到Reactor線程中的Selector上,監(jiān)聽ACCEPT事件
????Selector輪詢準(zhǔn)備就緒的key
????Selector監(jiān)聽到新的客戶端接入,處理新的接入請求,完成TCP三次握手,簡歷物理鏈路
????設(shè)置客戶端鏈路為非阻塞模式
????將新接入的客戶端連接注冊到Reactor線程的Selector上,監(jiān)聽讀操作,讀取客戶端發(fā)送的網(wǎng)絡(luò)消息
????異步讀取客戶端消息到緩沖區(qū)
????對Buffer編解碼,處理半包消息,將解碼成功的消息封裝成Task
????將應(yīng)答消息編碼為Buffer,調(diào)用SocketChannel的write將消息異步發(fā)送給客戶端
????因?yàn)閼?yīng)答消息的發(fā)送,SocketChannel也是異步非阻塞的,所以不能保證一次能吧需要發(fā)送的數(shù)據(jù)發(fā)送完,此時就會出現(xiàn)寫半包的問題。我們需要注冊寫操作,不斷輪詢Selector將沒有發(fā)送完的消息發(fā)送完畢,然后通過Buffer的hasRemain()方法判斷消息是否發(fā)送完成。
????2.6、NIO客戶端
????還是直接上代碼吧,過程也不需要太多解釋了,跟服務(wù)端代碼有點(diǎn)類似。
????Client:
package com.anxpp.io.calculator.nio;
public class Client {
?? ?private static String DEFAULT_HOST = "127.0.0.1";
?? ?private static int DEFAULT_PORT = 12345;
?? ?private static ClientHandle clientHandle;
?? ?public static void start(){
?? ??? ?start(DEFAULT_HOST,DEFAULT_PORT);
?? ?}
?? ?public static synchronized void start(String ip,int port){
?? ??? ?if(clientHandle!=null)
?? ??? ??? ?clientHandle.stop();
?? ??? ?clientHandle = new ClientHandle(ip,port);
?? ??? ?new Thread(clientHandle,"Server").start();
?? ?}
?? ?//向服務(wù)器發(fā)送消息
?? ?public static boolean sendMsg(String msg) throws Exception{
?? ??? ?if(msg.equals("q")) return false;
?? ??? ?clientHandle.sendMsg(msg);
?? ??? ?return true;
?? ?}
?? ?public static void main(String[] args){
?? ??? ?start();
?? ?}
}
????ClientHandle:
package com.anxpp.io.calculator.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
?* NIO客戶端
?* @author yangtao__anxpp.com
?* @version 1.0
?*/
public class ClientHandle implements Runnable{
?? ?private String host;
?? ?private int port;
?? ?private Selector selector;
?? ?private SocketChannel socketChannel;
?? ?private volatile boolean started;
?
?? ?public ClientHandle(String ip,int port) {
?? ??? ?this.host = ip;
?? ??? ?this.port = port;
?? ??? ?try{
?? ??? ??? ?//創(chuàng)建選擇器
?? ??? ??? ?selector = Selector.open();
?? ??? ??? ?//打開監(jiān)聽通道
?? ??? ??? ?socketChannel = SocketChannel.open();
?? ??? ??? ?//如果為 true,則此通道將被置于阻塞模式;如果為 false,則此通道將被置于非阻塞模式
?? ??? ??? ?socketChannel.configureBlocking(false);//開啟非阻塞模式
?? ??? ??? ?started = true;
?? ??? ?}catch(IOException e){
?? ??? ??? ?e.printStackTrace();
?? ??? ??? ?System.exit(1);
?? ??? ?}
?? ?}
?? ?public void stop(){
?? ??? ?started = false;
?? ?}
?? ?@Override
?? ?public void run() {
?? ??? ?try{
?? ??? ??? ?doConnect();
?? ??? ?}catch(IOException e){
?? ??? ??? ?e.printStackTrace();
?? ??? ??? ?System.exit(1);
?? ??? ?}
?? ??? ?//循環(huán)遍歷selector
?? ??? ?while(started){
?? ??? ??? ?try{
?? ??? ??? ??? ?//無論是否有讀寫事件發(fā)生,selector每隔1s被喚醒一次
?? ??? ??? ??? ?selector.select(1000);
?? ??? ??? ??? ?//阻塞,只有當(dāng)至少一個注冊的事件發(fā)生的時候才會繼續(xù).
//?? ??? ??? ??? ?selector.select();
?? ??? ??? ??? ?Set<SelectionKey> keys = selector.selectedKeys();
?? ??? ??? ??? ?Iterator<SelectionKey> it = keys.iterator();
?? ??? ??? ??? ?SelectionKey key = null;
?? ??? ??? ??? ?while(it.hasNext()){
?? ??? ??? ??? ??? ?key = it.next();
?? ??? ??? ??? ??? ?it.remove();
?? ??? ??? ??? ??? ?try{
?? ??? ??? ??? ??? ??? ?handleInput(key);
?? ??? ??? ??? ??? ?}catch(Exception e){
?? ??? ??? ??? ??? ??? ?if(key != null){
?? ??? ??? ??? ??? ??? ??? ?key.cancel();
?? ??? ??? ??? ??? ??? ??? ?if(key.channel() != null){
?? ??? ??? ??? ??? ??? ??? ??? ?key.channel().close();
?? ??? ??? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ??? ?}
?? ??? ??? ??? ??? ?}
?? ??? ??? ??? ?}
?? ??? ??? ?}catch(Exception e){
?? ??? ??? ??? ?e.printStackTrace();
?? ??? ??? ??? ?System.exit(1);
?? ??? ??? ?}
?? ??? ?}
?? ??? ?//selector關(guān)閉后會自動釋放里面管理的資源
?? ??? ?if(selector != null)
?? ??? ??? ?try{
?? ??? ??? ??? ?selector.close();
?? ??? ??? ?}catch (Exception e) {
?? ??? ??? ??? ?e.printStackTrace();
?? ??? ??? ?}
?? ?}
?? ?private void handleInput(SelectionKey key) throws IOException{
?? ??? ?if(key.isValid()){
?? ??? ??? ?SocketChannel sc = (SocketChannel) key.channel();
?? ??? ??? ?if(key.isConnectable()){
?? ??? ??? ??? ?if(sc.finishConnect());
?? ??? ??? ??? ?else System.exit(1);
?? ??? ??? ?}
?? ??? ??? ?//讀消息
?? ??? ??? ?if(key.isReadable()){
?? ??? ??? ??? ?//創(chuàng)建ByteBuffer,并開辟一個1M的緩沖區(qū)
?? ??? ??? ??? ?ByteBuffer buffer = ByteBuffer.allocate(1024);
?? ??? ??? ??? ?//讀取請求碼流,返回讀取到的字節(jié)數(shù)
?? ??? ??? ??? ?int readBytes = sc.read(buffer);
?? ??? ??? ??? ?//讀取到字節(jié),對字節(jié)進(jìn)行編解碼
?? ??? ??? ??? ?if(readBytes>0){
?? ??? ??? ??? ??? ?//將緩沖區(qū)當(dāng)前的limit設(shè)置為position=0,用于后續(xù)對緩沖區(qū)的讀取操作
?? ??? ??? ??? ??? ?buffer.flip();
?? ??? ??? ??? ??? ?//根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組
?? ??? ??? ??? ??? ?byte[] bytes = new byte[buffer.remaining()];
?? ??? ??? ??? ??? ?//將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中
?? ??? ??? ??? ??? ?buffer.get(bytes);
?? ??? ??? ??? ??? ?String result = new String(bytes,"UTF-8");
?? ??? ??? ??? ??? ?System.out.println("客戶端收到消息:" + result);
?? ??? ??? ??? ?}
?? ??? ??? ??? ?//沒有讀取到字節(jié) 忽略
//?? ??? ??? ??? ?else if(readBytes==0);
?? ??? ??? ??? ?//鏈路已經(jīng)關(guān)閉,釋放資源
?? ??? ??? ??? ?else if(readBytes<0){
?? ??? ??? ??? ??? ?key.cancel();
?? ??? ??? ??? ??? ?sc.close();
?? ??? ??? ??? ?}
?? ??? ??? ?}
?? ??? ?}
?? ?}
?? ?//異步發(fā)送消息
?? ?private void doWrite(SocketChannel channel,String request) throws IOException{
?? ??? ?//將消息編碼為字節(jié)數(shù)組
?? ??? ?byte[] bytes = request.getBytes();
?? ??? ?//根據(jù)數(shù)組容量創(chuàng)建ByteBuffer
?? ??? ?ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
?? ??? ?//將字節(jié)數(shù)組復(fù)制到緩沖區(qū)
?? ??? ?writeBuffer.put(bytes);
?? ??? ?//flip操作
?? ??? ?writeBuffer.flip();
?? ??? ?//發(fā)送緩沖區(qū)的字節(jié)數(shù)組
?? ??? ?channel.write(writeBuffer);
?? ??? ?//****此處不含處理“寫半包”的代碼
?? ?}
?? ?private void doConnect() throws IOException{
?? ??? ?if(socketChannel.connect(new InetSocketAddress(host,port)));
?? ??? ?else socketChannel.register(selector, SelectionKey.OP_CONNECT);
?? ?}
?? ?public void sendMsg(String msg) throws Exception{
?? ??? ?socketChannel.register(selector, SelectionKey.OP_READ);
?? ??? ?doWrite(socketChannel, msg);
?? ?}
}
????2.7、演示結(jié)果
????首先運(yùn)行服務(wù)器,順便也運(yùn)行一個客戶端:
package com.anxpp.io.calculator.nio;
import java.util.Scanner;
/**
?* 測試方法
?* @author yangtao__anxpp.com
?* @version 1.0
?*/
public class Test {
?? ?//測試主方法
?? ?@SuppressWarnings("resource")
?? ?public static void main(String[] args) throws Exception{
?? ??? ?//運(yùn)行服務(wù)器
?? ??? ?Server.start();
?? ??? ?//避免客戶端先于服務(wù)器啟動前執(zhí)行代碼
?? ??? ?Thread.sleep(100);
?? ??? ?//運(yùn)行客戶端?
?? ??? ?Client.start();
?? ??? ?while(Client.sendMsg(new Scanner(System.in).nextLine()));
?? ?}
}
????我們也可以單獨(dú)運(yùn)行客戶端,效果都是一樣的。
????一次測試的結(jié)果:
服務(wù)器已啟動,端口號:12345
1+2+3+4+5+6
服務(wù)器收到消息:1+2+3+4+5+6
客戶端收到消息:21
1*2/3-4+5*6/7-8
服務(wù)器收到消息:1*2/3-4+5*6/7-8
客戶端收到消息:-7.0476190476190474
????運(yùn)行多個客戶端,都是沒有問題的。
3、AIO編程
????NIO 2.0引入了新的異步通道的概念,并提供了異步文件通道和異步套接字通道的實(shí)現(xiàn)。
????異步的套接字通道時真正的異步非阻塞I/O,對應(yīng)于UNIX網(wǎng)絡(luò)編程中的事件驅(qū)動I/O(AIO)。他不需要過多的Selector對注冊的通道進(jìn)行輪詢即可實(shí)現(xiàn)異步讀寫,從而簡化了NIO的編程模型。
????直接上代碼吧。
????3.1、Server端代碼
????Server:
package com.anxpp.io.calculator.aio.server;
/**
?* AIO服務(wù)端
?* @author yangtao__anxpp.com
?* @version 1.0
?*/
public class Server {
?? ?private static int DEFAULT_PORT = 12345;
?? ?private static AsyncServerHandler serverHandle;
?? ?public volatile static long clientCount = 0;
?? ?public static void start(){
?? ??? ?start(DEFAULT_PORT);
?? ?}
?? ?public static synchronized void start(int port){
?? ??? ?if(serverHandle!=null)
?? ??? ??? ?return;
?? ??? ?serverHandle = new AsyncServerHandler(port);
?? ??? ?new Thread(serverHandle,"Server").start();
?? ?}
?? ?public static void main(String[] args){
?? ??? ?Server.start();
?? ?}
}
????AsyncServerHandler:
package com.anxpp.io.calculator.aio.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;
public class AsyncServerHandler implements Runnable {
?? ?public CountDownLatch latch;
?? ?public AsynchronousServerSocketChannel channel;
?? ?public AsyncServerHandler(int port) {
?? ??? ?try {
?? ??? ??? ?//創(chuàng)建服務(wù)端通道
?? ??? ??? ?channel = AsynchronousServerSocketChannel.open();
?? ??? ??? ?//綁定端口
?? ??? ??? ?channel.bind(new InetSocketAddress(port));
?? ??? ??? ?System.out.println("服務(wù)器已啟動,端口號:" + port);
?? ??? ?} catch (IOException e) {
?? ??? ??? ?e.printStackTrace();
?? ??? ?}
?? ?}
?? ?@Override
?? ?public void run() {
?? ??? ?//CountDownLatch初始化
?? ??? ?//它的作用:在完成一組正在執(zhí)行的操作之前,允許當(dāng)前的現(xiàn)場一直阻塞
?? ??? ?//此處,讓現(xiàn)場在此阻塞,防止服務(wù)端執(zhí)行完成后退出
?? ??? ?//也可以使用while(true)+sleep?
?? ??? ?//生成環(huán)境就不需要擔(dān)心這個問題,以為服務(wù)端是不會退出的
?? ??? ?latch = new CountDownLatch(1);
?? ??? ?//用于接收客戶端的連接
?? ??? ?channel.accept(this,new AcceptHandler());
?? ??? ?try {
?? ??? ??? ?latch.await();
?? ??? ?} catch (InterruptedException e) {
?? ??? ??? ?e.printStackTrace();
?? ??? ?}
?? ?}
}
????AcceptHandler:
package com.anxpp.io.calculator.aio.server;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
//作為handler接收客戶端連接
public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {
?? ?@Override
?? ?public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) {
?? ??? ?//繼續(xù)接受其他客戶端的請求
?? ??? ?Server.clientCount++;
?? ??? ?System.out.println("連接的客戶端數(shù):" + Server.clientCount);
?? ??? ?serverHandler.channel.accept(serverHandler, this);
?? ??? ?//創(chuàng)建新的Buffer
?? ??? ?ByteBuffer buffer = ByteBuffer.allocate(1024);
?? ??? ?//異步讀 ?第三個參數(shù)為接收消息回調(diào)的業(yè)務(wù)Handler
?? ??? ?channel.read(buffer, buffer, new ReadHandler(channel));
?? ?}
?? ?@Override
?? ?public void failed(Throwable exc, AsyncServerHandler serverHandler) {
?? ??? ?exc.printStackTrace();
?? ??? ?serverHandler.latch.countDown();
?? ?}
}
????ReadHandler:
package com.anxpp.io.calculator.aio.server;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import com.anxpp.io.utils.Calculator;
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
?? ?//用于讀取半包消息和發(fā)送應(yīng)答
?? ?private AsynchronousSocketChannel channel;
?? ?public ReadHandler(AsynchronousSocketChannel channel) {
?? ??? ??? ?this.channel = channel;
?? ?}
?? ?//讀取到消息后的處理
?? ?@Override
?? ?public void completed(Integer result, ByteBuffer attachment) {
?? ??? ?//flip操作
?? ??? ?attachment.flip();
?? ??? ?//根據(jù)
?? ??? ?byte[] message = new byte[attachment.remaining()];
?? ??? ?attachment.get(message);
?? ??? ?try {
?? ??? ??? ?String expression = new String(message, "UTF-8");
?? ??? ??? ?System.out.println("服務(wù)器收到消息: " + expression);
?? ??? ??? ?String calrResult = null;
?? ??? ??? ?try{
?? ??? ??? ??? ?calrResult = Calculator.cal(expression).toString();
?? ??? ??? ?}catch(Exception e){
?? ??? ??? ??? ?calrResult = "計算錯誤:" + e.getMessage();
?? ??? ??? ?}
?? ??? ??? ?//向客戶端發(fā)送消息
?? ??? ??? ?doWrite(calrResult);
?? ??? ?} catch (UnsupportedEncodingException e) {
?? ??? ??? ?e.printStackTrace();
?? ??? ?}
?? ?}
?? ?//發(fā)送消息
?? ?private void doWrite(String result) {
?? ??? ?byte[] bytes = result.getBytes();
?? ??? ?ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
?? ??? ?writeBuffer.put(bytes);
?? ??? ?writeBuffer.flip();
?? ??? ?//異步寫數(shù)據(jù) 參數(shù)與前面的read一樣
?? ??? ?channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {
?? ??? ??? ?@Override
?? ??? ??? ?public void completed(Integer result, ByteBuffer buffer) {
?? ??? ??? ??? ?//如果沒有發(fā)送完,就繼續(xù)發(fā)送直到完成
?? ??? ??? ??? ?if (buffer.hasRemaining())
?? ??? ??? ??? ??? ?channel.write(buffer, buffer, this);
?? ??? ??? ??? ?else{
?? ??? ??? ??? ??? ?//創(chuàng)建新的Buffer
?? ??? ??? ??? ??? ?ByteBuffer readBuffer = ByteBuffer.allocate(1024);
?? ??? ??? ??? ??? ?//異步讀 ?第三個參數(shù)為接收消息回調(diào)的業(yè)務(wù)Handler
?? ??? ??? ??? ??? ?channel.read(readBuffer, readBuffer, new ReadHandler(channel));
?? ??? ??? ??? ?}
?? ??? ??? ?}
?? ??? ??? ?@Override
?? ??? ??? ?public void failed(Throwable exc, ByteBuffer attachment) {
?? ??? ??? ??? ?try {
?? ??? ??? ??? ??? ?channel.close();
?? ??? ??? ??? ?} catch (IOException e) {
?? ??? ??? ??? ?}
?? ??? ??? ?}
?? ??? ?});
?? ?}
?? ?@Override
?? ?public void failed(Throwable exc, ByteBuffer attachment) {
?? ??? ?try {
?? ??? ??? ?this.channel.close();
?? ??? ?} catch (IOException e) {
?? ??? ??? ?e.printStackTrace();
?? ??? ?}
?? ?}
}
????OK,這樣就已經(jīng)完成了,其實(shí)說起來也簡單,雖然代碼感覺很多,但是API比NIO的使用起來真的簡單多了,主要就是監(jiān)聽、讀、寫等各種CompletionHandler。此處本應(yīng)有一個WriteHandler的,確實(shí),我們在ReadHandler中,以一個匿名內(nèi)部類實(shí)現(xiàn)了它。
????下面看客戶端代碼。
????3.2、Client端代碼
????Client:
package com.anxpp.io.calculator.aio.client;
import java.util.Scanner;
public class Client {
?? ?private static String DEFAULT_HOST = "127.0.0.1";
?? ?private static int DEFAULT_PORT = 12345;
?? ?private static AsyncClientHandler clientHandle;
?? ?public static void start(){
?? ??? ?start(DEFAULT_HOST,DEFAULT_PORT);
?? ?}
?? ?public static synchronized void start(String ip,int port){
?? ??? ?if(clientHandle!=null)
?? ??? ??? ?return;
?? ??? ?clientHandle = new AsyncClientHandler(ip,port);
?? ??? ?new Thread(clientHandle,"Client").start();
?? ?}
?? ?//向服務(wù)器發(fā)送消息
?? ?public static boolean sendMsg(String msg) throws Exception{
?? ??? ?if(msg.equals("q")) return false;
?? ??? ?clientHandle.sendMsg(msg);
?? ??? ?return true;
?? ?}
?? ?@SuppressWarnings("resource")
?? ?public static void main(String[] args) throws Exception{
?? ??? ?Client.start();
?? ??? ?System.out.println("請輸入請求消息:");
?? ??? ?Scanner scanner = new Scanner(System.in);
?? ??? ?while(Client.sendMsg(scanner.nextLine()));
?? ?}
}
????AsyncClientHandler:
package com.anxpp.io.calculator.aio.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable {
?? ?private AsynchronousSocketChannel clientChannel;
?? ?private String host;
?? ?private int port;
?? ?private CountDownLatch latch;
?? ?public AsyncClientHandler(String host, int port) {
?? ??? ?this.host = host;
?? ??? ?this.port = port;
?? ??? ?try {
?? ??? ??? ?//創(chuàng)建異步的客戶端通道
?? ??? ??? ?clientChannel = AsynchronousSocketChannel.open();
?? ??? ?} catch (IOException e) {
?? ??? ??? ?e.printStackTrace();
?? ??? ?}
?? ?}
?? ?@Override
?? ?public void run() {
?? ??? ?//創(chuàng)建CountDownLatch等待
?? ??? ?latch = new CountDownLatch(1);
?? ??? ?//發(fā)起異步連接操作,回調(diào)參數(shù)就是這個類本身,如果連接成功會回調(diào)completed方法
?? ??? ?clientChannel.connect(new InetSocketAddress(host, port), this, this);
?? ??? ?try {
?? ??? ??? ?latch.await();
?? ??? ?} catch (InterruptedException e1) {
?? ??? ??? ?e1.printStackTrace();
?? ??? ?}
?? ??? ?try {
?? ??? ??? ?clientChannel.close();
?? ??? ?} catch (IOException e) {
?? ??? ??? ?e.printStackTrace();
?? ??? ?}
?? ?}
?? ?//連接服務(wù)器成功
?? ?//意味著TCP三次握手完成
?? ?@Override
?? ?public void completed(Void result, AsyncClientHandler attachment) {
?? ??? ?System.out.println("客戶端成功連接到服務(wù)器...");
?? ?}
?? ?//連接服務(wù)器失敗
?? ?@Override
?? ?public void failed(Throwable exc, AsyncClientHandler attachment) {
?? ??? ?System.err.println("連接服務(wù)器失敗...");
?? ??? ?exc.printStackTrace();
?? ??? ?try {
?? ??? ??? ?clientChannel.close();
?? ??? ??? ?latch.countDown();
?? ??? ?} catch (IOException e) {
?? ??? ??? ?e.printStackTrace();
?? ??? ?}
?? ?}
?? ?//向服務(wù)器發(fā)送消息
?? ?public void sendMsg(String msg){
?? ??? ?byte[] req = msg.getBytes();
?? ??? ?ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
?? ??? ?writeBuffer.put(req);
?? ??? ?writeBuffer.flip();
?? ??? ?//異步寫
?? ??? ?clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch));
?? ?}
}
????WriteHandler:
package com.anxpp.io.calculator.aio.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
?? ?private AsynchronousSocketChannel clientChannel;
?? ?private CountDownLatch latch;
?? ?public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
?? ??? ?this.clientChannel = clientChannel;
?? ??? ?this.latch = latch;
?? ?}
?? ?@Override
?? ?public void completed(Integer result, ByteBuffer buffer) {
?? ??? ?//完成全部數(shù)據(jù)的寫入
?? ??? ?if (buffer.hasRemaining()) {
?? ??? ??? ?clientChannel.write(buffer, buffer, this);
?? ??? ?}
?? ??? ?else {
?? ??? ??? ?//讀取數(shù)據(jù)
?? ??? ??? ?ByteBuffer readBuffer = ByteBuffer.allocate(1024);
?? ??? ??? ?clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch));
?? ??? ?}
?? ?}
?? ?@Override
?? ?public void failed(Throwable exc, ByteBuffer attachment) {
?? ??? ?System.err.println("數(shù)據(jù)發(fā)送失敗...");
?? ??? ?try {
?? ??? ??? ?clientChannel.close();
?? ??? ??? ?latch.countDown();
?? ??? ?} catch (IOException e) {
?? ??? ?}
?? ?}
}
????ReadHandler:
package com.anxpp.io.calculator.aio.client;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
?? ?private AsynchronousSocketChannel clientChannel;
?? ?private CountDownLatch latch;
?? ?public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
?? ??? ?this.clientChannel = clientChannel;
?? ??? ?this.latch = latch;
?? ?}
?? ?@Override
?? ?public void completed(Integer result,ByteBuffer buffer) {
?? ??? ?buffer.flip();
?? ??? ?byte[] bytes = new byte[buffer.remaining()];
?? ??? ?buffer.get(bytes);
?? ??? ?String body;
?? ??? ?try {
?? ??? ??? ?body = new String(bytes,"UTF-8");
?? ??? ??? ?System.out.println("客戶端收到結(jié)果:"+ body);
?? ??? ?} catch (UnsupportedEncodingException e) {
?? ??? ??? ?e.printStackTrace();
?? ??? ?}
?? ?}
?? ?@Override
?? ?public void failed(Throwable exc,ByteBuffer attachment) {
?? ??? ?System.err.println("數(shù)據(jù)讀取失敗...");
?? ??? ?try {
?? ??? ??? ?clientChannel.close();
?? ??? ??? ?latch.countDown();
?? ??? ?} catch (IOException e) {
?? ??? ?}
?? ?}
}
????這個API使用起來真的是很順手。
????3.3、測試
????Test:
package com.anxpp.io.calculator.aio;
import java.util.Scanner;
import com.anxpp.io.calculator.aio.client.Client;
import com.anxpp.io.calculator.aio.server.Server;
/**
?* 測試方法
?* @author yangtao__anxpp.com
?* @version 1.0
?*/
public class Test {
?? ?//測試主方法
?? ?@SuppressWarnings("resource")
?? ?public static void main(String[] args) throws Exception{
?? ??? ?//運(yùn)行服務(wù)器
?? ??? ?Server.start();
?? ??? ?//避免客戶端先于服務(wù)器啟動前執(zhí)行代碼
?? ??? ?Thread.sleep(100);
?? ??? ?//運(yùn)行客戶端?
?? ??? ?Client.start();
?? ??? ?System.out.println("請輸入請求消息:");
?? ??? ?Scanner scanner = new Scanner(System.in);
?? ??? ?while(Client.sendMsg(scanner.nextLine()));
?? ?}
}
????我們可以在控制臺輸入我們需要計算的算數(shù)字符串,服務(wù)器就會返回結(jié)果,當(dāng)然,我們也可以運(yùn)行大量的客戶端,都是沒有問題的,以為此處設(shè)計為單例客戶端,所以也就沒有演示大量客戶端并發(fā)。
????讀者可以自己修改Client類,然后開辟大量線程,并使用構(gòu)造方法創(chuàng)建很多的客戶端測試。
????下面是其中一次參數(shù)的輸出:
服務(wù)器已啟動,端口號:12345
請輸入請求消息:
客戶端成功連接到服務(wù)器...
連接的客戶端數(shù):1
123456+789+456
服務(wù)器收到消息: 123456+789+456
客戶端收到結(jié)果:124701
9526*56
服務(wù)器收到消息: 9526*56
客戶端收到結(jié)果:533456
...
????AIO是真正的異步非阻塞的,所以,在面對超級大量的客戶端,更能得心應(yīng)手。
????下面就比較一下,幾種I/O編程的優(yōu)缺點(diǎn)。
4、各種I/O的對比
????先以一張表來直觀的對比一下:
????
????具體選擇什么樣的模型或者NIO框架,完全基于業(yè)務(wù)的實(shí)際應(yīng)用場景和性能需求,如果客戶端很少,服務(wù)器負(fù)荷不重,就沒有必要選擇開發(fā)起來相對不那么簡單的NIO做服務(wù)端;相反,就應(yīng)考慮使用NIO或者相關(guān)的框架了。
5、附錄
????上文中服務(wù)端使用到的用于計算的工具類:
package com.anxpp.utils;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
public final class Calculator {
private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
public static Object cal(String expression) throws ScriptException{
return jse.eval(expression);
}
}
????更多文章:
????Java NIO框架Netty簡單使用
????后續(xù)會寫一篇NIO框架Netty的教程,不過這段時間有一點(diǎn)小忙。
參考書籍: ???
????李林峰-《Netty 權(quán)威指南》
---------------------?
作者:anxpp?
來源:CSDN?
原文:https://blog.csdn.net/anxpp/article/details/51512200?
版權(quán)聲明:本文為博主原創(chuàng)文章,轉(zhuǎn)載請附上博文鏈接!
總結(jié)
以上是生活随笔為你收集整理的Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java学习之javassist
- 下一篇: apache commons常用工具类