Selector 实现原理
轉載自?Selector 實現原理
概述
Selector是NIO中實現I/O多路復用的關鍵類。Selector實現了通過一個線程管理多個Channel,從而管理多個網絡連接的目的。
Channel代表這一個網絡連接通道,我們可以將Channel注冊到Selector中以實現Selector對其的管理。一個Channel可以注冊到多個不同的Selector中。
當Channel注冊到Selector后會返回一個SelectionKey對象,該SelectionKey對象則代表這這個Channel和它注冊的Selector間的關系。并且SelectionKey中維護著兩個很重要的屬性:interestOps、readyOps
interestOps是我們希望Selector監聽Channel的哪些事件。我們將我們感興趣的事件設置到該字段,這樣在selection操作時,當發現該Channel有我們所感興趣的事件發生時,就會將我們感興趣的事件再設置到readyOps中,這樣我們就能得知是哪些事件發生了以做相應處理。
Selector的中的重要屬性
Selector中維護3個特別重要的SelectionKey集合,分別是
- keys:所有注冊到Selector的Channel所表示的SelectionKey都會存在于該集合中。keys元素的添加會在Channel注冊到Selector時發生。
- selectedKeys:該集合中的每個SelectionKey都是其對應的Channel在上一次操作selection期間被檢查到至少有一種SelectionKey中所感興趣的操作已經準備好被處理。該集合是keys的一個子集。
- cancelledKeys:執行了取消操作的SelectionKey會被放入到該集合中。該集合是keys的一個子集。
下面的源碼解析會說明上面3個集合的用處。
Selector 源碼解析
下面我們通過一段對Selector的使用流程講解來進一步深入其實現原理。
首先先來段Selector最簡單的使用片段
| 1234567891011121314151617 | ServerSocketChannel serverChannel = ServerSocketChannel.open();????????serverChannel.configureBlocking(false);????????intport = 5566;?????????????????serverChannel.socket().bind(newInetSocketAddress(port));????????Selector selector = Selector.open();????????serverChannel.register(selector, SelectionKey.OP_ACCEPT);????????while(true){????????????intn = selector.select();????????????if(n > 0) {????????????????Iterator<SelectionKey> iter = selector.selectedKeys().iterator();????????????????while(iter.hasNext()) {????????????????????SelectionKey selectionKey = iter.next();????????????????????......????????????????????iter.remove();????????????????}????????????}????????} |
1、Selector的構建
SocketChannel、ServerSocketChannel和Selector的實例初始化都通過SelectorProvider類實現。
ServerSocketChannel.open();
| 123 | publicstatic ServerSocketChannel open() throwsIOException {????returnSelectorProvider.provider().openServerSocketChannel();} |
SocketChannel.open();
| 123 | publicstatic SocketChannel open() throwsIOException {????returnSelectorProvider.provider().openSocketChannel();} |
Selector.open();
| 123 | publicstatic Selector open() throwsIOException {????returnSelectorProvider.provider().openSelector();} |
我們來進一步的了解下SelectorProvider.provider()
| 1234567891011121314151617 | publicstatic SelectorProvider provider() {????????synchronized(lock) {????????????if(provider != null)????????????????returnprovider;????????????returnAccessController.doPrivileged(????????????????newPrivilegedAction<>() {????????????????????publicSelectorProvider run() {????????????????????????????if(loadProviderFromProperty())????????????????????????????????returnprovider;????????????????????????????if(loadProviderAsService())????????????????????????????????returnprovider;????????????????????????????provider = sun.nio.ch.DefaultSelectorProvider.create();????????????????????????????returnprovider;????????????????????????}????????????????????});????????}????} |
① 如果配置了“java.nio.channels.spi.SelectorProvider”屬性,則通過該屬性值load對應的SelectorProvider對象,如果構建失敗則拋異常。
② 如果provider類已經安裝在了對系統類加載程序可見的jar包中,并且該jar包的源碼目錄META-INF/services包含有一個java.nio.channels.spi.SelectorProvider提供類配置文件,則取文件中第一個類名進行load以構建對應的SelectorProvider對象,如果構建失敗則拋異常。
③ 如果上面兩種情況都不存在,則返回系統默認的SelectorProvider,即,sun.nio.ch.DefaultSelectorProvider.create();
④ 隨后在調用該方法,即SelectorProvider.provider()。則返回第一次調用的結果。
不同系統對應著不同的sun.nio.ch.DefaultSelectorProvider
這里我們看linux下面的sun.nio.ch.DefaultSelectorProvider
| 123456789101112131415 | publicclass DefaultSelectorProvider {????/**?????* Prevent instantiation.?????*/????privateDefaultSelectorProvider() { }????/**?????* Returns the default SelectorProvider.?????*/????publicstatic SelectorProvider create() {????????returnnew sun.nio.ch.EPollSelectorProvider();????}} |
可以看見,linux系統下sun.nio.ch.DefaultSelectorProvider.create(); 會生成一個sun.nio.ch.EPollSelectorProvider類型的SelectorProvider,這里對應于linux系統的epoll
接下來看下 selector.open():
| 12345678910111213141516 | /**?????* Opens a selector.?????*?????* <p> The new selector is created by invoking the {@link?????* java.nio.channels.spi.SelectorProvider#openSelector openSelector} method?????* of the system-wide default {@link?????* java.nio.channels.spi.SelectorProvider} object.? </p>?????*?????* @return? A new selector?????*?????* @throws? IOException?????*????????? If an I/O error occurs?????*/????publicstatic Selector open() throwsIOException {????????returnSelectorProvider.provider().openSelector();????} |
在得到sun.nio.ch.EPollSelectorProvider后調用openSelector()方法構建Selector,這里會構建一個EPollSelectorImpl對象。
EPollSelectorImpl
| 12345678910111213 | classEPollSelectorImpl????extendsSelectorImpl{????// File descriptors used for interrupt????protectedint fd0;????protectedint fd1;????// The poll object????EPollArrayWrapper pollWrapper;????// Maps from file descriptors to keys????privateMap<Integer,SelectionKeyImpl> fdToKey; |
| 1234567891011121314151617181920212223 | EPollSelectorImpl(SelectorProvider sp) throwsIOException {????????super(sp);????????longpipeFds = IOUtil.makePipe(false);????????fd0 = (int) (pipeFds >>> 32);????????fd1 = (int) pipeFds;????????try{????????????pollWrapper = newEPollArrayWrapper();????????????pollWrapper.initInterrupt(fd0, fd1);????????????fdToKey = newHashMap<>();????????}catch(Throwable t) {????????????try{????????????????FileDispatcherImpl.closeIntFD(fd0);????????????}catch(IOException ioe0) {????????????????t.addSuppressed(ioe0);????????????}????????????try{????????????????FileDispatcherImpl.closeIntFD(fd1);????????????}catch(IOException ioe1) {????????????????t.addSuppressed(ioe1);????????????}????????????throwt;????????}????} |
EPollSelectorImpl構造函數完成:
① EPollArrayWrapper的構建,EpollArrayWapper將Linux的epoll相關系統調用封裝成了native方法供EpollSelectorImpl使用。
② 通過EPollArrayWrapper向epoll注冊中斷事件
| 12345 | voidinitInterrupt(intfd0, intfd1) {????????outgoingInterruptFD = fd1;????????incomingInterruptFD = fd0;????????epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);????} |
③ fdToKey:構建文件描述符-SelectionKeyImpl映射表,所有注冊到selector的channel對應的SelectionKey和與之對應的文件描述符都會放入到該映射表中。
EPollArrayWrapper
EPollArrayWrapper完成了對epoll文件描述符的構建,以及對linux系統的epoll指令操縱的封裝。維護每次selection操作的結果,即epoll_wait結果的epoll_event數組。
EPollArrayWrapper操縱了一個linux系統下epoll_event結構的本地數組。
| 1234567891011 | * typedef union epoll_data {*????void*ptr;*????intfd;*???? __uint32_t u32;*???? __uint64_t u64;*? } epoll_data_t;** struct epoll_event {*???? __uint32_t events;*???? epoll_data_t data;* }; |
epoll_event的數據成員(epoll_data_t data)包含有與通過epoll_ctl將文件描述符注冊到epoll時設置的數據相同的數據。這里data.fd為我們注冊的文件描述符。這樣我們在處理事件的時候持有有效的文件描述符了。
EPollArrayWrapper將Linux的epoll相關系統調用封裝成了native方法供EpollSelectorImpl使用。
| 1234 | privatenative int epollCreate();????privatenative void epollCtl(intepfd, intopcode, intfd, intevents);????privatenative int epollWait(longpollAddress, intnumfds, longtimeout,?????????????????????????????????intepfd) throwsIOException; |
上述三個native方法就對應Linux下epoll相關的三個系統調用
| 12345678 | // The fd of the epoll driver????privatefinal int epfd;?????// The epoll_event array for results from epoll_wait????privatefinal AllocatedNativeObject pollArray;????// Base address of the epoll_event array????privatefinal long pollArrayAddress; |
| 123 | // 用于存儲已經注冊的文件描述符和其注冊等待改變的事件的關聯關系。在epoll_wait操作就是要檢測這里文件描述法注冊的事件是否有發生。????privatefinal byte[] eventsLow = newbyte[MAX_UPDATE_ARRAY_SIZE];????privatefinal Map<Integer,Byte> eventsHigh = newHashMap<>(); |
| 123456789 | EPollArrayWrapper()throwsIOException {????????// creates the epoll file descriptor????????epfd = epollCreate();????????// the epoll_event array passed to epoll_wait????????intallocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;????????pollArray = newAllocatedNativeObject(allocationSize, true);????????pollArrayAddress = pollArray.address();????} |
EPoolArrayWrapper構造函數,創建了epoll文件描述符。構建了一個用于存放epoll_wait返回結果的epoll_event數組。
ServerSocketChannel的構建
ServerSocketChannel.open();
返回ServerSocketChannelImpl對象,構建linux系統下ServerSocket的文件描述符。
| 123456 | // Our file descriptor????privatefinal FileDescriptor fd;????// fd value needed for dev/poll. This value will remain valid????// even after the value in the file descriptor object has been set to -1????privateint fdVal; |
| 123456 | ServerSocketChannelImpl(SelectorProvider sp) throwsIOException {????????super(sp);????????this.fd =? Net.serverSocket(true);????????this.fdVal = IOUtil.fdVal(fd);????????this.state = ST_INUSE;????} |
將ServerSocketChannel注冊到Selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
| 12345678910111213141516171819202122232425262728 | publicfinal SelectionKey register(Selector sel, intops,???????????????????????????????????????Object att)????????throwsClosedChannelException????{????????synchronized(regLock) {????????????if(!isOpen())????????????????thrownew ClosedChannelException();????????????if((ops & ~validOps()) != 0)????????????????thrownew IllegalArgumentException();????????????if(blocking)????????????????thrownew IllegalBlockingModeException();????????????SelectionKey k = findKey(sel);????????????if(k != null) {????????????????k.interestOps(ops);????????????????k.attach(att);????????????}????????????if(k == null) {????????????????// New registration????????????????synchronized(keyLock) {????????????????????if(!isOpen())????????????????????????thrownew ClosedChannelException();????????????????????k = ((AbstractSelector)sel).register(this, ops, att);????????????????????addKey(k);????????????????}????????????}????????????returnk;????????}????} |
| 1234567891011121314 | protectedfinal SelectionKey register(AbstractSelectableChannel ch,??????????????????????????????????????????intops,??????????????????????????????????????????Object attachment)????{????????if(!(ch instanceofSelChImpl))????????????thrownew IllegalSelectorException();????????SelectionKeyImpl k = newSelectionKeyImpl((SelChImpl)ch, this);????????k.attach(attachment);????????synchronized(publicKeys) {????????????implRegister(k);????????}????????k.interestOps(ops);????????returnk;????} |
① 構建代表channel和selector間關系的SelectionKey對象
② implRegister(k)將channel注冊到epoll中
③ k.interestOps(int) 完成下面兩個操作:
a) 會將注冊的感興趣的事件和其對應的文件描述存儲到EPollArrayWrapper對象的eventsLow或eventsHigh中,這是給底層實現epoll_wait時使用的。
b) 同時該操作還會將設置SelectionKey的interestOps字段,這是給我們程序員獲取使用的。
EPollSelectorImpl. implRegister
| 123456789 | protectedvoid implRegister(SelectionKeyImpl ski) {????????if(closed)????????????thrownew ClosedSelectorException();????????SelChImpl ch = ski.channel;????????intfd = Integer.valueOf(ch.getFDVal());????????fdToKey.put(fd, ski);????????pollWrapper.add(fd);????????keys.add(ski);????} |
① 將channel對應的fd(文件描述符)和對應的SelectionKeyImpl放到fdToKey映射表中。
② 將channel對應的fd(文件描述符)添加到EPollArrayWrapper中,并強制初始化fd的事件為0 ( 強制初始更新事件為0,因為該事件可能存在于之前被取消過的注冊中。)
③ 將selectionKey放入到keys集合中。
Selection操作
selection操作有3中類型:
① select():該方法會一直阻塞直到至少一個channel被選擇(即,該channel注冊的事件發生了)為止,除非當前線程發生中斷或者selector的wakeup方法被調用。
② select(long time):該方法和select()類似,該方法也會導致阻塞直到至少一個channel被選擇(即,該channel注冊的事件發生了)為止,除非下面3種情況任意一種發生:a) 設置的超時時間到達;b) 當前線程發生中斷;c) selector的wakeup方法被調用
③ selectNow():該方法不會發生阻塞,如果沒有一個channel被選擇也會立即返回。
我們主要來看看select()的實現 :int n = selector.select();
| 123 | publicint select() throwsIOException {????returnselect(0);} |
最終會調用到EPollSelectorImpl的doSelect
| 1234567891011121314151617181920212223 | protectedint doSelect(longtimeout) throwsIOException {????????if(closed)????????????thrownew ClosedSelectorException();????????processDeregisterQueue();????????try{????????????begin();????????????pollWrapper.poll(timeout);????????}finally{????????????end();????????}????????processDeregisterQueue();????????intnumKeysUpdated = updateSelectedKeys();????????if(pollWrapper.interrupted()) {????????????// Clear the wakeup pipe????????????pollWrapper.putEventOps(pollWrapper.interruptedIndex(),0);????????????synchronized(interruptLock) {????????????????pollWrapper.clearInterrupted();????????????????IOUtil.drain(fd0);????????????????interruptTriggered = false;????????????}????????}????????returnnumKeysUpdated;????} |
① 先處理注銷的selectionKey隊列
② 進行底層的epoll_wait操作
③ 再次對注銷的selectionKey隊列進行處理
④ 更新被選擇的selectionKey
先來看processDeregisterQueue():
| 1234567891011121314151617181920212223 | voidprocessDeregisterQueue() throwsIOException {????????Set var1 = this.cancelledKeys();????????synchronized(var1) {????????????if(!var1.isEmpty()) {????????????????Iterator var3 = var1.iterator();????????????????while(var3.hasNext()) {????????????????????SelectionKeyImpl var4 = (SelectionKeyImpl)var3.next();????????????????????try{????????????????????????this.implDereg(var4);????????????????????}catch(SocketException var12) {????????????????????????IOException var6 = newIOException("Error deregistering key");????????????????????????var6.initCause(var12);????????????????????????throwvar6;????????????????????}finally{????????????????????????var3.remove();????????????????????}????????????????}????????????}????????}????} |
從cancelledKeys集合中依次取出注銷的SelectionKey,執行注銷操作,將處理后的SelectionKey從cancelledKeys集合中移除。執行processDeregisterQueue()后cancelledKeys集合會為空。
| 1234567891011121314 | protectedvoid implDereg(SelectionKeyImpl ski) throwsIOException {????????assert(ski.getIndex() >= 0);????????SelChImpl ch = ski.channel;????????intfd = ch.getFDVal();????????fdToKey.remove(Integer.valueOf(fd));????????pollWrapper.remove(fd);????????ski.setIndex(-1);????????keys.remove(ski);????????selectedKeys.remove(ski);????????deregister((AbstractSelectionKey)ski);????????SelectableChannel selch = ski.channel();????????if(!selch.isOpen() && !selch.isRegistered())????????????((SelChImpl)selch).kill();????} |
注銷會完成下面的操作:
① 將已經注銷的selectionKey從fdToKey( 文件描述與SelectionKeyImpl的映射表 )中移除
② 將selectionKey所代表的channel的文件描述符從EPollArrayWrapper中移除
③ 將selectionKey從keys集合中移除,這樣下次selector.select()就不會再將該selectionKey注冊到epoll中監聽
④ 也會將selectionKey從對應的channel中注銷
⑤ 最后如果對應的channel已經關閉并且沒有注冊其他的selector了,則將該channel關閉
完成上面的的操作后,注銷的SelectionKey就不會出現先在keys、selectedKeys以及cancelKeys這3個集合中的任何一個。
接著我們來看EPollArrayWrapper.poll(timeout):
| 123456789101112 | intpoll(longtimeout) throwsIOException {????????updateRegistrations();????????updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);????????for(inti=0; i<updated; i++) {????????????if(getDescriptor(i) == incomingInterruptFD) {????????????????interruptedIndex = i;????????????????interrupted = true;????????????????break;????????????}????????}????????returnupdated;????} |
updateRegistrations()方法會將已經注冊到該selector的事件(eventsLow或eventsHigh)通過調用epollCtl(epfd, opcode, fd, events); 注冊到linux系統中。
這里epollWait就會調用linux底層的epoll_wait方法,并返回在epoll_wait期間有事件觸發的entry的個數
再看updateSelectedKeys():
| 123456789101112131415161718192021222324 | privateint updateSelectedKeys() {????????intentries = pollWrapper.updated;????????intnumKeysUpdated = 0;????????for(inti=0; i<entries; i++) {????????????intnextFD = pollWrapper.getDescriptor(i);????????????SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));????????????// ski is null in the case of an interrupt????????????if(ski != null) {????????????????intrOps = pollWrapper.getEventOps(i);????????????????if(selectedKeys.contains(ski)) {????????????????????if(ski.channel.translateAndSetReadyOps(rOps, ski)) {????????????????????????numKeysUpdated++;????????????????????}????????????????}else{????????????????????ski.channel.translateAndSetReadyOps(rOps, ski);????????????????????if((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {????????????????????????selectedKeys.add(ski);????????????????????????numKeysUpdated++;????????????????????}????????????????}????????????}????????}????????returnnumKeysUpdated;????} |
該方法會從通過EPollArrayWrapper pollWrapper 以及 fdToKey( 構建文件描述符-SelectorKeyImpl映射表 )來獲取有事件觸發的SelectionKeyImpl對象,然后將SelectionKeyImpl放到selectedKey集合( 有事件觸發的selectionKey集合,可以通過selector.selectedKeys()方法獲得 )中,即selectedKeys。并重新設置SelectionKeyImpl中相關的readyOps值。
但是,這里要注意兩點:
① 如果SelectionKeyImpl已經存在于selectedKeys集合中,并且發現觸發的事件已經存在于readyOps中了,則不會使numKeysUpdated++;這樣會使得我們無法得知該事件的變化。
上面這點說明了為什么我們要在每次從selectedKey中獲取到Selectionkey后,將其從selectedKey集合移除,就是為了當有事件觸發使selectionKey能正確到放入selectedKey集合中,并正確的通知給調用者。
② 如果發現channel所發生I/O事件不是當前SelectionKey所感興趣,則不會將SelectionKeyImpl放入selectedKeys集合中,也不會使numKeysUpdated++
epoll原理
epoll是Linux下的一種IO多路復用技術,可以非常高效的處理數以百萬計的socket句柄。
先看看使用c封裝的3個epoll系統調用:
- int epoll_create(int size)
epoll_create建立一個epoll對象。參數size是內核保證能夠正確處理的最大句柄數,多于這個最大數時內核可不保證效果。
- int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
epoll_ctl可以操作epoll_create創建的epoll,如將socket句柄加入到epoll中讓其監控,或把epoll正在監控的某個socket句柄移出epoll。
- int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout)
epoll_wait在調用時,在給定的timeout時間內,所監控的句柄中有事件發生時,就返回用戶態的進程。
大概看看epoll內部是怎么實現的:
epoll的兩種工作模式:
- LT:level-trigger,水平觸發模式,只要某個socket處于readable/writable狀態,無論什么時候進行epoll_wait都會返回該socket。
- ET:edge-trigger,邊緣觸發模式,只有某個socket從unreadable變為readable或從unwritable變為writable時,epoll_wait才會返回該socket。
socket讀數據
socket寫數據
最后順便說下在Linux系統中JDK NIO使用的是 LT ,而Netty epoll使用的是 ET。
后記
因為本人對計算機系統組成以及C語言等知識比較欠缺,因為文中相關知識點的表示也相當“膚淺”,如有不對不妥的地方望讀者指出。同時我也會繼續加強對該方面知識點的學習~
參考
- http://www.jianshu.com/p/0d497fe5484a
- http://remcarpediem.com/2017/04/02/Netty源碼-三-I-O模型和Java-NIO底層原理/
- 圣思園netty課程
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎
總結
以上是生活随笔為你收集整理的Selector 实现原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: epoll 浅析以及 nio 中的 Se
- 下一篇: NIO学习–缓冲区