日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Selector 实现原理

發布時間:2023/12/3 编程问答 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Selector 实现原理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

轉載自?Selector 實現原理

概述

Selector是NIO中實現I/O多路復用的關鍵類。Selector實現了通過一個線程管理多個Channel,從而管理多個網絡連接的目的。

Channel代表這一個網絡連接通道,我們可以將Channel注冊到Selector中以實現Selector對其的管理。一個Channel可以注冊到多個不同的Selector中。

當Channel注冊到Selector后會返回一個SelectionKey對象,該SelectionKey對象則代表這這個Channel和它注冊的Selector間的關系。并且SelectionKey中維護著兩個很重要的屬性:interestOps、readyOps

interestOps是我們希望Selector監聽Channel的哪些事件。我們將我們感興趣的事件設置到該字段,這樣在selection操作時,當發現該Channel有我們所感興趣的事件發生時,就會將我們感興趣的事件再設置到readyOps中,這樣我們就能得知是哪些事件發生了以做相應處理。

Selector的中的重要屬性

Selector中維護3個特別重要的SelectionKey集合,分別是

  • keys:所有注冊到Selector的Channel所表示的SelectionKey都會存在于該集合中。keys元素的添加會在Channel注冊到Selector時發生。
  • selectedKeys:該集合中的每個SelectionKey都是其對應的Channel在上一次操作selection期間被檢查到至少有一種SelectionKey中所感興趣的操作已經準備好被處理。該集合是keys的一個子集。
  • cancelledKeys:執行了取消操作的SelectionKey會被放入到該集合中。該集合是keys的一個子集。

下面的源碼解析會說明上面3個集合的用處。

Selector 源碼解析

下面我們通過一段對Selector的使用流程講解來進一步深入其實現原理。
首先先來段Selector最簡單的使用片段

1234567891011121314151617ServerSocketChannel serverChannel = ServerSocketChannel.open();????????serverChannel.configureBlocking(false);????????intport = 5566;?????????????????serverChannel.socket().bind(newInetSocketAddress(port));????????Selector selector = Selector.open();????????serverChannel.register(selector, SelectionKey.OP_ACCEPT);????????while(true){????????????intn = selector.select();????????????if(n > 0) {????????????????Iterator<SelectionKey> iter = selector.selectedKeys().iterator();????????????????while(iter.hasNext()) {????????????????????SelectionKey selectionKey = iter.next();????????????????????......????????????????????iter.remove();????????????????}????????????}????????}

1、Selector的構建

SocketChannel、ServerSocketChannel和Selector的實例初始化都通過SelectorProvider類實現。

ServerSocketChannel.open();

123publicstatic ServerSocketChannel open() throwsIOException {????returnSelectorProvider.provider().openServerSocketChannel();}

SocketChannel.open();

123publicstatic SocketChannel open() throwsIOException {????returnSelectorProvider.provider().openSocketChannel();}

Selector.open();

123publicstatic Selector open() throwsIOException {????returnSelectorProvider.provider().openSelector();}

我們來進一步的了解下SelectorProvider.provider()

1234567891011121314151617publicstatic SelectorProvider provider() {????????synchronized(lock) {????????????if(provider != null)????????????????returnprovider;????????????returnAccessController.doPrivileged(????????????????newPrivilegedAction<>() {????????????????????publicSelectorProvider run() {????????????????????????????if(loadProviderFromProperty())????????????????????????????????returnprovider;????????????????????????????if(loadProviderAsService())????????????????????????????????returnprovider;????????????????????????????provider = sun.nio.ch.DefaultSelectorProvider.create();????????????????????????????returnprovider;????????????????????????}????????????????????});????????}????}

① 如果配置了“java.nio.channels.spi.SelectorProvider”屬性,則通過該屬性值load對應的SelectorProvider對象,如果構建失敗則拋異常。
② 如果provider類已經安裝在了對系統類加載程序可見的jar包中,并且該jar包的源碼目錄META-INF/services包含有一個java.nio.channels.spi.SelectorProvider提供類配置文件,則取文件中第一個類名進行load以構建對應的SelectorProvider對象,如果構建失敗則拋異常。
③ 如果上面兩種情況都不存在,則返回系統默認的SelectorProvider,即,sun.nio.ch.DefaultSelectorProvider.create();
④ 隨后在調用該方法,即SelectorProvider.provider()。則返回第一次調用的結果。

不同系統對應著不同的sun.nio.ch.DefaultSelectorProvider

這里我們看linux下面的sun.nio.ch.DefaultSelectorProvider

123456789101112131415publicclass DefaultSelectorProvider {????/**?????* Prevent instantiation.?????*/????privateDefaultSelectorProvider() { }????/**?????* Returns the default SelectorProvider.?????*/????publicstatic SelectorProvider create() {????????returnnew sun.nio.ch.EPollSelectorProvider();????}}

可以看見,linux系統下sun.nio.ch.DefaultSelectorProvider.create(); 會生成一個sun.nio.ch.EPollSelectorProvider類型的SelectorProvider,這里對應于linux系統的epoll

接下來看下 selector.open():

12345678910111213141516/**?????* Opens a selector.?????*?????* <p> The new selector is created by invoking the {@link?????* java.nio.channels.spi.SelectorProvider#openSelector openSelector} method?????* of the system-wide default {@link?????* java.nio.channels.spi.SelectorProvider} object.? </p>?????*?????* @return? A new selector?????*?????* @throws? IOException?????*????????? If an I/O error occurs?????*/????publicstatic Selector open() throwsIOException {????????returnSelectorProvider.provider().openSelector();????}

在得到sun.nio.ch.EPollSelectorProvider后調用openSelector()方法構建Selector,這里會構建一個EPollSelectorImpl對象。

EPollSelectorImpl

12345678910111213classEPollSelectorImpl????extendsSelectorImpl{????// File descriptors used for interrupt????protectedint fd0;????protectedint fd1;????// The poll object????EPollArrayWrapper pollWrapper;????// Maps from file descriptors to keys????privateMap<Integer,SelectionKeyImpl> fdToKey;
1234567891011121314151617181920212223EPollSelectorImpl(SelectorProvider sp) throwsIOException {????????super(sp);????????longpipeFds = IOUtil.makePipe(false);????????fd0 = (int) (pipeFds >>> 32);????????fd1 = (int) pipeFds;????????try{????????????pollWrapper = newEPollArrayWrapper();????????????pollWrapper.initInterrupt(fd0, fd1);????????????fdToKey = newHashMap<>();????????}catch(Throwable t) {????????????try{????????????????FileDispatcherImpl.closeIntFD(fd0);????????????}catch(IOException ioe0) {????????????????t.addSuppressed(ioe0);????????????}????????????try{????????????????FileDispatcherImpl.closeIntFD(fd1);????????????}catch(IOException ioe1) {????????????????t.addSuppressed(ioe1);????????????}????????????throwt;????????}????}

EPollSelectorImpl構造函數完成:

① EPollArrayWrapper的構建,EpollArrayWapper將Linux的epoll相關系統調用封裝成了native方法供EpollSelectorImpl使用。
② 通過EPollArrayWrapper向epoll注冊中斷事件

12345voidinitInterrupt(intfd0, intfd1) {????????outgoingInterruptFD = fd1;????????incomingInterruptFD = fd0;????????epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);????}

③ fdToKey:構建文件描述符-SelectionKeyImpl映射表,所有注冊到selector的channel對應的SelectionKey和與之對應的文件描述符都會放入到該映射表中。

EPollArrayWrapper

EPollArrayWrapper完成了對epoll文件描述符的構建,以及對linux系統的epoll指令操縱的封裝。維護每次selection操作的結果,即epoll_wait結果的epoll_event數組。
EPollArrayWrapper操縱了一個linux系統下epoll_event結構的本地數組。

1234567891011* typedef union epoll_data {*????void*ptr;*????intfd;*???? __uint32_t u32;*???? __uint64_t u64;*? } epoll_data_t;** struct epoll_event {*???? __uint32_t events;*???? epoll_data_t data;* };

epoll_event的數據成員(epoll_data_t data)包含有與通過epoll_ctl將文件描述符注冊到epoll時設置的數據相同的數據。這里data.fd為我們注冊的文件描述符。這樣我們在處理事件的時候持有有效的文件描述符了。

EPollArrayWrapper將Linux的epoll相關系統調用封裝成了native方法供EpollSelectorImpl使用。

1234privatenative int epollCreate();????privatenative void epollCtl(intepfd, intopcode, intfd, intevents);????privatenative int epollWait(longpollAddress, intnumfds, longtimeout,?????????????????????????????????intepfd) throwsIOException;

上述三個native方法就對應Linux下epoll相關的三個系統調用

12345678// The fd of the epoll driver????privatefinal int epfd;?????// The epoll_event array for results from epoll_wait????privatefinal AllocatedNativeObject pollArray;????// Base address of the epoll_event array????privatefinal long pollArrayAddress;
123// 用于存儲已經注冊的文件描述符和其注冊等待改變的事件的關聯關系。在epoll_wait操作就是要檢測這里文件描述法注冊的事件是否有發生。????privatefinal byte[] eventsLow = newbyte[MAX_UPDATE_ARRAY_SIZE];????privatefinal Map<Integer,Byte> eventsHigh = newHashMap<>();
123456789EPollArrayWrapper()throwsIOException {????????// creates the epoll file descriptor????????epfd = epollCreate();????????// the epoll_event array passed to epoll_wait????????intallocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;????????pollArray = newAllocatedNativeObject(allocationSize, true);????????pollArrayAddress = pollArray.address();????}

EPoolArrayWrapper構造函數,創建了epoll文件描述符。構建了一個用于存放epoll_wait返回結果的epoll_event數組。

ServerSocketChannel的構建

ServerSocketChannel.open();

返回ServerSocketChannelImpl對象,構建linux系統下ServerSocket的文件描述符。

123456// Our file descriptor????privatefinal FileDescriptor fd;????// fd value needed for dev/poll. This value will remain valid????// even after the value in the file descriptor object has been set to -1????privateint fdVal;
123456ServerSocketChannelImpl(SelectorProvider sp) throwsIOException {????????super(sp);????????this.fd =? Net.serverSocket(true);????????this.fdVal = IOUtil.fdVal(fd);????????this.state = ST_INUSE;????}

將ServerSocketChannel注冊到Selector

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

12345678910111213141516171819202122232425262728publicfinal SelectionKey register(Selector sel, intops,???????????????????????????????????????Object att)????????throwsClosedChannelException????{????????synchronized(regLock) {????????????if(!isOpen())????????????????thrownew ClosedChannelException();????????????if((ops & ~validOps()) != 0)????????????????thrownew IllegalArgumentException();????????????if(blocking)????????????????thrownew IllegalBlockingModeException();????????????SelectionKey k = findKey(sel);????????????if(k != null) {????????????????k.interestOps(ops);????????????????k.attach(att);????????????}????????????if(k == null) {????????????????// New registration????????????????synchronized(keyLock) {????????????????????if(!isOpen())????????????????????????thrownew ClosedChannelException();????????????????????k = ((AbstractSelector)sel).register(this, ops, att);????????????????????addKey(k);????????????????}????????????}????????????returnk;????????}????}
1234567891011121314protectedfinal SelectionKey register(AbstractSelectableChannel ch,??????????????????????????????????????????intops,??????????????????????????????????????????Object attachment)????{????????if(!(ch instanceofSelChImpl))????????????thrownew IllegalSelectorException();????????SelectionKeyImpl k = newSelectionKeyImpl((SelChImpl)ch, this);????????k.attach(attachment);????????synchronized(publicKeys) {????????????implRegister(k);????????}????????k.interestOps(ops);????????returnk;????}

① 構建代表channel和selector間關系的SelectionKey對象
② implRegister(k)將channel注冊到epoll中
③ k.interestOps(int) 完成下面兩個操作:
a) 會將注冊的感興趣的事件和其對應的文件描述存儲到EPollArrayWrapper對象的eventsLow或eventsHigh中,這是給底層實現epoll_wait時使用的。
b) 同時該操作還會將設置SelectionKey的interestOps字段,這是給我們程序員獲取使用的。

EPollSelectorImpl. implRegister

123456789protectedvoid implRegister(SelectionKeyImpl ski) {????????if(closed)????????????thrownew ClosedSelectorException();????????SelChImpl ch = ski.channel;????????intfd = Integer.valueOf(ch.getFDVal());????????fdToKey.put(fd, ski);????????pollWrapper.add(fd);????????keys.add(ski);????}

① 將channel對應的fd(文件描述符)和對應的SelectionKeyImpl放到fdToKey映射表中。
② 將channel對應的fd(文件描述符)添加到EPollArrayWrapper中,并強制初始化fd的事件為0 ( 強制初始更新事件為0,因為該事件可能存在于之前被取消過的注冊中。)
③ 將selectionKey放入到keys集合中。

Selection操作

selection操作有3中類型:

① select():該方法會一直阻塞直到至少一個channel被選擇(即,該channel注冊的事件發生了)為止,除非當前線程發生中斷或者selector的wakeup方法被調用。
② select(long time):該方法和select()類似,該方法也會導致阻塞直到至少一個channel被選擇(即,該channel注冊的事件發生了)為止,除非下面3種情況任意一種發生:a) 設置的超時時間到達;b) 當前線程發生中斷;c) selector的wakeup方法被調用
③ selectNow():該方法不會發生阻塞,如果沒有一個channel被選擇也會立即返回。

我們主要來看看select()的實現 :int n = selector.select();

123publicint select() throwsIOException {????returnselect(0);}

最終會調用到EPollSelectorImpl的doSelect

1234567891011121314151617181920212223protectedint doSelect(longtimeout) throwsIOException {????????if(closed)????????????thrownew ClosedSelectorException();????????processDeregisterQueue();????????try{????????????begin();????????????pollWrapper.poll(timeout);????????}finally{????????????end();????????}????????processDeregisterQueue();????????intnumKeysUpdated = updateSelectedKeys();????????if(pollWrapper.interrupted()) {????????????// Clear the wakeup pipe????????????pollWrapper.putEventOps(pollWrapper.interruptedIndex(),0);????????????synchronized(interruptLock) {????????????????pollWrapper.clearInterrupted();????????????????IOUtil.drain(fd0);????????????????interruptTriggered = false;????????????}????????}????????returnnumKeysUpdated;????}

① 先處理注銷的selectionKey隊列
② 進行底層的epoll_wait操作
③ 再次對注銷的selectionKey隊列進行處理
④ 更新被選擇的selectionKey

先來看processDeregisterQueue():

1234567891011121314151617181920212223voidprocessDeregisterQueue() throwsIOException {????????Set var1 = this.cancelledKeys();????????synchronized(var1) {????????????if(!var1.isEmpty()) {????????????????Iterator var3 = var1.iterator();????????????????while(var3.hasNext()) {????????????????????SelectionKeyImpl var4 = (SelectionKeyImpl)var3.next();????????????????????try{????????????????????????this.implDereg(var4);????????????????????}catch(SocketException var12) {????????????????????????IOException var6 = newIOException("Error deregistering key");????????????????????????var6.initCause(var12);????????????????????????throwvar6;????????????????????}finally{????????????????????????var3.remove();????????????????????}????????????????}????????????}????????}????}

從cancelledKeys集合中依次取出注銷的SelectionKey,執行注銷操作,將處理后的SelectionKey從cancelledKeys集合中移除。執行processDeregisterQueue()后cancelledKeys集合會為空。

1234567891011121314protectedvoid implDereg(SelectionKeyImpl ski) throwsIOException {????????assert(ski.getIndex() >= 0);????????SelChImpl ch = ski.channel;????????intfd = ch.getFDVal();????????fdToKey.remove(Integer.valueOf(fd));????????pollWrapper.remove(fd);????????ski.setIndex(-1);????????keys.remove(ski);????????selectedKeys.remove(ski);????????deregister((AbstractSelectionKey)ski);????????SelectableChannel selch = ski.channel();????????if(!selch.isOpen() && !selch.isRegistered())????????????((SelChImpl)selch).kill();????}

注銷會完成下面的操作:
① 將已經注銷的selectionKey從fdToKey( 文件描述與SelectionKeyImpl的映射表 )中移除
② 將selectionKey所代表的channel的文件描述符從EPollArrayWrapper中移除
③ 將selectionKey從keys集合中移除,這樣下次selector.select()就不會再將該selectionKey注冊到epoll中監聽
④ 也會將selectionKey從對應的channel中注銷
⑤ 最后如果對應的channel已經關閉并且沒有注冊其他的selector了,則將該channel關閉

完成上面的的操作后,注銷的SelectionKey就不會出現先在keys、selectedKeys以及cancelKeys這3個集合中的任何一個。

接著我們來看EPollArrayWrapper.poll(timeout):

123456789101112intpoll(longtimeout) throwsIOException {????????updateRegistrations();????????updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);????????for(inti=0; i<updated; i++) {????????????if(getDescriptor(i) == incomingInterruptFD) {????????????????interruptedIndex = i;????????????????interrupted = true;????????????????break;????????????}????????}????????returnupdated;????}

updateRegistrations()方法會將已經注冊到該selector的事件(eventsLow或eventsHigh)通過調用epollCtl(epfd, opcode, fd, events); 注冊到linux系統中。

這里epollWait就會調用linux底層的epoll_wait方法,并返回在epoll_wait期間有事件觸發的entry的個數

再看updateSelectedKeys():

123456789101112131415161718192021222324privateint updateSelectedKeys() {????????intentries = pollWrapper.updated;????????intnumKeysUpdated = 0;????????for(inti=0; i<entries; i++) {????????????intnextFD = pollWrapper.getDescriptor(i);????????????SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));????????????// ski is null in the case of an interrupt????????????if(ski != null) {????????????????intrOps = pollWrapper.getEventOps(i);????????????????if(selectedKeys.contains(ski)) {????????????????????if(ski.channel.translateAndSetReadyOps(rOps, ski)) {????????????????????????numKeysUpdated++;????????????????????}????????????????}else{????????????????????ski.channel.translateAndSetReadyOps(rOps, ski);????????????????????if((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {????????????????????????selectedKeys.add(ski);????????????????????????numKeysUpdated++;????????????????????}????????????????}????????????}????????}????????returnnumKeysUpdated;????}

該方法會從通過EPollArrayWrapper pollWrapper 以及 fdToKey( 構建文件描述符-SelectorKeyImpl映射表 )來獲取有事件觸發的SelectionKeyImpl對象,然后將SelectionKeyImpl放到selectedKey集合( 有事件觸發的selectionKey集合,可以通過selector.selectedKeys()方法獲得 )中,即selectedKeys。并重新設置SelectionKeyImpl中相關的readyOps值。

但是,這里要注意兩點:

① 如果SelectionKeyImpl已經存在于selectedKeys集合中,并且發現觸發的事件已經存在于readyOps中了,則不會使numKeysUpdated++;這樣會使得我們無法得知該事件的變化。

上面這點說明了為什么我們要在每次從selectedKey中獲取到Selectionkey后,將其從selectedKey集合移除,就是為了當有事件觸發使selectionKey能正確到放入selectedKey集合中,并正確的通知給調用者。

② 如果發現channel所發生I/O事件不是當前SelectionKey所感興趣,則不會將SelectionKeyImpl放入selectedKeys集合中,也不會使numKeysUpdated++

epoll原理

epoll是Linux下的一種IO多路復用技術,可以非常高效的處理數以百萬計的socket句柄。
先看看使用c封裝的3個epoll系統調用:

  • int epoll_create(int size)

epoll_create建立一個epoll對象。參數size是內核保證能夠正確處理的最大句柄數,多于這個最大數時內核可不保證效果。

  • int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

epoll_ctl可以操作epoll_create創建的epoll,如將socket句柄加入到epoll中讓其監控,或把epoll正在監控的某個socket句柄移出epoll。

  • int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout)

epoll_wait在調用時,在給定的timeout時間內,所監控的句柄中有事件發生時,就返回用戶態的進程。

大概看看epoll內部是怎么實現的:

  • epoll初始化時,會向內核注冊一個文件系統,用于存儲被監控的句柄文件,調用epoll_create時,會在這個文件系統中創建一個file節點。同時epoll會開辟自己的內核高速緩存區,以紅黑樹的結構保存句柄,以支持快速的查找、插入、刪除。還會再建立一個list鏈表,用于存儲準備就緒的事件。
  • 當執行epoll_ctl時,除了把socket句柄放到epoll文件系統里file對象對應的紅黑樹上之外,還會給內核中斷處理程序注冊一個回調函數,告訴內核,如果這個句柄的中斷到了,就把它放到準備就緒list鏈表里。所以,當一個socket上有數據到了,內核在把網卡上的數據copy到內核中后,就把socket插入到就緒鏈表里。
  • 當epoll_wait調用時,僅僅觀察就緒鏈表里有沒有數據,如果有數據就返回,否則就sleep,超時時立刻返回。
  • epoll的兩種工作模式:

    • LT:level-trigger,水平觸發模式,只要某個socket處于readable/writable狀態,無論什么時候進行epoll_wait都會返回該socket。
    • ET:edge-trigger,邊緣觸發模式,只有某個socket從unreadable變為readable或從unwritable變為writable時,epoll_wait才會返回該socket。

    socket讀數據

    socket寫數據

    最后順便說下在Linux系統中JDK NIO使用的是 LT ,而Netty epoll使用的是 ET。

    后記

    因為本人對計算機系統組成以及C語言等知識比較欠缺,因為文中相關知識點的表示也相當“膚淺”,如有不對不妥的地方望讀者指出。同時我也會繼續加強對該方面知識點的學習~

    參考

    • http://www.jianshu.com/p/0d497fe5484a
    • http://remcarpediem.com/2017/04/02/Netty源碼-三-I-O模型和Java-NIO底層原理/
    • 圣思園netty課程

    創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

    總結

    以上是生活随笔為你收集整理的Selector 实现原理的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。