日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

高性能IO -Reactor模式的实现

發布時間:2025/7/25 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 高性能IO -Reactor模式的实现 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2019獨角獸企業重金招聘Python工程師標準>>>

在了解Reactor模式之前, 首先了解什么是NIO.

java.nio全稱java non-blocking IO 即非阻塞IO.這個地方要明白,非阻塞不等于異步。

非阻塞:當一個線上讀取數據,沒有數據時該線程可以干其他的事情。就是調用了之后立馬返回。

異步IO: 在一個IO操作中,用戶態的線程完全不用考慮數據的讀取過程,都交給操作系統完成,完成之后通知用戶線程即可。這才是真正的異步操作。

同步IO? 每個請求必須逐個的被處理,一個流程的處理會導致整個流程的暫時等待。

阻塞:? 某個請求發出后,該請求操作需要的條件不滿足,請求會一直阻塞,不會返回,直到條件滿足。

?

?其中java NIO 中的 Select 在Linux中基于epoll實現。基于IO多路復用。就是一個線程來管理多個IO.

epoll全稱eventpoll 是linux內核針對IO多路復用的實現。在linux中,和epoll類似的由select和poll。

其中epoll監聽的fd集合是一直在內核存在的,有三個系統調用:epoll_create epoll_wait epoll_ctl 通過epoll_wait可以多次監聽同一個fd結合,只返回可讀寫的那部分。

select只有一個系統調用,就是每次都需要將要監聽的所有集合都傳給操作系統,當有事件發生時。操作系統在返回給你整個集合。

?

NIO核心包含三個部分: Channels Buffers Selectors.

Channel: 在NIO中,所有的IO過程都是從建立一個Channel開始的,數據可以從channel中讀取到Buffer中 也可以從Buffer中寫入到channel中。channel就好像BIO中的流。但是channel時雙向的,我感覺這樣更貼近于現實,畢竟TCP連接是全雙工的。

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

channel分為這四種,分別對應著文件,UDP TCP網絡IO.

Buffer buffer即為緩沖區,也就是數據塊。

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

java基礎數據類型中除了boolean都有對應的buffer實現。

Selector (選擇器): 他是NIO中的關鍵所在,我們在程序中可以通過它來實現一個線程同時處理多個Channel 也就是多個連接。

如上圖,一個Selectot監聽五個通道,在使用時首先需要將通道以及對應感興趣的事件(Accept? ?read? writer等?)注冊到Selector上 。當發生對應的事件時,操作系統回通知我們的程序。在Selector中可以讀取到對應的Channel 根據事件類型做出相應的操作。

零拷貝

java NIO中提供的FileChannel擁有transferTo和transferFrom兩個方法,可以直接把FileChannel中的數據拷貝到另一個Channel,或者把另一個Channel中的數據拷貝到FileChannel .在操作系統的支持下,通過這個方法傳輸數據不需要將原數據從內核態拷貝到用戶態,再從用戶態拷貝到內核態。

?

Reactor實現一個簡單的Echo服務器? 基于單個線程同時處理多個連接。這樣一個Selector同時完成Accept? Read Write事件的監聽,同時業邏輯也和Selector在同一個線程中執行。這里可以優化一下將業務邏輯在新的線程中執行。

public class EchoService {private final String ip;private final int port;public EchoService(String ip, int port) {this.ip = ip;this.port = port;}public void start(){try {Selector selector=Selector.open();ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(ip,port)).configureBlocking(false).register(selector, SelectionKey.OP_ACCEPT);while (true){selector.select();Set<SelectionKey> keys=selector.selectedKeys();Iterator<SelectionKey> iteratorKey=keys.iterator();while (iteratorKey.hasNext()){SelectionKey key=iteratorKey.next();if (key.isAcceptable()){ServerSocketChannel serverChannel= (ServerSocketChannel) key.channel();SocketChannel socketChannel=serverChannel.accept();socketChannel.configureBlocking(false).register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,ByteBuffer.allocate(1024));}if (key.isReadable()){SocketChannel sc= (SocketChannel) key.channel();ByteBuffer buffer= (ByteBuffer) key.attachment();buffer.clear();int readCount= sc.read(buffer);if (readCount<0){iteratorKey.remove();continue;}buffer.flip();sc.write(buffer);System.out.print(new String(buffer.array(),0,readCount));}iteratorKey.remove();}}} catch (Exception e) {e.printStackTrace();}finally {System.out.println("exit");}} }

現在計算機的核數越來越多,僅僅用一個核心來處理IO連接有點讓費系統資源,因此我們可以多見幾個Reactor? .其中住Reactor負責TCP的連接(Accept),連接之后分配到子Reactor來處理IO的讀寫事件。

并且每個子Reactor分別屬于一個獨立的線程,每個成功連接后的Channel的所有操作自始至終旨在一個線程處理。這樣保證了同一個請求的所有狀態和上下文在同一個線程中,方便監控請求相應狀態。

具體代碼實現 EchoService為例:

https://github.com/WJ1020/reactor

public class EchoService {private static final Logger logger= LoggerFactory.getLogger(EchoService.class);private final String ip;private final int port;public EchoService(String ip, int port) {this.ip = ip;this.port = port;}public void start(){logger.info("echo service start......");try {Selector selector=Selector.open();ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(ip,port)).configureBlocking(false).register(selector,SelectionKey.OP_ACCEPT);int coreNum = Runtime.getRuntime().availableProcessors();Processor[] processors = new Processor[coreNum];for (int i = 0; i < processors.length; i++) {logger.info("creat processor :{}",i+1);processors[i] = new Processor();}int index=0;while (Status.running){selector.select();Set<SelectionKey> keys=selector.selectedKeys();Iterator<SelectionKey> iterator=keys.iterator();while (iterator.hasNext()){SelectionKey selectionKey=iterator.next();iterator.remove();if (selectionKey.isAcceptable()){ServerSocketChannel currServerSocketChannel= (ServerSocketChannel) selectionKey.channel();SocketChannel socketChannel=currServerSocketChannel.accept();socketChannel.configureBlocking(false);logger.info("Accept request from {}",socketChannel.getRemoteAddress());Processor processor=processors[(++index)%coreNum];processor.addChannel(socketChannel);}}}} catch (IOException e) {logger.error("io exception {}",e.getMessage());}}} public class Processor {private static final Logger logger= LoggerFactory.getLogger(Processor.class);private static final ExecutorService service=Executors.newFixedThreadPool(2*Runtime.getRuntime().availableProcessors());private final Selector selector;private volatile boolean running=true;public Processor() throws IOException {this.selector= SelectorProvider.provider().openSelector();}public void addChannel(SocketChannel socketChannel){try {socketChannel.register(this.selector, SelectionKey.OP_READ);if (running){running=false;start();}wakeup();} catch (ClosedChannelException e) {logger.error("register channel error :{}",e.getMessage());}}private void wakeup(){this.selector.wakeup();}private void start(){service.submit(new ProcessorTask(selector));} } public class ProcessorTask implements Runnable {private final static Logger logger= LoggerFactory.getLogger(ProcessorTask.class);private Selector selector;ProcessorTask(Selector selector) {this.selector = selector;}@Overridepublic void run() {logger.info("{}\tsub reactor start listener",Thread.currentThread().getName());while (Status.running){try {selector.select();Set<SelectionKey> keys=selector.selectedKeys();Iterator<SelectionKey> iterator=keys.iterator();while (iterator.hasNext()){SelectionKey key=iterator.next();iterator.remove();if (key.isReadable()){ByteBuffer buffer= ByteBuffer.allocate(1024);SocketChannel socketChannel= (SocketChannel) key.channel();int count=socketChannel.read(buffer);if (count<0){socketChannel.close();key.cancel();logger.info("{}\t Read ended",socketChannel);}else if (count==0){logger.info("{}\t Message size is 0",socketChannel);}else {buffer.flip();socketChannel.write(buffer);logger.info("{}\t Read message{}",socketChannel,new String(buffer.array()));}}}} catch (IOException e) {logger.error("select error :{}",e.getMessage());}}} }

在EchoService中 ,主Reactor接受到新的連接后,將channel注冊到subReactor的Selector中。每個子Reactor都有一個自己的Selector對象,并有獨立的一個線程處理。

轉載于:https://my.oschina.net/wang520/blog/3036562

總結

以上是生活随笔為你收集整理的高性能IO -Reactor模式的实现的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。