日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

高并发核心Selector详解

發(fā)布時間:2025/3/15 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 高并发核心Selector详解 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
  • Selector設計
  • 筆者下載得是openjdk8的源碼, 畫出類圖

    比較清晰得看到,openjdk中Selector的實現(xiàn)是SelectorImpl,然后SelectorImpl又將職責委托給了具體的平臺,比如圖中框出的

    • linux2.6以后才有的EpollSelectorImpl
    • Windows平臺是WindowsSelectorImpl
    • MacOSX平臺是KQueueSelectorImpl

    從名字也可以猜到,openjdk肯定在底層還是用epoll,kqueue,iocp這些技術來實現(xiàn)的I/O多路復用。

  • 獲取Selector
  • 眾所周知,Selector.open()可以得到一個Selector實例,怎么實現(xiàn)的呢?

    // Selector.java public static Selector open() throws IOException {// 首先找到provider,然后再打開Selectorreturn SelectorProvider.provider().openSelector(); } // java.nio.channels.spi.SelectorProviderpublic static SelectorProvider provider() {synchronized (lock) {if (provider != null)return provider;return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {public SelectorProvider run() {if (loadProviderFromProperty())return provider;if (loadProviderAsService())return provider;// 這里就是打開Selector的真正方法provider = sun.nio.ch.DefaultSelectorProvider.create();return provider;}});} }

    在openjdk中,每個操作系統(tǒng)都有一個sun.nio.ch.DefaultSelectorProvider實現(xiàn),以solaris為例:

    /*** Returns the default SelectorProvider.*/ public static SelectorProvider create() {// 獲取OS名稱String osname = AccessController.doPrivileged(new GetPropertyAction("os.name"));// 根據名稱來創(chuàng)建不同的Selctorif (osname.equals("SunOS"))return createProvider("sun.nio.ch.DevPollSelectorProvider");if (osname.equals("Linux"))return createProvider("sun.nio.ch.EPollSelectorProvider");return new sun.nio.ch.PollSelectorProvider(); }

    如果系統(tǒng)名稱是Linux的話,真正創(chuàng)建的是sun.nio.ch.EPollSelectorProvider。如果不是SunOS也不是Linux,就使用sun.nio.ch.PollSelectorProvider, 關于PollSelector有興趣的讀者自行了解下, 本文僅以實際常用的EpollSelector為例探討。

    打開sun.nio.ch.EPollSelectorProvider查看openSelector方法

    public AbstractSelector openSelector() throws IOException {return new EPollSelectorImpl(this); }

    很直觀,這樣我們在Linux平臺就得到了最終的Selector實現(xiàn):sun.nio.ch.EPollSelectorImpl

  • EPollSelector如何進行select
    epoll系統(tǒng)調用主要分為3個函數(shù)
    • epoll_create: 創(chuàng)建一個epollfd,并開辟epoll自己的內核高速cache區(qū),建立紅黑樹,分配好想要的size的內存對象,建立一個list鏈表,用于存儲準備就緒的事件。
    • epoll_wait: 等待內核返回IO事件
    • epoll_ctl: 對新舊事件進行新增修改或者刪除
    • 3.1 Epoll fd的創(chuàng)建

    EPollSelectorImpl的構造器代碼如下:

    EPollSelectorImpl(SelectorProvider sp) throws IOException {super(sp);// makePipe返回管道的2個文件描述符,編碼在一個long類型的變量中// 高32位代表讀 低32位代表寫// 使用pipe為了實現(xiàn)Selector的wakeup邏輯long pipeFds = IOUtil.makePipe(false);fd0 = (int) (pipeFds >>> 32);fd1 = (int) pipeFds;// 新建一個EPollArrayWrapperpollWrapper = new EPollArrayWrapper();pollWrapper.initInterrupt(fd0, fd1);fdToKey = new HashMap<>(); }

    再看EPollArrayWrapper的初始化過程

    EPollArrayWrapper() throws IOException {// creates the epoll file descriptor// 創(chuàng)建epoll fdepfd = epollCreate();// the epoll_event array passed to epoll_waitint allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;pollArray = new AllocatedNativeObject(allocationSize, true);pollArrayAddress = pollArray.address();// eventHigh needed when using file descriptors > 64kif (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)eventsHigh = new HashMap<>(); } private native int epollCreate();

    在初始化過程中調用了epollCreate方法,這是個native方法。
    打開

    jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c

    JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this) {/** epoll_create expects a size as a hint to the kernel about how to* dimension internal structures. We can't predict the size in advance.*/// 這里的size可以不指定,從Linux2.6.8之后,改用了紅黑樹結構,指定了大小也沒啥用int epfd = epoll_create(256);if (epfd < 0) {JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");}return epfd; }

    可以看到最后還是使用了操作系統(tǒng)的api: epoll_create函數(shù)

    • 3.2 Epoll wait等待內核IO事件

    調用Selector.select(),最后會委托給各個實現(xiàn)的doSelect方法,限于篇幅不貼出太詳細的,這里看下EpollSelectorImpl的doSelect方法

    protected int doSelect(long timeout) throws IOException {if (closed)throw new ClosedSelectorException();processDeregisterQueue();try {begin();// 真正的實現(xiàn)是這行pollWrapper.poll(timeout);} finally {end();}processDeregisterQueue();int numKeysUpdated = updateSelectedKeys();// 以下基本都是異常處理if (pollWrapper.interrupted()) {// Clear the wakeup pipepollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);synchronized (interruptLock) {pollWrapper.clearInterrupted();IOUtil.drain(fd0);interruptTriggered = false;}}return numKeysUpdated; }

    然后我們去看pollWrapper.poll, 打開jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java:

    int poll(long timeout) throws IOException {updateRegistrations();// 這個epollWait是不是有點熟悉呢?updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);for (int i=0; i<updated; i++) {if (getDescriptor(i) == incomingInterruptFD) {interruptedIndex = i;interrupted = true;break;}}return updated; }private native int epollWait(long pollAddress, int numfds, long timeout,int epfd) throws IOException;

    epollWait也是個native方法,打開c代碼一看:

    JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,jlong address, jint numfds,jlong timeout, jint epfd) {struct epoll_event *events = jlong_to_ptr(address);int res;if (timeout <= 0) { /* Indefinite or no wait */// 發(fā)起epoll_wait系統(tǒng)調用等待內核事件RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);} else { /* Bounded wait; bounded restarts */res = iepoll(epfd, events, numfds, timeout);}if (res < 0) {JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");}return res; }

    可以看到,最后還是發(fā)起的epoll_wait系統(tǒng)調用.

    • 3.3 epoll control以及openjdk對事件管理的封裝

    JDK中對于注冊到Selector上的IO事件關系是使用SelectionKey來表示,代表了Channel感興趣的事件,如Read,Write,Connect,Accept.

    調用Selector.register()時均會將事件存儲到EpollArrayWrapper的成員變量eventsLow和eventsHigh中

    // events for file descriptors with registration changes pending, indexed // by file descriptor and stored as bytes for efficiency reasons. For // file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at // least) then the update is stored in a map. // 使用數(shù)組保存事件變更, 數(shù)組的最大長度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024 private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE]; // 超過數(shù)組長度的事件會緩存到這個map中,等待下次處理 private Map<Integer,Byte> eventsHigh;/*** Sets the pending update events for the given file descriptor. This* method has no effect if the update events is already set to KILLED,* unless {@code force} is {@code true}.*/ private void setUpdateEvents(int fd, byte events, boolean force) {// 判斷fd和數(shù)組長度if (fd < MAX_UPDATE_ARRAY_SIZE) {if ((eventsLow[fd] != KILLED) || force) {eventsLow[fd] = events;}} else {Integer key = Integer.valueOf(fd);if (!isEventsHighKilled(key) || force) {eventsHigh.put(key, Byte.valueOf(events));}} }

    上面看到EpollArrayWrapper.poll()的時候, 首先會調用updateRegistrations

    /*** Returns the pending update events for the given file descriptor.*/ private byte getUpdateEvents(int fd) {if (fd < MAX_UPDATE_ARRAY_SIZE) {return eventsLow[fd];} else {Byte result = eventsHigh.get(Integer.valueOf(fd));// result should never be nullreturn result.byteValue();} }/*** Update the pending registrations.*/ private void updateRegistrations() {synchronized (updateLock) {int j = 0;while (j < updateCount) {int fd = updateDescriptors[j];// 從保存的eventsLow和eventsHigh里取出事件short events = getUpdateEvents(fd);boolean isRegistered = registered.get(fd);int opcode = 0;if (events != KILLED) {// 判斷操作類型以傳給epoll_ctl// 沒有指定EPOLLET事件類型if (isRegistered) {opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;} else {opcode = (events != 0) ? EPOLL_CTL_ADD : 0;}if (opcode != 0) {// 熟悉的epoll_ctlepollCtl(epfd, opcode, fd, events);if (opcode == EPOLL_CTL_ADD) {registered.set(fd);} else if (opcode == EPOLL_CTL_DEL) {registered.clear(fd);}}}j++;}updateCount = 0;} } private native void epollCtl(int epfd, int opcode, int fd, int events);

    在獲取到事件之后將操作委托給了epollCtl,這又是個native方法,打開相應的c代碼一看:

    JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,jint opcode, jint fd, jint events) {struct epoll_event event;int res;event.events = events;event.data.fd = fd;// 發(fā)起epoll_ctl調用來進行IO事件的管理RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);/** A channel may be registered with several Selectors. When each Selector* is polled a EPOLL_CTL_DEL op will be inserted into its pending update* list to remove the file descriptor from epoll. The "last" Selector will* close the file descriptor which automatically unregisters it from each* epoll descriptor. To avoid costly synchronization between Selectors we* allow pending updates to be processed, ignoring errors. The errors are* harmless as the last update for the file descriptor is guaranteed to* be EPOLL_CTL_DEL.*/if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");} }

    原來還是我們的老朋友epoll_ctl.
    有個小細節(jié)是jdk沒有指定ET(邊緣觸發(fā))還是LT(水平觸發(fā)),所以默認會用LT:)

    在AbstractSelectorImpl中有3個set保存事件

    // Public views of the key sets // 注冊的所有事件 private Set<SelectionKey> publicKeys; // Immutable // 內核返回的IO事件封裝,表示哪些fd有數(shù)據可讀可寫 private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition// 取消的事件 private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();

    在EpollArrayWrapper.poll調用完成之后, 會調用updateSelectedKeys來更新上面的仨set

    private int updateSelectedKeys() {int entries = pollWrapper.updated;int numKeysUpdated = 0;for (int i=0; i<entries; i++) {int nextFD = pollWrapper.getDescriptor(i);SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));// ski is null in the case of an interruptif (ski != null) {int rOps = pollWrapper.getEventOps(i);if (selectedKeys.contains(ski)) {if (ski.channel.translateAndSetReadyOps(rOps, ski)) {numKeysUpdated++;}} else {ski.channel.translateAndSetReadyOps(rOps, ski);if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {selectedKeys.add(ski);numKeysUpdated++;}}}}return numKeysUpdated;

    }
    代碼很直白,拿出事件對set比對操作。

  • Selector類的相關方法
  • 重點注意四個方法

    • select(): 這是一個阻塞方法,調用該方法,會阻塞,直到返回一個有事件發(fā)生的selectionKey集合
    • selectNow() :非阻塞方法,獲取不到有事件發(fā)生的selectionKey集合,也會立即返回
    • select(long):阻塞方法,如果沒有獲取到有事件發(fā)生的selectionKey集合,阻塞指定的long時間
    • selectedKeys(): 返回全部selectionKey集合,不管是否有事件發(fā)生

    可以理解:selector一直在監(jiān)聽select()

  • Selector、SelectionKey、ServerScoketChannel、ScoketChannel的關系
    • Server代碼:
    public class NIOServer {public static void main(String[] args) throws Exception{//創(chuàng)建ServerSocketChannel -> ServerSocketServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//得到一個Selecor對象Selector selector = Selector.open();//綁定一個端口6666, 在服務器端監(jiān)聽serverSocketChannel.socket().bind(new InetSocketAddress(6666));//設置為非阻塞serverSocketChannel.configureBlocking(false);//把 serverSocketChannel 注冊到 selector 關心 事件為 OP_ACCEPTserverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("注冊后的selectionkey 數(shù)量=" + selector.keys().size()); // 1//循環(huán)等待客戶端連接while (true) {//這里我們等待1秒,如果沒有事件發(fā)生, 返回if(selector.select(1000) == 0) { //沒有事件發(fā)生System.out.println("服務器等待了1秒,無連接");continue;}//如果返回的>0, 就獲取到相關的 selectionKey集合//1.如果返回的>0, 表示已經獲取到關注的事件//2. selector.selectedKeys() 返回關注事件的集合// 通過 selectionKeys 反向獲取通道Set<SelectionKey> selectionKeys = selector.selectedKeys();System.out.println("selectionKeys 數(shù)量 = " + selectionKeys.size());//遍歷 Set<SelectionKey>, 使用迭代器遍歷Iterator<SelectionKey> keyIterator = selectionKeys.iterator();while (keyIterator.hasNext()) {//獲取到SelectionKeySelectionKey key = keyIterator.next();//根據key 對應的通道發(fā)生的事件做相應處理if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客戶端連接//該該客戶端生成一個 SocketChannelSocketChannel socketChannel = serverSocketChannel.accept();System.out.println("客戶端連接成功 生成了一個 socketChannel " + socketChannel.hashCode());//將 SocketChannel 設置為非阻塞socketChannel.configureBlocking(false);//將socketChannel 注冊到selector, 關注事件為 OP_READ, 同時給socketChannel//關聯(lián)一個BuffersocketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));System.out.println("客戶端連接后 ,注冊的selectionkey 數(shù)量=" + selector.keys().size()); //2,3,4..}if(key.isReadable()) { //發(fā)生 OP_READ//通過key 反向獲取到對應channelSocketChannel channel = (SocketChannel)key.channel();//獲取到該channel關聯(lián)的bufferByteBuffer buffer = (ByteBuffer)key.attachment();channel.read(buffer);System.out.println("form 客戶端 " + new String(buffer.array()));}//手動從集合中移動當前的selectionKey, 防止重復操作keyIterator.remove();}}} }
    • Client代碼
    public class NIOClient {public static void main(String[] args) throws Exception{//得到一個網絡通道SocketChannel socketChannel = SocketChannel.open();//設置非阻塞socketChannel.configureBlocking(false);//提供服務器端的ip 和 端口InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);//連接服務器if (!socketChannel.connect(inetSocketAddress)) {while (!socketChannel.finishConnect()) {System.out.println("因為連接需要時間,客戶端不會阻塞,可以做其它工作..");}}//...如果連接成功,就發(fā)送數(shù)據String str = "hello, 尚硅谷~";//Wraps a byte array into a bufferByteBuffer buffer = ByteBuffer.wrap(str.getBytes());//發(fā)送數(shù)據,將 buffer 數(shù)據寫入 channelsocketChannel.write(buffer);System.in.read();} }
  • 總結

  • jdk中Selector是對操作系統(tǒng)的IO多路復用調用的一個封裝,在Linux中就是對epoll的封裝。epoll實質上是將event loop交給了內核,因為網絡數(shù)據都是首先到內核的,直接內核處理可以避免無謂的系統(tǒng)調用和數(shù)據拷貝, 性能是最好的。jdk中對IO事件的封裝是SelectionKey, 保存Channel關心的事件。

    本文轉自

    總結

    以上是生活随笔為你收集整理的高并发核心Selector详解的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。