NIO和Reactor
本文參考Doug Lea的Scalable IO in Java.
網絡服務
隨著網絡服務的越來越多,我們對網絡服務的性能有了更高的要求,提供一個高性能,穩定的web服務是一件很麻煩的事情,所以有了netty框架幫我們完成。
我們對各種各樣的網絡服務進行抽象,得到最基本的業務流程:
1:讀取請求信息
2:對請求信息進行解碼
3:進行相關的業務處理
4:對處理結果進行編碼
5:發送請求
看到這,netty的ChannelHandler就是干這幾件事情的。
傳統的網絡服務
在jdk1.4之前,我們只有BIO,所以當時的網絡服務的架構是這樣的。
每個線程處理一個請求, 由于線程個數和cpu個數的原因,這種設計性能是有上限的,所以就有了集群模式:tomcat集群。
/*** Created by gaoxing on 2015-01-20 .*/ public class ClasssicServer {public static void main(String[] args) {try {ServerSocket serverSocket=new ServerSocket(8888,10);System.out.println("server is start");while( ! Thread.interrupted()){new Thread(new Handler(serverSocket.accept())).start();}} catch (IOException e) {e.printStackTrace();}}static class Handler implements Runnable{final Socket socket;final static int MAX_SIZE=1024;public Handler(Socket socket){this.socket=socket;}@Overridepublic void run() {//TODO//在這里對socket中的數據進行讀取和處理,然后把結果寫入到socket中去 }} }高性能的IO目標和Reactor
1:高負載下可以穩定的工作
2:提高資源的利用率
3:低延遲
這有就有了分而治之和事件驅動的思想。這樣沒有thread就不用瞎跑了,cpu就不用不停的切換Thread . 這樣提出了Reactor模式
1:Reactor接收IO事件發送給該事件的處理器處理
2:處理器的操作是非阻塞的。
3:管理事件和處理器的綁定。
?
?
這是一個單線程版本,所有的請求都是一個線程處理,當然Reactor不是無緣無故的提出來的,因為jdk提供了nio包,nio使得Reator得于實現
1:channels: 通道就是數據源的抽象,通過它可以無阻塞的讀取數據
2:buffers? : 用來裝載數據的,可以把從channel讀取到buffer中,或者把buffer中的數據寫入到channel中
3:selector :?用來監聽 有IO事件,并告訴channel
4:selectionkeys:? IO事件和處理器綁定
/*** Created by gaoxing on 2015-01-20 .*/ public class Reactor implements Runnable {final Selector selector ;final ServerSocketChannel serverSocketChannel;public Reactor(int port) throws IOException {this.selector=Selector.open();this.serverSocketChannel=ServerSocketChannel.open();serverSocketChannel.socket().bind(new InetSocketAddress(port));//一定是非阻塞的,阻塞的一個通道就只能處理一個請求了serverSocketChannel.configureBlocking(false);//把OP_ACCEPT事件和Acceptor綁定SelectionKey sk=serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);sk.attach(new Acceptor());}@Overridepublic void run() {while(!Thread.interrupted()){try {selector.select();//該方法是阻塞的,只有IO事件來了才向下執行Set<SelectionKey> selected=selector.selectedKeys();Iterator<SelectionKey> it=selected.iterator();while(it.hasNext()){dispatch(it.next());}selected.clear() } catch (IOException e) {e.printStackTrace();}}}private void dispatch(SelectionKey next) {Runnable runnable= (Runnable) next.attachment();if(runnable!=null){runnable.run();}}private class Acceptor implements Runnable{@Overridepublic void run() {SocketChannel channel= null;try {channel = serverSocketChannel.accept();if (channel!=null){new Handler(selector,channel);}} catch (IOException e) {e.printStackTrace();}}} }class Handler implements Runnable {final SelectionKey sk;final SocketChannel channel;final static int MAXSIZE=1024;ByteBuffer input=ByteBuffer.allocate(MAXSIZE);ByteBuffer output=ByteBuffer.allocate(MAXSIZE);static final int READING=0,SENDING=1;int state=READING;public Handler(Selector selector,SocketChannel channel) throws IOException {this.channel=channel;this.channel.configureBlocking(false);/*** 這個有個問題,ppt上register是0,*/sk=this.channel.register(selector,SelectionKey.OP_READ);sk.attach(this);/*** 這里的作用是,設置key的狀態為可讀,然后讓后selector的selector返回。* 然后就可以處理OP_READ事件了*/sk.interestOps(SelectionKey.OP_READ);selector.wakeup();}@Overridepublic void run() {if (state==READING) read();if (state==SENDING) write();}void read(){state=SENDING;sk.interestOps(SelectionKey.OP_WRITE);}void write(){sk.cancel();} }
?上面代碼注解理解有誤:
sk=this.channel.register(selector,0); 這里是初始化一個SelectionKey 不監聽事件sk.interestOps(SelectionKey.OP_READ); 這里設置監聽的事件為OP_READ
多線程的Reactor模式
現在的CPU多核的,為了提高對硬件的使用效率需要考慮使用多線程的Reactor設計模式,Reactor主要用來處罰事件的,但是事件的處理會降低Reactor的性能,考慮把事件的處理放到別的線程上來做,有點想android的設計模式,UI線程用來接收用戶的事件,事件的處理放到線程去做來提高用戶的體驗。多線程Reactor有兩種一種是Reactor線程只關注io事件,事件處理放到別的線程,一種對事件分類主Reactor只關注Accept事件,子Reactor關注read和write事件。事件處理放到線程去做,這也是netty的設計模式。
?
class HandlerPool implements Runnable {final SelectionKey sk;final SocketChannel channel;final static int MAXSIZE=1024;ByteBuffer input=ByteBuffer.allocate(MAXSIZE);ByteBuffer output=ByteBuffer.allocate(MAXSIZE);static ExecutorService executor = Executors.newFixedThreadPool(100);static final int READING=0,SENDING=1;int state=READING;public HandlerPool(Selector selector,SocketChannel channel) throws IOException {this.channel=channel;this.channel.configureBlocking(false);/*** 這個有個問題,這里注冊的SelectionKey是不處理的,應該他監聽的事件為0*/sk=this.channel.register(selector,0);sk.attach(this);/*** 這里的作用是,SelectionKey的監聽事件為OP_READ。interestOps對哪個事件感興趣*/sk.interestOps(SelectionKey.OP_READ);selector.wakeup();}@Overridepublic void run() {executor.execute(new Processer());}class Processer implements Runnable{@Overridepublic void run() {}} }這個就在Acceptor里面多實例化幾個Selector,它處理Read和Write事件。
?
大致架構弄懂了。后面邊看netty源碼,邊學習nio
?
轉載于:https://www.cnblogs.com/gaoxing/p/4237789.html
總結
以上是生活随笔為你收集整理的NIO和Reactor的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 杀死占用端口的进程
- 下一篇: bash中通过设置PS1变量改变提示符颜