日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

阻塞式和非阻塞式udp传输_NIO非阻塞网络编程三大核心理念

發布時間:2025/3/12 编程问答 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 阻塞式和非阻塞式udp传输_NIO非阻塞网络编程三大核心理念 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本次開始NIO網絡編程,之前已經說過BIO,對于阻塞IO里面的問題一定有了清晰的認識,在JDK1.4版本后,提供了新的JAVA IO操作非阻塞API,用意替換JAVA IO 和JAVA NetWorking相關的API。NIO其實有個名稱叫new IO。

(一)NIO

  • ① 介紹

java.nio全稱java non-blocking IO(實際上是 new io),是指JDK 1.4 及以上版本里提供的新api(New IO) ,為所有的原始類型(boolean類型除外)提供緩存支持的數據容器,使用它可以提供非阻塞式的高伸縮性網絡。
HTTP2.0使用了多路復用的技術,做到同一個連接并發處理多個請求,而且并發請求的數量比HTTP1.1大了好幾個數量級。

  • ② 三大核心組件

高性能網絡編程的基礎組件,Buffer緩存區、Channel 通道、Selector 選擇器。

(二) Buffer緩存區

  • ① 介紹

緩存區本質上是一個可以寫入數據的內存塊(類似數組),然后可以再次讀取。此內存塊包含在NIO Buffer 對象中,該對象提供了一組方法,可以更輕松地使用內存塊。
相比較直接對數組的操作。Buffer API 更加容易操作和管理。

  • ② 使用Buffer進行數據寫入與讀取,需要進行如下四個步驟

  • 將數據寫入緩沖區。

  • 調用buffer.flip(),轉換為讀取模式。

  • 緩沖區讀取數據。

  • 調用buffer.clear() 或 buffer.compact() 消除緩沖區

    • ③ Buffer工作原理

    BUffer三個重要屬性,通過完成了數組的封裝。

    1.capacity 容量:作為一個內存塊,Buffer具有一定的固定大小,也稱為【容量】。
    2.position 位置:寫入模式時代表寫數據的位置。讀取模式時代表讀取數據的位置。
    3.limit 限制:寫入模式,限制等于buffer的容量,讀取模式下,limit等于寫入的數據量。

    • ④ 源碼

    import java.nio.ByteBuffer;
    import java.nio.IntBuffer;
    import java.nio.LongBuffer;

    public class BufferDemo {
    public static void main(String[] args) {
    // 構建一個byte字節緩沖區,容量是4
    //堆內存
    ByteBuffer byteBuffer = ByteBuffer.allocate(4);

    //堆外內存
    // ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4);
    // 默認寫入模式,查看三個重要的指標
    System.out.println(String.format("初始化:capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
    byteBuffer.position(), byteBuffer.limit()));
    // 寫入2字節的數據
    byteBuffer.put((byte) 1);
    byteBuffer.put((byte) 2);
    byteBuffer.put((byte) 3);
    // 再看數據
    System.out.println(String.format("寫入3字節后,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
    byteBuffer.position(), byteBuffer.limit()));

    // 轉換為讀取模式(不調用flip方法,也是可以讀取數據的,但是position記錄讀取的位置不對)
    System.out.println("#######開始讀取");
    byteBuffer.flip();
    byte a = byteBuffer.get();
    System.out.println(a);
    byte b = byteBuffer.get();
    System.out.println(b);
    System.out.println(String.format("讀取2字節數據后,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
    byteBuffer.position(), byteBuffer.limit()));

    // 繼續寫入3字節,此時讀模式下,limit=3,position=2.繼續寫入只能覆蓋寫入一條數據
    // clear()方法清除整個緩沖區。compact()方法僅清除已閱讀的數據。轉為寫入模式
    byteBuffer.compact(); // buffer : 1 , 3
    byteBuffer.put((byte) 3);
    byteBuffer.put((byte) 4);
    byteBuffer.put((byte) 5);
    System.out.println(String.format("最終的情況,capacity容量:%s, position位置:%s, limit限制:%s", byteBuffer.capacity(),
    byteBuffer.position(), byteBuffer.limit()));

    // rewind() 重置position為0
    // mark() 標記position的位置
    // reset() 重置position為上次mark()標記的位置

    }
    }

    • ⑤ ByteBuffer 內存類型

    ByteBuffer 為性能關鍵型代碼提供了直接內存(direct堆外)和非直接內存(heap堆)兩種實現,堆外內存獲取的方式

    ByteBuffer directBytebuffer = ByteBuffer.allocateDirect(noBytes);

    好處

  • 進行網絡IO 或者 文件IO時比heapBuffer 少一次拷貝,(file/socket —— OS memory —— jvm heap )GC會移動對象內存,在寫file 或 socket的過程中,JVM的實現中,會先把數據復制到堆外,在進行寫入。

  • GC范圍之外,降低GC壓力,但實現了自動管理。DirectByteBuffer 中 有一個Cleaner 對象(PhantomReference) ,Cleaner被GC前會執行clean 方法,觸發DirectByteBuffer 中定義Deallocator

  • 建議

  • 性能確實可觀的時候才去使用,分配給大型,長壽命(網絡傳輸,文件讀寫場景)

  • 通過虛擬機參數MaxDirectMemorySize限制大小,防止耗盡整個機器的內存,在JVM之外的內存無法監控。

  • (三)Channel 通道

    • ① 介紹

    Channel的API 涵蓋了UDP、TCP網絡和文件IO,FileChannel,DatagramChannel,SocketChannel,ServerSocketChannel。

    • ② 和標準IO Stream操作的區別

    在一個通道內進行讀取和寫入stream通常是單向的(input 或 output),可以非堵塞讀取和寫入通道,通道中讀取或寫入緩沖區。

    • ③ SocketChannel

    SocketChannel用于建立TCP網絡連接,類似java.net.Socket。有兩種創建socketChannel形式

    1.客戶端主動發起和服務器的連接
    2.服務器獲取的新連接

    write寫

    在尚未寫入任何內容時可能就返回了。需要在循環中調用write()

    read讀

    read() 方法可能直接返回而根本不讀取任何數據,根據返回的int值判斷讀取了多少字節。

    • ④ ServerSocketChannel

    ServerSocketChannel 可能監聽新建立的TCP連接通道,類似ServerSocket。

    ServerSocketChannel.accepta()

    如果該通道處于飛度賽模式,那么如何沒有掛起的連接,該方法將立即返回null。必須檢查返回的SocketChannel是否為null。

    • ⑤ 源碼

    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    import java.util.Scanner;

    public class NIOClient {

    public static void main(String[] args) throws Exception {
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);
    socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
    while (!socketChannel.finishConnect()) {
    // 沒連接上,則一直等待
    Thread.yield();
    }
    Scanner scanner = new Scanner(System.in);
    System.out.println("請輸入:");
    // 發送內容
    String msg = scanner.nextLine();
    ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
    while (buffer.hasRemaining()) {
    socketChannel.write(buffer);
    }
    // 讀取響應
    System.out.println("收到服務端響應:");
    ByteBuffer requestBuffer = ByteBuffer.allocate(1024);

    while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
    // 長連接情況下,需要手動判斷數據有沒有讀取結束 (此處做一個簡單的判斷: 超過0字節就認為請求結束了)
    if (requestBuffer.position() > 0) break;
    }
    requestBuffer.flip();
    byte[] content = new byte[requestBuffer.limit()];
    requestBuffer.get(content);
    System.out.println(new String(content));
    scanner.close();
    socketChannel.close();
    }

    }

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.ArrayList;
    import java.util.Iterator;

    /**
    * 直接基于非阻塞的寫法,一個線程處理輪詢所有請求
    */
    public class NIOServer1 {
    /**
    * 已經建立連接的集合
    */
    private static ArrayList channels = new ArrayList<>();public static void main(String[] args) throws Exception {// 創建網絡服務端
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false); // 設置為非阻塞模式
    serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 綁定端口
    System.out.println("啟動成功");while (true) {
    SocketChannel socketChannel = serverSocketChannel.accept(); // 獲取新tcp連接通道// tcp請求 讀取/響應if (socketChannel != null) {
    System.out.println("收到新連接 : " + socketChannel.getRemoteAddress());
    socketChannel.configureBlocking(false); // 默認是阻塞的,一定要設置為非阻塞
    channels.add(socketChannel);
    } else {// 沒有新連接的情況下,就去處理現有連接的數據,處理完的就刪除掉
    Iterator iterator = channels.iterator();while (iterator.hasNext()) {
    SocketChannel ch = iterator.next();try {
    ByteBuffer requestBuffer = ByteBuffer.allocate(1024);if (ch.read(requestBuffer) == 0) {// 等于0,代表這個通道沒有數據需要處理,那就待會再處理continue;
    }while (ch.isOpen() && ch.read(requestBuffer) != -1) {// 長連接情況下,需要手動判斷數據有沒有讀取結束 (此處做一個簡單的判斷: 超過0字節就認為請求結束了)if (requestBuffer.position() > 0) break;
    }if(requestBuffer.position() == 0) continue; // 如果沒數據了, 則不繼續后面的處理
    requestBuffer.flip();byte[] content = new byte[requestBuffer.limit()];
    requestBuffer.get(content);
    System.out.println(new String(content));
    System.out.println("收到數據,來自:" + ch.getRemoteAddress());// 響應結果 200
    String response = "HTTP/1.1 200 OK\r\n" +"Content-Length: 11\r\n\r\n" +"Hello World";
    ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());while (buffer.hasRemaining()) {
    ch.write(buffer);
    }
    iterator.remove();
    } catch (IOException e) {
    e.printStackTrace();
    iterator.remove();
    }
    }
    }
    }// 用到了非阻塞的API, 再設計上,和BIO可以有很大的不同// 問題: 輪詢通道的方式,低效,浪費CPU
    }
    }

    (四)Select選擇器

    • ① 介紹

    Selector 是一個Java NIO 組件,可以檢查一個或多個NIO通道,并確定哪些通道已準備好進行讀取或寫入,實現單個線程可以管理多個通道,從而管理或多個網絡連接。

    • ② selector 監聽多個 channel的不同事件

  • Connect 連接(SelectionKey.OP_CONNECT)

  • Accept 準備就緒(OP_ACCEPT)

  • Read 讀取(OP_READ)

  • Write 寫入(OP_WRITE)

    • ③ selector 選擇器

    一個線程處理多個通道的核心概念理解:事件驅動機制。
    非堵塞的網絡通道下,開發者通過Selector注冊對于通道感興趣的事件類型,線程通過監聽事件來觸發響應的代碼執行(最底層hi操作系統的多路復用機制)

    • ④ 源碼

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /**
    * 結合Selector實現的非阻塞服務端(放棄對channel的輪詢,借助消息通知機制)
    */
    public class NIOServerV2 {

    public static void main(String[] args) throws Exception {
    // 1. 創建網絡服務端ServerSocketChannel
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false); // 設置為非阻塞模式

    // 2. 構建一個Selector選擇器,并且將channel注冊上去
    Selector selector = Selector.open();
    SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);// 將serverSocketChannel注冊到selector
    selectionKey.interestOps(SelectionKey.OP_ACCEPT); // 對serverSocketChannel上面的accept事件感興趣(serverSocketChannel只能支持accept操作)

    // 3. 綁定端口
    serverSocketChannel.socket().bind(new InetSocketAddress(8080));

    System.out.println("啟動成功");

    while (true) {
    // 不再輪詢通道,改用下面輪詢事件的方式.select方法有阻塞效果,直到有事件通知才會有返回
    selector.select();
    // 獲取事件
    Set selectionKeys = selector.selectedKeys();// 遍歷查詢結果e
    Iterator iter = selectionKeys.iterator();while (iter.hasNext()) {// 被封裝的查詢結果
    SelectionKey key = iter.next();
    iter.remove();// 關注 Read 和 Accept兩個事件if (key.isAcceptable()) {
    ServerSocketChannel server = (ServerSocketChannel) key.attachment();// 將拿到的客戶端連接通道,注冊到selector上面
    SocketChannel clientSocketChannel = server.accept(); // mainReactor 輪詢accept
    clientSocketChannel.configureBlocking(false);
    clientSocketChannel.register(selector, SelectionKey.OP_READ, clientSocketChannel);
    System.out.println("收到新連接 : " + clientSocketChannel.getRemoteAddress());
    }if (key.isReadable()) {
    SocketChannel socketChannel = (SocketChannel) key.attachment();try {
    ByteBuffer requestBuffer = ByteBuffer.allocate(1024);while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {// 長連接情況下,需要手動判斷數據有沒有讀取結束 (此處做一個簡單的判斷: 超過0字節就認為請求結束了)if (requestBuffer.position() > 0) break;
    }if(requestBuffer.position() == 0) continue; // 如果沒數據了, 則不繼續后面的處理
    requestBuffer.flip();byte[] content = new byte[requestBuffer.limit()];
    requestBuffer.get(content);
    System.out.println(new String(content));
    System.out.println("收到數據,來自:" + socketChannel.getRemoteAddress());// TODO 業務操作 數據庫 接口調用等等// 響應結果 200
    String response = "HTTP/1.1 200 OK\r\n" +"Content-Length: 11\r\n\r\n" +"Hello World";
    ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());while (buffer.hasRemaining()) {
    socketChannel.write(buffer);
    }
    } catch (IOException e) {// e.printStackTrace();
    key.cancel(); // 取消事件訂閱
    }
    }
    }
    selector.selectNow();
    }// 問題: 此處一個selector監聽所有事件,一個線程處理所有請求事件. 會成為瓶頸! 要有多線程的運用
    }
    }

    • ⑤ NIO 和 BIO 的區別

    • ⑥ NIO Reactor的方式

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.Iterator;
    import java.util.Random;
    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    /**
    * NIO selector 多路復用reactor線程模型
    */
    public class NIOServerV3 {
    /** 處理業務操作的線程 */
    private static ExecutorService workPool = Executors.newCachedThreadPool();

    /**
    * 封裝了selector.select()等事件輪詢的代碼
    */
    abstract class ReactorThread extends Thread {

    Selector selector;
    LinkedBlockingQueue taskQueue = new LinkedBlockingQueue<>();/**
    * Selector監聽到有事件后,調用這個方法
    */public abstract void handler(SelectableChannel channel) throws Exception;private ReactorThread() throws IOException {
    selector = Selector.open();
    }volatile boolean running = false;@Overridepublic void run() {// 輪詢Selector事件while (running) {try {// 執行隊列中的任務
    Runnable task;while ((task = taskQueue.poll()) != null) {
    task.run();
    }
    selector.select(1000);// 獲取查詢結果
    Set selected = selector.selectedKeys();// 遍歷查詢結果
    Iterator iter = selected.iterator();while (iter.hasNext()) {// 被封裝的查詢結果
    SelectionKey key = iter.next();
    iter.remove();int readyOps = key.readyOps();// 關注 Read 和 Accept兩個事件if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {try {
    SelectableChannel channel = (SelectableChannel) key.attachment();
    channel.configureBlocking(false);
    handler(channel);if (!channel.isOpen()) {
    key.cancel(); // 如果關閉了,就取消這個KEY的訂閱
    }
    } catch (Exception ex) {
    key.cancel(); // 如果有異常,就取消這個KEY的訂閱
    }
    }
    }
    selector.selectNow();
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }private SelectionKey register(SelectableChannel channel) throws Exception {// 為什么register要以任務提交的形式,讓reactor線程去處理?// 因為線程在執行channel注冊到selector的過程中,會和調用selector.select()方法的線程爭用同一把鎖// 而select()方法實在eventLoop中通過while循環調用的,爭搶的可能性很高,為了讓register能更快的執行,就放到同一個線程來處理
    FutureTask futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel));
    taskQueue.add(futureTask);return futureTask.get();
    }private void doStart() {if (!running) {
    running = true;
    start();
    }
    }
    }private ServerSocketChannel serverSocketChannel;// 1、創建多個線程 - accept處理reactor線程 (accept線程)private ReactorThread[] mainReactorThreads = new ReactorThread[1];// 2、創建多個線程 - io處理reactor線程 (I/O線程)private ReactorThread[] subReactorThreads = new ReactorThread[8];/**
    * 初始化線程組
    */private void newGroup() throws IOException {// 創建IO線程,負責處理客戶端連接以后socketChannel的IO讀寫for (int i = 0; i < subReactorThreads.length; i++) {
    subReactorThreads[i] = new ReactorThread() {@Overridepublic void handler(SelectableChannel channel) throws IOException {// work線程只負責處理IO處理,不處理accept事件
    SocketChannel ch = (SocketChannel) channel;
    ByteBuffer requestBuffer = ByteBuffer.allocate(1024);while (ch.isOpen() && ch.read(requestBuffer) != -1) {// 長連接情況下,需要手動判斷數據有沒有讀取結束 (此處做一個簡單的判斷: 超過0字節就認為請求結束了)if (requestBuffer.position() > 0) break;
    }if (requestBuffer.position() == 0) return; // 如果沒數據了, 則不繼續后面的處理
    requestBuffer.flip();byte[] content = new byte[requestBuffer.limit()];
    requestBuffer.get(content);
    System.out.println(new String(content));
    System.out.println(Thread.currentThread().getName() + "收到數據,來自:" + ch.getRemoteAddress());// TODO 業務操作 數據庫、接口...
    workPool.submit(() -> {
    });// 響應結果 200
    String response = "HTTP/1.1 200 OK\r\n" +"Content-Length: 11\r\n\r\n" +"Hello World";
    ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());while (buffer.hasRemaining()) {
    ch.write(buffer);
    }
    }
    };
    }// 創建mainReactor線程, 只負責處理serverSocketChannelfor (int i = 0; i < mainReactorThreads.length; i++) {
    mainReactorThreads[i] = new ReactorThread() {
    AtomicInteger incr = new AtomicInteger(0);@Overridepublic void handler(SelectableChannel channel) throws Exception {// 只做請求分發,不做具體的數據讀取
    ServerSocketChannel ch = (ServerSocketChannel) channel;
    SocketChannel socketChannel = ch.accept();
    socketChannel.configureBlocking(false);// 收到連接建立的通知之后,分發給I/O線程繼續去讀取數據int index = incr.getAndIncrement() % subReactorThreads.length;
    ReactorThread workEventLoop = subReactorThreads[index];
    workEventLoop.doStart();
    SelectionKey selectionKey = workEventLoop.register(socketChannel);
    selectionKey.interestOps(SelectionKey.OP_READ);
    System.out.println(Thread.currentThread().getName() + "收到新連接 : " + socketChannel.getRemoteAddress());
    }
    };
    }
    }/**
    * 初始化channel,并且綁定一個eventLoop線程
    *
    * @throws IOException IO異常
    */private void initAndRegister() throws Exception {// 1、 創建ServerSocketChannel
    serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);// 2、 將serverSocketChannel注冊到selectorint index = new Random().nextInt(mainReactorThreads.length);
    mainReactorThreads[index].doStart();
    SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
    selectionKey.interestOps(SelectionKey.OP_ACCEPT);
    }/**
    * 綁定端口
    *
    * @throws IOException IO異常
    */private void bind() throws IOException {// 1、 正式綁定端口,對外服務
    serverSocketChannel.bind(new InetSocketAddress(8080));
    System.out.println("啟動完成,端口8080");
    }public static void main(String[] args) throws Exception {
    NIOServerV3 nioServerV3 = new NIOServerV3();
    nioServerV3.newGroup(); // 1、 創建main和sub兩組線程
    nioServerV3.initAndRegister(); // 2、 創建serverSocketChannel,注冊到mainReactor線程上的selector上
    nioServerV3.bind(); // 3、 為serverSocketChannel綁定端口
    }
    }

    PS:NIO為開發者提供了功能豐富及強大的IO處理API,但是在應用開發的過程中,直接使用JDK提供的API,比較繁瑣,而且要想將性能進行提升,光有NIO還是不夠的,還需要將多線程技術與之結合起來。因為網絡編程本身的復雜性,以及JDK API開發的使用難度較高,所以開源社區中,涌出來很多的JDK NIO進行封裝了,增強后的網絡編程框架,例如:Netty、Mina等。

    總結

    以上是生活随笔為你收集整理的阻塞式和非阻塞式udp传输_NIO非阻塞网络编程三大核心理念的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。