Java进阶(五)Java I/O模型从BIO到NIO和Reactor模式
本文介紹了Java中的四種I/O模型,同步阻塞,同步非阻塞,多路復用,異步阻塞。同時將NIO和BIO進行了對比,并詳細分析了基于NIO的Reactor模式,包括經典單線程模型以及多線程模式和多Reactor模式。
原創文章,轉載請務必將下面這段話置于文章開頭處(保留超鏈接)。
本文轉發自技術世界,原文鏈接 http://www.jasongj.com/java/nio_reactor/
Java I/O模型
同步 vs. 異步
同步I/O 每個請求必須逐個地被處理,一個請求的處理會導致整個流程的暫時等待,這些事件無法并發地執行。用戶線程發起I/O請求后需要等待或者輪詢內核I/O操作完成后才能繼續執行。
異步I/O 多個請求可以并發地執行,一個請求或者任務的執行不會導致整個流程的暫時等待。用戶線程發起I/O請求后仍然繼續執行,當內核I/O操作完成后會通知用戶線程,或者調用用戶線程注冊的回調函數。
阻塞 vs. 非阻塞
阻塞 某個請求發出后,由于該請求操作需要的條件不滿足,請求操作一直阻塞,不會返回,直到條件滿足。
非阻塞 請求發出后,若該請求需要的條件不滿足,則立即返回一個標志信息告知條件不滿足,而不會一直等待。一般需要通過循環判斷請求條件是否滿足來獲取請求結果。
需要注意的是,阻塞并不等價于同步,而非阻塞并非等價于異步。事實上這兩組概念描述的是I/O模型中的兩個不同維度。
同步和異步著重點在于多個任務執行過程中,后發起的任務是否必須等先發起的任務完成之后再進行。而不管先發起的任務請求是阻塞等待完成,還是立即返回通過循環等待請求成功。
而阻塞和非阻塞重點在于請求的方法是否立即返回(或者說是否在條件不滿足時被阻塞)。
Unix下五種I/O模型
Unix 下共有五種 I/O 模型:
- 阻塞 I/O
- 非阻塞 I/O
- I/O 多路復用(select和poll)
- 信號驅動 I/O(SIGIO)
- 異步 I/O(Posix.1的aio_系列函數)
阻塞I/O
如上文所述,阻塞I/O下請求無法立即完成則保持阻塞。阻塞I/O分為如下兩個階段。
- 階段1:等待數據就緒。網絡 I/O 的情況就是等待遠端數據陸續抵達;磁盤I/O的情況就是等待磁盤數據從磁盤上讀取到內核態內存中。
- 階段2:數據拷貝。出于系統安全,用戶態的程序沒有權限直接讀取內核態內存,因此內核負責把內核態內存中的數據拷貝一份到用戶態內存中。
非阻塞I/O
非阻塞I/O請求包含如下三個階段
- socket設置為 NONBLOCK(非阻塞)就是告訴內核,當所請求的I/O操作無法完成時,不要將線程睡眠,而是返回一個錯誤碼(EWOULDBLOCK) ,這樣請求就不會阻塞。
- I/O操作函數將不斷的測試數據是否已經準備好,如果沒有準備好,繼續測試,直到數據準備好為止。整個I/O 請求的過程中,雖然用戶線程每次發起I/O請求后可以立即返回,但是為了等到數據,仍需要不斷地輪詢、重復請求,消耗了大量的 CPU 的資源。
- 數據準備好了,從內核拷貝到用戶空間。
一般很少直接使用這種模型,而是在其他I/O模型中使用非阻塞I/O 這一特性。這種方式對單個I/O 請求意義不大,但給I/O多路復用提供了條件。
I/O多路復用(異步阻塞 I/O)
I/O多路復用會用到select或者poll函數,這兩個函數也會使線程阻塞,但是和阻塞I/O所不同的是,這兩個函數可以同時阻塞多個I/O操作。而且可以同時對多個讀操作,多個寫操作的I/O函數進行檢測,直到有數據可讀或可寫時,才真正調用I/O操作函數。
從流程上來看,使用select函數進行I/O請求和同步阻塞模型沒有太大的區別,甚至還多了添加監視Channel,以及調用select函數的額外操作,增加了額外工作。但是,使用 select以后最大的優勢是用戶可以在一個線程內同時處理多個Channel的I/O請求。用戶可以注冊多個Channel,然后不斷地調用select讀取被激活的Channel,即可達到在同一個線程內同時處理多個I/O請求的目的。而在同步阻塞模型中,必須通過多線程的方式才能達到這個目的。
調用select/poll該方法由一個用戶態線程負責輪詢多個Channel,直到某個階段1的數據就緒,再通知實際的用戶線程執行階段2的拷貝。 通過一個專職的用戶態線程執行非阻塞I/O輪詢,模擬實現了階段一的異步化。
信號驅動I/O(SIGIO)
首先我們允許socket進行信號驅動I/O,并安裝一個信號處理函數,線程繼續運行并不阻塞。當數據準備好時,線程會收到一個SIGIO 信號,可以在信號處理函數中調用I/O操作函數處理數據。
異步I/O
調用aio_read 函數,告訴內核描述字,緩沖區指針,緩沖區大小,文件偏移以及通知的方式,然后立即返回。當內核將數據拷貝到緩沖區后,再通知應用程序。所以異步I/O模式下,階段1和階段2全部由內核完成,完成不需要用戶線程的參與。
幾種I/O模型對比
除異步I/O外,其它四種模型的階段2基本相同,都是從內核態拷貝數據到用戶態。區別在于階段1不同。前四種都屬于同步I/O。
Java中四種I/O模型
上一章所述Unix中的五種I/O模型,除信號驅動I/O外,Java對其它四種I/O模型都有所支持。其中Java最早提供的blocking I/O即是阻塞I/O,而NIO即是非阻塞I/O,同時通過NIO實現的Reactor模式即是I/O復用模型的實現,通過AIO實現的Proactor模式即是異步I/O模型的實現。
從IO到NIO
面向流 vs. 面向緩沖
Java IO是面向流的,每次從流(InputStream/OutputStream)中讀一個或多個字節,直到讀取完所有字節,它們沒有被緩存在任何地方。另外,它不能前后移動流中的數據,如需前后移動處理,需要先將其緩存至一個緩沖區。
Java NIO面向緩沖,數據會被讀取到一個緩沖區,需要時可以在緩沖區中前后移動處理,這增加了處理過程的靈活性。但與此同時在處理緩沖區前需要檢查該緩沖區中是否包含有所需要處理的數據,并需要確保更多數據讀入緩沖區時,不會覆蓋緩沖區內尚未處理的數據。
阻塞 vs. 非阻塞
Java IO的各種流是阻塞的。當某個線程調用read()或write()方法時,該線程被阻塞,直到有數據被讀取到或者數據完全寫入。阻塞期間該線程無法處理任何其它事情。
Java NIO為非阻塞模式。讀寫請求并不會阻塞當前線程,在數據可讀/寫前當前線程可以繼續做其它事情,所以一個單獨的線程可以管理多個輸入和輸出通道。
選擇器(Selector)
Java NIO的選擇器允許一個單獨的線程同時監視多個通道,可以注冊多個通道到同一個選擇器上,然后使用一個單獨的線程來“選擇”已經就緒的通道。這種“選擇”機制為一個單獨線程管理多個通道提供了可能。
零拷貝
Java NIO中提供的FileChannel擁有transferTo和transferFrom兩個方法,可直接把FileChannel中的數據拷貝到另外一個Channel,或者直接把另外一個Channel中的數據拷貝到FileChannel。該接口常被用于高效的網絡/文件的數據傳輸和大文件拷貝。在操作系統支持的情況下,通過該方法傳輸數據并不需要將源數據從內核態拷貝到用戶態,再從用戶態拷貝到目標通道的內核態,同時也避免了兩次用戶態和內核態間的上下文切換,也即使用了“零拷貝”,所以其性能一般高于Java IO中提供的方法。
使用FileChannel的零拷貝將本地文件內容傳輸到網絡的示例代碼如下所示。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
? public class NIOClient { public static void main(String[] args) throws IOException, InterruptedException { SocketChannel socketChannel = SocketChannel.open(); InetSocketAddress address = new InetSocketAddress(1234); socketChannel.connect(address); RandomAccessFile file = new RandomAccessFile( NIOClient.class.getClassLoader().getResource("test.txt").getFile(), "rw"); FileChannel channel = file.getChannel(); channel.transferTo(0, channel.size(), socketChannel); channel.close(); file.close(); socketChannel.close(); } } |
?
阻塞I/O下的服務器實現
單線程逐個處理所有請求
使用阻塞I/O的服務器,一般使用循環,逐個接受連接請求并讀取數據,然后處理下一個請求。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
? public class IOServer { private static final Logger LOGGER = LoggerFactory.getLogger(IOServer.class); public static void main(String[] args) { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(2345)); } catch (IOException ex) { LOGGER.error("Listen failed", ex); return; } try{ while(true) { Socket socket = serverSocket.accept(); InputStream inputstream = socket.getInputStream(); LOGGER.info("Received message {}", IOUtils.toString(inputstream)); IOUtils.closeQuietly(inputstream); } } catch(IOException ex) { IOUtils.closeQuietly(serverSocket); LOGGER.error("Read message failed", ex); } } } |
?
為每個請求創建一個線程
上例使用單線程逐個處理所有請求,同一時間只能處理一個請求,等待I/O的過程浪費大量CPU資源,同時無法充分使用多CPU的優勢。下面是使用多線程對阻塞I/O模型的改進。一個連接建立成功后,創建一個單獨的線程處理其I/O操作。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
? public class IOServerMultiThread { private static final Logger LOGGER = LoggerFactory.getLogger(IOServerMultiThread.class); public static void main(String[] args) { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(2345)); } catch (IOException ex) { LOGGER.error("Listen failed", ex); return; } try{ while(true) { Socket socket = serverSocket.accept(); new Thread( () -> { try{ InputStream inputstream = socket.getInputStream(); LOGGER.info("Received message {}", IOUtils.toString(inputstream)); IOUtils.closeQuietly(inputstream); } catch (IOException ex) { LOGGER.error("Read message failed", ex); } }).start(); } } catch(IOException ex) { IOUtils.closeQuietly(serverSocket); LOGGER.error("Accept connection failed", ex); } } } |
?
使用線程池處理請求
為了防止連接請求過多,導致服務器創建的線程數過多,造成過多線程上下文切換的開銷。可以通過線程池來限制創建的線程數,如下所示。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
? public class IOServerThreadPool { private static final Logger LOGGER = LoggerFactory.getLogger(IOServerThreadPool.class); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(2345)); } catch (IOException ex) { LOGGER.error("Listen failed", ex); return; } try{ while(true) { Socket socket = serverSocket.accept(); executorService.submit(() -> { try{ InputStream inputstream = socket.getInputStream(); LOGGER.info("Received message {}", IOUtils.toString(new InputStreamReader(inputstream))); } catch (IOException ex) { LOGGER.error("Read message failed", ex); } }); } } catch(IOException ex) { try { serverSocket.close(); } catch (IOException e) { } LOGGER.error("Accept connection failed", ex); } } } |
?
Reactor模式
精典Reactor模式
精典的Reactor模式示意圖如下所示。
在Reactor模式中,包含如下角色
- Reactor?將I/O事件發派給對應的Handler
- Acceptor?處理客戶端連接請求
- Handlers?執行非阻塞讀/寫
最簡單的Reactor模式實現代碼如下所示。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
? public class NIOServer { private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class); public static void main(String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(1234)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = acceptServerSocketChannel.accept(); socketChannel.configureBlocking(false); LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress()); socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int count = socketChannel.read(buffer); if (count <= 0) { socketChannel.close(); key.cancel(); LOGGER.info("Received invalide data, close the connection"); continue; } LOGGER.info("Received message {}", new String(buffer.array())); } keys.remove(key); } } } } |
?
為了方便閱讀,上示代碼將Reactor模式中的所有角色放在了一個類中。
從上示代碼中可以看到,多個Channel可以注冊到同一個Selector對象上,實現了一個線程同時監控多個請求狀態(Channel)。同時注冊時需要指定它所關注的事件,例如上示代碼中socketServerChannel對象只注冊了OP_ACCEPT事件,而socketChannel對象只注冊了OP_READ事件。
selector.select()是阻塞的,當有至少一個通道可用時該方法返回可用通道個數。同時該方法只捕獲Channel注冊時指定的所關注的事件。
多工作線程Reactor模式
經典Reactor模式中,盡管一個線程可同時監控多個請求(Channel),但是所有讀/寫請求以及對新連接請求的處理都在同一個線程中處理,無法充分利用多CPU的優勢,同時讀/寫操作也會阻塞對新連接請求的處理。因此可以引入多線程,并行處理多個讀/寫操作,如下圖所示。
多線程Reactor模式示例代碼如下所示。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
? public class NIOServer { private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class); public static void main(String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(1234)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { if(selector.selectNow() < 0) { continue; } Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while(iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = acceptServerSocketChannel.accept(); socketChannel.configureBlocking(false); LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress()); SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); readKey.attach(new Processor()); } else if (key.isReadable()) { Processor processor = (Processor) key.attachment(); processor.process(key); } } } } } |
?
從上示代碼中可以看到,注冊完SocketChannel的OP_READ事件后,可以對相應的SelectionKey attach一個對象(本例中attach了一個Processor對象,該對象處理讀請求),并且在獲取到可讀事件后,可以取出該對象。
注:attach對象及取出該對象是NIO提供的一種操作,但該操作并非Reactor模式的必要操作,本文使用它,只是為了方便演示NIO的接口。
具體的讀請求處理在如下所示的Processor類中。該類中設置了一個靜態的線程池處理所有請求。而process方法并不直接處理I/O請求,而是把該I/O操作提交給上述線程池去處理,這樣就充分利用了多線程的優勢,同時將對新連接的處理和讀/寫操作的處理放在了不同的線程中,讀/寫操作不再阻塞對新連接請求的處理。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
? public class Processor { private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class); private static final ExecutorService service = Executors.newFixedThreadPool(16); public void process(SelectionKey selectionKey) { service.submit(() -> { ByteBuffer buffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); int count = socketChannel.read(buffer); if (count < 0) { socketChannel.close(); selectionKey.cancel(); LOGGER.info("{}\t Read ended", socketChannel); return null; } else if(count == 0) { return null; } LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array())); return null; }); } } |
多Reactor
Netty中使用的Reactor模式,引入了多Reactor,也即一個主Reactor負責監控所有的連接請求,多個子Reactor負責監控并處理讀/寫請求,減輕了主Reactor的壓力,降低了主Reactor壓力太大而造成的延遲。
并且每個子Reactor分別屬于一個獨立的線程,每個成功連接后的Channel的所有操作由同一個線程處理。這樣保證了同一請求的所有狀態和上下文在同一個線程中,避免了不必要的上下文切換,同時也方便了監控請求響應狀態。
多Reactor模式示意圖如下所示。
多Reactor示例代碼如下所示。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
? public class NIOServer { private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class); public static void main(String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(1234)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); int coreNum = Runtime.getRuntime().availableProcessors(); Processor[] processors = new Processor[coreNum]; for (int i = 0; i < processors.length; i++) { processors[i] = new Processor(); } int index = 0; while (selector.select() > 0) { Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { keys.remove(key); if (key.isAcceptable()) { ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = acceptServerSocketChannel.accept(); socketChannel.configureBlocking(false); LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress()); Processor processor = processors[(int) ((index++) % coreNum)]; processor.addChannel(socketChannel); processor.wakeup(); } } } } } |
?
如上代碼所示,本文設置的子Reactor個數是當前機器可用核數的兩倍(與Netty默認的子Reactor個數一致)。對于每個成功連接的SocketChannel,通過round robin的方式交給不同的子Reactor。
子Reactor對SocketChannel的處理如下所示。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
? public class Processor { private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class); private static final ExecutorService service = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors()); private Selector selector; public Processor() throws IOException { this.selector = SelectorProvider.provider().openSelector(); start(); } public void addChannel(SocketChannel socketChannel) throws ClosedChannelException { socketChannel.register(this.selector, SelectionKey.OP_READ); } public void wakeup() { this.selector.wakeup(); } public void start() { service.submit(() -> { while (true) { if (selector.select(500) <= 0) { continue; } Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) key.channel(); int count = socketChannel.read(buffer); if (count < 0) { socketChannel.close(); key.cancel(); LOGGER.info("{}\t Read ended", socketChannel); continue; } else if (count == 0) { LOGGER.info("{}\t Message size is 0", socketChannel); continue; } else { LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array())); } } } } }); } } |
?
在Processor中,同樣創建了一個靜態的線程池,且線程池的大小為機器核數的兩倍。每個Processor實例均包含一個Selector實例。同時每次獲取Processor實例時均提交一個任務到該線程池,并且該任務正常情況下一直循環處理,不會停止。而提交給該Processor的SocketChannel通過在其Selector注冊事件,加入到相應的任務中。由此實現了每個子Reactor包含一個Selector對象,并由一個獨立的線程處理。
Java進階系列
- Java進階(一)Annotation(注解)
- Java進階(二)當我們說線程安全時,到底在說什么
- Java進階(三)多線程開發關鍵技術
- Java進階(四)線程間通信方式對比
- Java進階(五)NIO和Reactor模式進階
- Java進階(六)從ConcurrentHashMap的演進看Java多線程核心技術
from:?http://www.jasongj.com/java/nio_reactor/
總結
以上是生活随笔為你收集整理的Java进阶(五)Java I/O模型从BIO到NIO和Reactor模式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 我所理解的Java NIO
- 下一篇: Java NIO之缓冲区