日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

epoll监听文件_【原创】万字长文浅析:Epoll与Java Nio的那些事儿

發(fā)布時(shí)間:2025/3/19 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 epoll监听文件_【原创】万字长文浅析:Epoll与Java Nio的那些事儿 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

“ Epoll 是Linux內(nèi)核的高性能、可擴(kuò)展的I/O事件通知機(jī)制。

在linux2.5.44首次引入epoll,它設(shè)計(jì)的目的旨在取代既有的select、poll系統(tǒng)函數(shù),讓需要大量操作文件描述符的程序得以發(fā)揮更優(yōu)異的性能(wikipedia example: 舊有的系統(tǒng)函數(shù)所花費(fèi)的時(shí)間復(fù)雜度為O(n), epoll的時(shí)間復(fù)雜度O(log n))。epoll實(shí)現(xiàn)的功能與poll類似,都是監(jiān)聽多個(gè)文件描述符上的事件。

epoll底層是由可配置的操作系統(tǒng)內(nèi)核對象建構(gòu)而成,并以文件描述符(file descriptor)的形式呈現(xiàn)于用戶空間(from wikipedia: 在操作系統(tǒng)中,虛擬內(nèi)存通常會被分成用戶空間,與核心空間這兩個(gè)區(qū)段。這是存儲器保護(hù)機(jī)制中的一環(huán)。內(nèi)核**、核心擴(kuò)展(kernel extensions)、以及驅(qū)動程序,運(yùn)行在核心空間**上。而其他的應(yīng)用程序,則運(yùn)行在用戶空間上。所有運(yùn)行在用戶空間的應(yīng)用程序,都被統(tǒng)稱為用戶級(userland))。

多說一點(diǎn)關(guān)于內(nèi)核的

它是一個(gè)用來管理軟件發(fā)出的數(shù)據(jù)I/O的一個(gè)程序,并將數(shù)據(jù)交由CPU和電腦其他電子組件處理,但是直接對硬件操作是非常復(fù)雜的,通常內(nèi)核提供一種硬件抽象的方法來完成(由內(nèi)核決定一個(gè)程序在什么時(shí)候?qū)δ巢糠钟布僮鞫嚅L時(shí)間),通過這些方法來完成進(jìn)程間通信和系統(tǒng)調(diào)用。

宏內(nèi)核:

宏內(nèi)核簡單來說,首先定義了一個(gè)高階的抽象接口,叫系統(tǒng)調(diào)用(System call))來實(shí)現(xiàn)操作系統(tǒng)的功能,例如進(jìn)程管理,文件系統(tǒng),和存儲管理等等,這些功能由多個(gè)運(yùn)行在內(nèi)核態(tài)的程序來完成。

微內(nèi)核:

微內(nèi)核結(jié)構(gòu)由硬件抽象層和系統(tǒng)調(diào)用組成;包括了創(chuàng)建一個(gè)系統(tǒng)必需的幾個(gè)部分;如線程管理,地址空間和進(jìn)程間通信等。微核的目標(biāo)是將系統(tǒng)服務(wù)的實(shí)現(xiàn)和系統(tǒng)的基本操作規(guī)則分離開來。

linux就是使用的宏內(nèi)核。因?yàn)樗軌蛟谶\(yùn)行時(shí)將模塊調(diào)入執(zhí)行,使擴(kuò)充內(nèi)核的功能變得更簡單。

epoll做了什么事?

epoll 通過使用紅黑樹(RB-tree)搜索被監(jiān)視的文件描述符(file descriptor)。

在 epoll 實(shí)例上注冊事件時(shí),epoll 會將該事件添加到 epoll 實(shí)例的紅黑樹上并注冊一個(gè)回調(diào)函數(shù),當(dāng)事件發(fā)生時(shí)會將事件添加到就緒鏈表中。

epoll的結(jié)構(gòu)?

int epoll_create(int size); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

①epoll_create

向內(nèi)核申請空間,創(chuàng)建一個(gè)epoll的句柄,size用來告訴內(nèi)核這個(gè)監(jiān)聽的數(shù)目一共有多大。這個(gè)參數(shù)不同于select()中的第一個(gè)參數(shù),給出最大監(jiān)聽的fd+1的值。在最初的實(shí)現(xiàn)中,調(diào)用者通過 size 參數(shù)告知內(nèi)核需要監(jiān)聽的文件描述符數(shù)量。如果監(jiān)聽的文件描述符數(shù)量超過 size, 則內(nèi)核會自動擴(kuò)容。而現(xiàn)在 size 已經(jīng)沒有這種語義了,但是調(diào)用者調(diào)用時(shí) size 依然必須大于 0,以保證后向兼容性。需要注意的是,當(dāng)創(chuàng)建好epoll句柄后,它就是會占用一個(gè)fd值,在linux下如果查看/proc/進(jìn)程id/fd/,是能夠看到這個(gè)fd的。

②epoll_ctl

向 epfd 對應(yīng)的內(nèi)核epoll 實(shí)例添加、修改或刪除對 fd 上事件 event 的監(jiān)聽。op 可以為 EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL 分別對應(yīng)的是添加新的事件,修改文件描述符上監(jiān)聽的事件類型,從實(shí)例上刪除一個(gè)事件。如果 event 的 events 屬性設(shè)置了 EPOLLET flag,那么監(jiān)聽該事件的方式是邊緣觸發(fā)。

events可以是以下幾個(gè)宏的集合:

  • EPOLLIN:觸發(fā)該事件,表示對應(yīng)的文件描述符上有可讀數(shù)據(jù)。(包括對端SOCKET正常關(guān)閉);
  • EPOLLOUT:觸發(fā)該事件,表示對應(yīng)的文件描述符上可以寫數(shù)據(jù);
  • EPOLLPRI:表示對應(yīng)的文件描述符有緊急的數(shù)據(jù)可讀(這里應(yīng)該表示有帶外數(shù)據(jù)到來);
  • EPOLLERR:表示對應(yīng)的文件描述符發(fā)生錯(cuò)誤;
  • EPOLLHUP:表示對應(yīng)的文件描述符被掛斷;
  • EPOLLET:將EPOLL設(shè)為邊緣觸發(fā)(Edge Triggered)模式,這是相對于水平觸發(fā)(Level Triggered)來說的。
  • EPOLLONESHOT:只監(jiān)聽一次事件,當(dāng)監(jiān)聽完這次事件之后,如果還需要繼續(xù)監(jiān)聽這個(gè)socket的話,需要再次把這個(gè)socket加入到EPOLL隊(duì)列里。

例如:

struct epoll_event ev; //設(shè)置與要處理的事件相關(guān)的文件描述符 ev.data.fd=listenfd; //設(shè)置要處理的事件類型 ev.events=EPOLLIN|EPOLLET; //注冊epoll事件 epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);

③epoll_wait

Linux-2.6.19又引入了可以屏蔽指定信號的epoll_wait: epoll_pwait

接收發(fā)生在被偵聽的描述符上的,用戶感興趣的IO事件。簡單點(diǎn)說:通過循環(huán),不斷地監(jiān)聽暴露的端口,看哪一個(gè)fd可讀、可寫~

當(dāng) timeout 為 0 時(shí),epoll_wait 永遠(yuǎn)會立即返回。而 timeout 為 -1 時(shí),epoll_wait 會一直阻塞直到任一已注冊的事件變?yōu)榫途w。當(dāng) timeout 為一正整數(shù)時(shí),epoll 會阻塞直到計(jì)時(shí)結(jié)束或已注冊的事件變?yōu)榫途w。因?yàn)閮?nèi)核調(diào)度延遲,阻塞的時(shí)間可能會略微超過 timeout (毫秒級)。

epoll文件描述符用完后,直接用close關(guān)閉,并且會自動從被偵聽的文件描述符集合中刪除

epoll實(shí)戰(zhàn)

說了這么多原理,腦殼怕嗡嗡的吧,來看看實(shí)戰(zhàn)清醒下~

如上知道:每次添加/修改/刪除被偵聽文件描述符都需要調(diào)用epoll_ctl,所以要盡量少地調(diào)用epoll_ctl,防止其所引來的開銷抵消其帶來的好處。有的時(shí)候,應(yīng)用中可能存在大量的短連接(比如說Web服務(wù)器),epoll_ctl將被頻繁地調(diào)用,可能成為這個(gè)系統(tǒng)的瓶頸。

傳統(tǒng)的select以及poll的效率會因?yàn)樵诰€人數(shù)的線形遞增而導(dǎo)致呈二次乃至三次方的下降,這些直接導(dǎo)致了網(wǎng)絡(luò)服務(wù)器可以支持的人數(shù)有了個(gè)比較明顯的限制。這是因?yàn)樗麄冇邢薜奈募枋龇捅闅v所有的fd所帶來的低效。

重點(diǎn)哦~

當(dāng)你擁有一個(gè)很大的socket集合,不過由于網(wǎng)絡(luò)延時(shí),任一時(shí)間只有部分的socket是“活躍”的,但是select/poll每次調(diào)用都會線性掃描全部的集合,導(dǎo)致效率呈現(xiàn)線性下降。epoll不存在這個(gè)問題,它只會對“活躍”的socket進(jìn)行操作---這是因?yàn)樵趦?nèi)核實(shí)現(xiàn)中epoll是根據(jù)每個(gè)fd上面的callback函數(shù)實(shí)現(xiàn)的。那么,只有“活躍”的socket才會主動的去調(diào)用 callback函數(shù),其他idle(空閑)狀態(tài)socket則不會,在這點(diǎn)上,epoll實(shí)現(xiàn)了一個(gè)“偽”AIO,因?yàn)檫@時(shí)候推動力在os內(nèi)核。在一些 benchmark中,如果所有的socket基本上都是活躍的---比如一個(gè)高速LAN環(huán)境,epoll并不比select/poll有什么效率,相反,如果過多使用epoll_ctl,效率相比還有稍微的下降。但是一旦使用idle connections模擬WAN環(huán)境,epoll的效率就遠(yuǎn)在select/poll之上了。

int epfd = epoll_create(POLL_SIZE);struct epoll_event ev;struct epoll_event *events = NULL;nfds = epoll_wait(epfd, events, 20, 500);{for (n = 0; n < nfds; ++n) {if (events[n].data.fd == listener) {//如果是主socket的事件的話,則表示//有新連接進(jìn)入了,進(jìn)行新連接的處理。client = accept(listener, (structsockaddr *)&local, &addrlen);if (client < 0) {perror("accept");continue;}setnonblocking(client); //將新連接置于非阻塞模式ev.events = EPOLLIN | EPOLLET; //并且將新連接也加入EPOLL的監(jiān)聽隊(duì)列。//注意,這里的參數(shù)EPOLLIN|EPOLLET并沒有設(shè)置對寫socket的監(jiān)聽,//如果有寫操作的話,這個(gè)時(shí)候epoll是不會返回事件的,如果要對寫操作//也監(jiān)聽的話,應(yīng)該是EPOLLIN|EPOLLOUT|EPOLLETev.data.fd = client;if (epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev) < 0) {//設(shè)置好event之后,將這個(gè)新的event通過epoll_ctl加入到epoll的監(jiān)聽隊(duì)列里面,//這里用EPOLL_CTL_ADD來加一個(gè)新的epoll事件,通過EPOLL_CTL_DEL來減少一個(gè)//epoll事件,通過EPOLL_CTL_MOD來改變一個(gè)事件的監(jiān)聽方式。fprintf(stderr, "epollsetinsertionerror:fd=%d", client);return -1;}}else if(event[n].events & EPOLLIN){//如果是已經(jīng)連接的用戶,并且收到數(shù)據(jù),//那么進(jìn)行讀入int sockfd_r;if ((sockfd_r = event[n].data.fd) < 0)continue;read(sockfd_r, buffer, MAXSIZE);//修改sockfd_r上要處理的事件為EPOLLOUTev.data.fd = sockfd_r;ev.events = EPOLLOUT | EPOLLET;epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_r, &ev)}else if(event[n].events & EPOLLOUT){//如果有數(shù)據(jù)發(fā)送int sockfd_w = events[n].data.fd;write(sockfd_w, buffer, sizeof(buffer));//修改sockfd_w上要處理的事件為EPOLLINev.data.fd = sockfd_w;ev.events = EPOLLIN | EPOLLET;epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_w, &ev)}do_use_fd(events[n].data.fd);}}

簡單說下流程:

  • 監(jiān)聽到有新連接進(jìn)入了,進(jìn)行新連接的處理;
  • 如果是已經(jīng)連接的用戶,并且收到數(shù)據(jù),讀完之后修改sockfd_r上要處理的事件為EPOLLOUT(可寫);
  • 如果有數(shù)據(jù)發(fā)送,寫完之后,修改sockfd_w上要處理的事件為EPOLLIN(可讀)

epoll在Java中怎么去調(diào)用的?

基礎(chǔ)知識:

文件描述符:

  • (參考《Unix網(wǎng)絡(luò)編程》譯者的注釋)
  • 文件描述符是Unix系統(tǒng)標(biāo)識文件的int,Unix的哲學(xué)一切皆文件,所以各自資源(包括常規(guī)意義的文件、目錄、管道、POSIX IPC、socket)都可以看成文件。

Java NIO的世界中,Selector是中央控制器,Buffer是承載數(shù)據(jù)的容器,而Channel可以說是最基礎(chǔ)的門面,它是本地I/O設(shè)備、網(wǎng)絡(luò)I/O的通信橋梁。

  • 網(wǎng)絡(luò)I/O設(shè)備:
    • DatagramChannel:讀寫UDP通信的數(shù)據(jù),對應(yīng)DatagramSocket類
    • SocketChannel:讀寫TCP通信的數(shù)據(jù),對應(yīng)Socket類
    • ServerSocketChannel:監(jiān)聽新的TCP連接,并且會創(chuàng)建一個(gè)可讀寫的SocketChannel,對應(yīng)ServerSocket類
  • 本地I/O設(shè)備:
    • FileChannel:讀寫本地文件的數(shù)據(jù),不支持Selector控制,對應(yīng)File類

①先從最簡單的ServerSocketChannel看起

ServerSocketChannel與ServerSocket一樣是socket監(jiān)聽器,其主要區(qū)別前者可以運(yùn)行在非阻塞模式下運(yùn)行;

// 創(chuàng)建一個(gè)ServerSocketChannel,將會關(guān)聯(lián)一個(gè)未綁定的ServerSocketpublic static ServerSocketChannel open() throws IOException {return SelectorProvider.provider().openServerSocketChannel();}

ServerSocketChannel的創(chuàng)建也是依賴底層操作系統(tǒng)實(shí)現(xiàn),其實(shí)現(xiàn)類主要是ServerSocketChannelImpl,我們來看看其構(gòu)造方法

ServerSocketChannelImpl(SelectorProvider var1) throws IOException {super(var1);// 創(chuàng)建一個(gè)文件操作符this.fd = Net.serverSocket(true);// 得到文件操作符是索引this.fdVal = IOUtil.fdVal(this.fd);this.state = 0;}

新建一個(gè)ServerSocketChannelImpl其本質(zhì)是在底層操作系統(tǒng)創(chuàng)建了一個(gè)fd(即文件描述符),相當(dāng)于建立了一個(gè)用于網(wǎng)絡(luò)通信的通道,調(diào)用socket的bind()方法綁定,通過accept()調(diào)用操作系統(tǒng)獲取TCP連接

public SocketChannel accept() throws IOException {// 忽略一些校驗(yàn)及無關(guān)代碼.... ?SocketChannelImpl var2 = null;// var3的作用主要是說明當(dāng)前的IO狀態(tài),主要有/*** EOF = -1;* UNAVAILABLE = -2;* INTERRUPTED = -3;* UNSUPPORTED = -4;* THROWN = -5;* UNSUPPORTED_CASE = -6;*/int var3 = 0;// 這里本質(zhì)也是用fd來獲取連接FileDescriptor var4 = new FileDescriptor();// 用來存儲TCP連接的地址信息InetSocketAddress[] var5 = new InetSocketAddress[1]; ?try {// 這里設(shè)置了一個(gè)中斷器,中斷時(shí)會將連接關(guān)閉this.begin();// 這里當(dāng)IO被中斷時(shí),會重新獲取連接do {var3 = this.accept(this.fd, var4, var5);} while(var3 == -3 && this.isOpen());}finally {// 當(dāng)連接被關(guān)閉且accept失敗時(shí)或拋出AsynchronousCloseExceptionthis.end(var3 > 0);// 驗(yàn)證連接是可用的assert IOStatus.check(var3);} ?if (var3 < 1) {return null;} {// 默認(rèn)連接是阻塞的IOUtil.configureBlocking(var4, true);// 創(chuàng)建一個(gè)SocketChannel的引用var2 = new SocketChannelImpl(this.provider(), var4, var5[0]);// 下面是是否連接成功校驗(yàn),這里忽略... ?return var2;} } ? // 依賴底層操作系統(tǒng)實(shí)現(xiàn)的accept0方法 private int accept(FileDescriptor var1, FileDescriptor var2, InetSocketAddress[] var3) throws IOException {return this.accept0(var1, var2, var3); }

②SocketChannel

用于讀寫TCP通信的數(shù)據(jù),相當(dāng)于客戶端

  • 通過open方法創(chuàng)建SocketChannel,
  • 然后利用connect方法來和服務(wù)端發(fā)起建立連接,還支持了一些判斷連接建立情況的方法;
  • read和write支持最基本的讀寫操作
  • open

    public static SocketChannel open() throws IOException { return SelectorProvider.provider().openSocketChannel(); }public SocketChannel openSocketChannel() throws IOException {return new SocketChannelImpl(this);}// State, increases monotonicallyprivate static final int ST_UNINITIALIZED = -1;private static final int ST_UNCONNECTED = 0;private static final int ST_PENDING = 1;private static final int ST_CONNECTED = 2;private static final int ST_KILLPENDING = 3;private static final int ST_KILLED = 4;private int state = ST_UNINITIALIZED; SocketChannelImpl(SelectorProvider sp) throws IOException {super(sp);// 創(chuàng)建一個(gè)scoket通道,即fd(fd的作用可參考上面的描述)this.fd = Net.socket(true);// 得到該fd的索引this.fdVal = IOUtil.fdVal(fd);// 設(shè)置為未連接this.state = ST_UNCONNECTED;}

    connect建立連接

    // 代碼均來自JDK1.8 部分代碼public boolean connect(SocketAddress var1) throws IOException {boolean var2 = false;// 讀寫都鎖住synchronized(this.readLock) {synchronized(this.writeLock) {/****狀態(tài)檢查,channel和address****/// 判斷channel是否openthis.ensureOpenAndUnconnected();InetSocketAddress var5 = Net.checkAddress(var1);SecurityManager var6 = System.getSecurityManager();if (var6 != null) {var6.checkConnect(var5.getAddress().getHostAddress(), var5.getPort());} ?boolean var10000;/****連接建立****/// 阻塞狀態(tài)變更的鎖也鎖住synchronized(this.blockingLock()) {int var8 = 0; ?try {try {this.begin(); // 如果當(dāng)前socket未綁定本地端口,則嘗試著判斷和服務(wù)端是否能建立連接synchronized(this.stateLock) {if (!this.isOpen()) {boolean var10 = false;return var10;} ?if (this.localAddress == null) {// 和遠(yuǎn)程建立連接后關(guān)閉連接NetHooks.beforeTcpConnect(this.fd, var5.getAddress(), var5.getPort());} ?this.readerThread = NativeThread.current();} ?do {InetAddress var9 = var5.getAddress();if (var9.isAnyLocalAddress()) {var9 = InetAddress.getLocalHost();}// 建立連接var8 = Net.connect(this.fd, var9, var5.getPort());} while(var8 == -3 && this.isOpen());synchronized(this.stateLock) {this.remoteAddress = var5;if (var8 <= 0) {if (!this.isBlocking()) {this.state = 1;} else {assert false;}} else {this.state = 2;// 連接成功if (this.isOpen()) {this.localAddress = Net.localAddress(this.fd);} ?var10000 = true;return var10000;}}} ?var10000 = false;return var10000;}}}

    在建立在綁定地址之前,我們需要調(diào)用NetHooks.beforeTcpBind,這個(gè)方法是將fd轉(zhuǎn)換為SDP(Sockets Direct Protocol,Java套接字直接協(xié)議) socket。SDP需要網(wǎng)卡支持InfiniBand高速網(wǎng)絡(luò)通信技術(shù),windows不支持該協(xié)議。

    我們來看看在openjdk: srcsolarisclassessunnet下的NetHooks.java

    private static final Provider provider = new sun.net.sdp.SdpProvider();public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException{provider.implBeforeTcpBind(fdObj, address, port);}public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException{provider.implBeforeTcpConnect(fdObj, address, port);}

    可以看到實(shí)際是調(diào)用的SdpProvider里的implBeforeTcpBind

    @Overridepublic void implBeforeTcpBind(FileDescriptor fdObj,InetAddress address,int port)throws IOException{if (enabled)convertTcpToSdpIfMatch(fdObj, Action.BIND, address, port);}// converts unbound TCP socket to a SDP socket if it matches the rulesprivate void convertTcpToSdpIfMatch(FileDescriptor fdObj,Action action,InetAddress address,int port)throws IOException{boolean matched = false;// 主要是先通過規(guī)則校驗(yàn)器判斷入?yún)⑹欠穹?#xff0c;一般有PortRangeRule校驗(yàn)器// 然后再執(zhí)行將fd轉(zhuǎn)換為socketfor (Rule rule: rules) {if (rule.match(action, address, port)) {SdpSupport.convertSocket(fdObj);matched = true;break;}}}public static void convertSocket(FileDescriptor fd) throws IOException {...//獲取fd索引int fdVal = fdAccess.get(fd);convert0(fdVal);}// convert0JNIEXPORT void JNICALLJava_sun_net_sdp_SdpSupport_convert0(JNIEnv *env, jclass cls, int fd){// create方法實(shí)際是通過socket(AF_INET_SDP, SOCK_STREAM, 0);方法得到一個(gè)socketint s = create(env);if (s >= 0) {socklen_t len;int arg, res;struct linger linger;/* copy socket options that are relevant to SDP */len = sizeof(arg);// 重用TIME_WAIT的端口if (getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, &len) == 0)setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, len);len = sizeof(arg);// 緊急數(shù)據(jù)放入普通數(shù)據(jù)流if (getsockopt(fd, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, &len) == 0)setsockopt(s, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, len);len = sizeof(linger);// 延遲關(guān)閉連接if (getsockopt(fd, SOL_SOCKET, SO_LINGER, (void*)&linger, &len) == 0)setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&linger, len);// 將fd也引用到s所持有的通道RESTARTABLE(dup2(s, fd), res);if (res < 0)JNU_ThrowIOExceptionWithLastError(env, "dup2");// 執(zhí)行close方法,關(guān)閉s這個(gè)引用RESTARTABLE(close(s), res);}}

    read 讀

    public int read(ByteBuffer var1) throws IOException {// 省略一些判斷synchronized(this.readLock) {this.begin();synchronized(this.stateLock) {do {// 通過IOUtil的讀取fd的數(shù)據(jù)至buf// 這里的nd是SocketDispatcher,用于調(diào)用底層的read和write操作var3 = IOUtil.read(this.fd, var1, -1L, nd);} while(var3 == -3 && this.isOpen());// 這個(gè)方法主要是將UNAVAILABLE(原為-2)這個(gè)狀態(tài)返回0,否則返回nvar4 = IOStatus.normalize(var3);var20 = false;break label367;} ?this.readerCleanup();assert IOStatus.check(var3);} }}} static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {if (var1.isReadOnly()) {throw new IllegalArgumentException("Read-only buffer");} else if (var1 instanceof DirectBuffer) {return readIntoNativeBuffer(var0, var1, var2, var4);} else {// 臨時(shí)緩沖區(qū),大小為buf的remain(limit - position),堆外內(nèi)存,使用ByteBuffer.allocateDirect(size)分配// Notes:這里分配后后面有個(gè)try-finally塊會釋放該部分內(nèi)存ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining()); ?int var7;try {// 將網(wǎng)絡(luò)中的buf讀進(jìn)direct bufferint var6 = readIntoNativeBuffer(var0, var5, var2, var4);var5.flip();// 待讀取if (var6 > 0) {var1.put(var5);// 成功時(shí)寫入} ?var7 = var6;} finally {Util.offerFirstTemporaryDirectBuffer(var5);} ?return var7;}} private static int readIntoNativeBuffer(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {// 忽略變量initif (var2 != -1L) {// pread方法只有在同步狀態(tài)下才能使用var9 = var4.pread(var0, ((DirectBuffer)var1).address() + (long)var5, var7, var2);} else {// 其調(diào)用SocketDispatcher.read方法 -> FileDispatcherImpl.read0方法var9 = var4.read(var0, ((DirectBuffer)var1).address() + (long)var5, var7);} ?if (var9 > 0) {var1.position(var5 + var9);} ?return var9;}} // 同樣找到openjdk:srcsolarisnativesunnioch //FileDispatcherImpl.c JNIEXPORT jint JNICALL Java_sun_nio_ch_FileDispatcherImpl_read0(JNIEnv *env, jclass clazz,jobject fdo, jlong address, jint len) {jint fd = fdval(env, fdo);// 獲取fd索引void *buf = (void *)jlong_to_ptr(address);// 調(diào)用底層read方法return convertReturnVal(env, read(fd, buf, len), JNI_TRUE); }

    總結(jié)一下讀取的過程

  • 初始化一個(gè)direct buffer,如果本身的buffer就是direct的則不用初始化
  • 調(diào)用底層read方法寫入至direct buffer
  • 最終將direct buffer寫到傳入的buffer對象
  • write 寫

    看完了前面的read,write整個(gè)執(zhí)行流程基本一樣,具體的細(xì)節(jié)參考如下

    public int write(ByteBuffer var1) throws IOException {if (var1 == null) {throw new NullPointerException();} else {synchronized(this.writeLock) {this.ensureWriteOpen();this.begin();synchronized(this.stateLock) {if (!this.isOpen()) {var5 = 0;var20 = false;break label310;}this.writerThread = NativeThread.current();}do {// 通過IOUtil的讀取fd的數(shù)據(jù)至buf// 這里的nd是SocketDispatcher,用于調(diào)用底層的read和write操作var3 = IOUtil.write(this.fd, var1, -1L, nd);} while(var3 == -3 && this.isOpen()); ?var4 = IOStatus.normalize(var3);var20 = false;this.writerCleanup();assert IOStatus.check(var3);return var4;}}}} static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {if (var1 instanceof DirectBuffer) {return writeFromNativeBuffer(var0, var1, var2, var4);} else { ?ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7); ?int var10;try {// 這里的pos為buf初始的position,意思是將buf重置為最初的狀態(tài);因?yàn)槟壳斑€沒有真實(shí)的寫入到channel中var8.put(var1);var8.flip();var1.position(var5);// 調(diào)用int var9 = writeFromNativeBuffer(var0, var8, var2, var4);if (var9 > 0) {var1.position(var5 + var9);} ?var10 = var9;} finally {Util.offerFirstTemporaryDirectBuffer(var8);} ?return var10;}} IOUtil.writeFromNativeBuffer(fd , buf , position , nd) {// ... 忽略一些獲取buf變量的代碼 int written = 0;if (position != -1) {// pread方法只有在同步狀態(tài)下才能使用written = nd.pwrite(fd ,((DirectBuffer)bb).address() + pos,rem, position);} else {// 其調(diào)用SocketDispatcher.write方法 -> FileDispatcherImpl.write0方法written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);}//.... } FileDispatcherImpl.write0 {// 調(diào)用底層的write方法寫入return convertReturnVal(env, write(fd, buf, len), JNI_FALSE); } }

    總結(jié)一下write的過程:

  • 如果buf是direct buffer則直接開始寫入,否則需要初始化一個(gè)direct buffer,大小是buf的remain
  • 將buf的內(nèi)容寫入到direct buffer中,并恢復(fù)buf的position
  • 調(diào)用底層的write方法寫入至channel
  • 更新buf的position,即被direct buffer讀取內(nèi)容后的position
  • 耐心一點(diǎn),馬上就到Epoll了

    理解了前面的一些基礎(chǔ)知識,接下來的部分就會涉及到Java是怎么樣來使用epoll的。

    Selector簡述

    Selector的作用是Java NIO中管理一組多路復(fù)用的SelectableChannel對象,并能夠識別通道是否為諸如讀寫事件做好準(zhǔn)備的組件 --Java doc

    Selector的創(chuàng)建過程如下:

    // 1.創(chuàng)建Selector Selector selector = Selector.open();// 2.將Channel注冊到選擇器中 // ....... new channel的過程 ....//Notes:channel要注冊到Selector上就必須是非阻塞的,所以FileChannel是不可以 //使用Selector的,因?yàn)镕ileChannel是阻塞的 channel.configureBlocking(false);// 第二個(gè)參數(shù)指定了我們對 Channel 的什么類型的事件感興趣 SelectionKey key = channel.register(selector , SelectionKey.OP_READ);// 也可以使用或運(yùn)算|來組合多個(gè)事件,例如 SelectionKey key = channel.register(selector , SelectionKey.OP_READ | SelectionKey.OP_WRITE);// 不過值得注意的是,一個(gè) Channel 僅僅可以被注冊到一個(gè) Selector 一次, // 如果將 Channel 注冊到 Selector 多次, 那么其實(shí)就是相當(dāng)于更新 SelectionKey //的 interest set.

    ①一個(gè)Channel在Selector注冊其代表的是一個(gè)SelectionKey事件,SelectionKey的類型包括:

    • OP_READ:可讀事件;值為:1<<0
    • OP_WRITE:可寫事件;值為:1<<2
    • OP_CONNECT:客戶端連接服務(wù)端的事件(tcp連接),一般為創(chuàng)建SocketChannel客戶端channel;值為:1<<3
    • OP_ACCEPT:服務(wù)端接收客戶端連接的事件,一般為創(chuàng)建ServerSocketChannel服務(wù)端channel;值為:1<<4

    ②一個(gè)Selector內(nèi)部維護(hù)了三組keys:

  • key set:當(dāng)前channel注冊在Selector上所有的key;可調(diào)用keys()獲取
  • selected-key set:當(dāng)前channel就緒的事件;可調(diào)用selectedKeys()獲取
  • cancelled-key:主動觸發(fā)SelectionKey#cancel()方法會放在該集合,前提條件是該channel沒有被取消注冊;不可通過外部方法調(diào)用
  • ③Selector類中總共包含以下10個(gè)方法:

    • open():創(chuàng)建一個(gè)Selector對象
    • isOpen():是否是open狀態(tài),如果調(diào)用了close()方法則會返回false
    • provider():獲取當(dāng)前Selector的Provider
    • keys():如上文所述,獲取當(dāng)前channel注冊在Selector上所有的key
    • selectedKeys():獲取當(dāng)前channel就緒的事件列表
    • selectNow():獲取當(dāng)前是否有事件就緒,該方法立即返回結(jié)果,不會阻塞;如果返回值>0,則代表存在一個(gè)或多個(gè)
    • select(long timeout):selectNow的阻塞超時(shí)方法,超時(shí)時(shí)間內(nèi),有事件就緒時(shí)才會返回;否則超過時(shí)間也會返回
    • select():selectNow的阻塞方法,直到有事件就緒時(shí)才會返回
    • wakeup():調(diào)用該方法會時(shí),阻塞在select()處的線程會立馬返回;(ps:下面一句劃重點(diǎn))即使當(dāng)前不存在線程阻塞在select()處,那么下一個(gè)執(zhí)行select()方法的線程也會立即返回結(jié)果,相當(dāng)于執(zhí)行了一次selectNow()方法
    • close(): 用完Selector后調(diào)用其close()方法會關(guān)閉該Selector,且使注冊到該Selector上的所有SelectionKey實(shí)例無效。channel本身并不會關(guān)閉。

    關(guān)于SelectionKey

    談到Selector就不得不提SelectionKey,兩者是緊密關(guān)聯(lián),配合使用的;如上文所示,往Channel注冊Selector會返回一個(gè)SelectionKey對象, 這個(gè)對象包含了如下內(nèi)容:

    • interest set,當(dāng)前Channel感興趣的事件集,即在調(diào)用register方法設(shè)置的interes set
    • ready set
    • channel
    • selector
    • attached object,可選的附加對象

    ①interest set 可以通過SelectionKey類中的方法來獲取和設(shè)置interes set

    // 返回當(dāng)前感興趣的事件列表 int interestSet = key.interestOps();// 也可通過interestSet判斷其中包含的事件 boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE; // 可以通過interestOps(int ops)方法修改事件列表 key.interestOps(interestSet | SelectionKey.OP_WRITE);

    ②ready set 當(dāng)前Channel就緒的事件列表

    int readySet = key.readyOps();// 也可通過四個(gè)方法來分別判斷不同事件是否就緒 key.isReadable(); //讀事件是否就緒 key.isWritable(); //寫事件是否就緒 key.isConnectable(); //客戶端連接事件是否就緒 key.isAcceptable(); //服務(wù)端連接事件是否就緒

    ③channel和selector 我們可以通過SelectionKey來獲取當(dāng)前的channel和selector

    // 返回當(dāng)前事件關(guān)聯(lián)的通道,可轉(zhuǎn)換的選項(xiàng)包括:`ServerSocketChannel`和`SocketChannel` Channel channel = key.channel(); ? //返回當(dāng)前事件所關(guān)聯(lián)的Selector對象 Selector selector = key.selector();

    attached object 我們可以在selectionKey中附加一個(gè)對象,或者在注冊時(shí)直接附加:

    key.attach(theObject); Object attachedObj = key.attachment(); // 在注冊時(shí)直接附加 SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

    萬丈高樓平地起,基礎(chǔ)知識差不多了,了解了這些,可以找一些nio demo或者netty demo練練手。接下來講解本節(jié)比較重要的~epoll

    前面多次提到了openjdk,seletor的具體實(shí)現(xiàn)肯定是跟操作系統(tǒng)有關(guān)的,我們一起來看看。

    可以看到Selector的實(shí)現(xiàn)是SelectorImpl, 然后SelectorImpl又將職責(zé)委托給了具體的平臺,比如圖中的linux2.6 EpollSelectorImpl,windows是WindowsSelectorImpl,MacOSX是KQueueSelectorImpl

    根據(jù)前面我們知道,Selector.open()可以得到一個(gè)Selector實(shí)例,怎么實(shí)現(xiàn)的呢?

    // Selector.java public static Selector open() throws IOException {// 首先找到provider,然后再打開Selectorreturn SelectorProvider.provider().openSelector(); }// java.nio.channels.spi.SelectorProviderpublic static SelectorProvider provider() {synchronized (lock) {if (provider != null)return provider;return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {public SelectorProvider run() {if (loadProviderFromProperty())return provider;if (loadProviderAsService())return provider;// 這里就是打開Selector的真正方法provider = sun.nio.ch.DefaultSelectorProvider.create();return provider;}});} }

    在openjdk中,每個(gè)操作系統(tǒng)都有一個(gè)sun.nio.ch.DefaultSelectorProvider實(shí)現(xiàn),以src**solaris**classessunnioch下的DefaultSelectorProvider為例:

    /*** Returns the default SelectorProvider.*/ public static SelectorProvider create() {// 獲取OS名稱String osname = AccessController.doPrivileged(new GetPropertyAction("os.name"));// 根據(jù)名稱來創(chuàng)建不同的Selctorif (osname.equals("SunOS"))return createProvider("sun.nio.ch.DevPollSelectorProvider");if (osname.equals("Linux"))return createProvider("sun.nio.ch.EPollSelectorProvider");return new sun.nio.ch.PollSelectorProvider(); }

    打開src**solaris**classessunnioch下的EPollSelectorProvider.java

    public class EPollSelectorProviderextends SelectorProviderImpl {public AbstractSelector openSelector() throws IOException {return new EPollSelectorImpl(this);}public Channel inheritedChannel() throws IOException {return InheritedChannel.getChannel();} }

    Linux平臺就得到了最終的Selector實(shí)現(xiàn):src**solaris**classessunnioch下的EPollSelectorImpl.java

    來看看它實(shí)現(xiàn)的構(gòu)造器:

    EPollSelectorImpl(SelectorProvider sp) throws IOException {super(sp);// makePipe返回管道的2個(gè)文件描述符,編碼在一個(gè)long類型的變量中// 高32位代表讀 低32位代表寫// 使用pipe為了實(shí)現(xiàn)Selector的wakeup邏輯long pipeFds = IOUtil.makePipe(false);fd0 = (int) (pipeFds >>> 32);fd1 = (int) pipeFds;// 新建一個(gè)EPollArrayWrapperpollWrapper = new EPollArrayWrapper();pollWrapper.initInterrupt(fd0, fd1);fdToKey = new HashMap<>();}

    srcsolarisnativesunnioch下的EPollArrayWrapper.c

    JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this) {/** epoll_create expects a size as a hint to the kernel about how to* dimension internal structures. We can't predict the size in advance.*/int epfd = epoll_create(256);if (epfd < 0) {JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");}return epfd; }

    ①epoll_create在前面已經(jīng)講過了,這里就不再贅述了。

    ②epoll wait 等待內(nèi)核IO事件

    調(diào)用Selector.select(返回鍵的數(shù)量,可能是零)最后會委托給各個(gè)實(shí)現(xiàn)的doSelect方法,限于篇幅不貼出太詳細(xì)的,這里看下EpollSelectorImpl的doSelect方法

    protected int doSelect(long timeout) throws IOException { if (closed) throw new ClosedSelectorException(); processDeregisterQueue(); try { begin(); //EPollArrayWrapper pollWrapper pollWrapper.poll(timeout);//重點(diǎn)在這里 } finally { end(); } processDeregisterQueue(); int numKeysUpdated = updateSelectedKeys();// 后面會講到 if (pollWrapper.interrupted()) { // Clear the wakeup pipe pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0); synchronized (interruptLock) { pollWrapper.clearInterrupted(); IOUtil.drain(fd0); interruptTriggered = false; } } return numKeysUpdated; } int poll(long timeout) throws IOException {updateRegistrations();// 這個(gè)代碼在下面講,涉及到epoo_ctl// 這個(gè)epollWait是不是有點(diǎn)熟悉呢?updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);for (int i=0; i<updated; i++) {if (getDescriptor(i) == incomingInterruptFD) {interruptedIndex = i;interrupted = true;break;}}return updated;

    看下EPollArrayWrapper.c

    JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,jlong address, jint numfds,jlong timeout, jint epfd) {struct epoll_event *events = jlong_to_ptr(address);int res;if (timeout <= 0) { /* Indefinite or no wait *///系統(tǒng)調(diào)用等待內(nèi)核事件RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);} else { /* Bounded wait; bounded restarts */res = iepoll(epfd, events, numfds, timeout);}if (res < 0) {JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");}return res; }

    可以看到在linux中Selector.select()其實(shí)是調(diào)用了epoll_wait

    ③epoll control以及openjdk對事件管理的封裝

    JDK中對于注冊到Selector上的IO事件關(guān)系是使用SelectionKey來表示,代表了Channel感興趣的事件,如Read,Write,Connect,Accept.

    調(diào)用Selector.register()時(shí)均會將事件存儲到EpollArrayWrapper.java的成員變量eventsLow和eventsHigh中

    // events for file descriptors with registration changes pending, indexed // by file descriptor and stored as bytes for efficiency reasons. For // file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at // least) then the update is stored in a map. // 使用數(shù)組保存事件變更, 數(shù)組的最大長度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024 private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE]; // 超過數(shù)組長度的事件會緩存到這個(gè)map中,等待下次處理 private Map<Integer,Byte> eventsHigh; ? ? /*** Sets the pending update events for the given file descriptor. This* method has no effect if the update events is already set to KILLED,* unless {@code force} is {@code true}.*/ private void setUpdateEvents(int fd, byte events, boolean force) {// 判斷fd和數(shù)組長度if (fd < MAX_UPDATE_ARRAY_SIZE) {if ((eventsLow[fd] != KILLED) || force) {eventsLow[fd] = events;}} else {Integer key = Integer.valueOf(fd);if (!isEventsHighKilled(key) || force) {eventsHigh.put(key, Byte.valueOf(events));}} }/*** Returns the pending update events for the given file descriptor.*/private byte getUpdateEvents(int fd) {if (fd < MAX_UPDATE_ARRAY_SIZE) {return eventsLow[fd];} else {Byte result = eventsHigh.get(Integer.valueOf(fd));// result should never be nullreturn result.byteValue();}

    在上面poll代碼中涉及到

    int poll(long timeout) throws IOException {updateRegistrations();/ ?/*** Update the pending registrations.*/private void updateRegistrations() {synchronized (updateLock) {int j = 0;while (j < updateCount) {int fd = updateDescriptors[j];// 從保存的eventsLow和eventsHigh里取出事件short events = getUpdateEvents(fd);boolean isRegistered = registered.get(fd);int opcode = 0; ?if (events != KILLED) {if (isRegistered) {// 判斷操作類型以傳給epoll_ctl// 沒有指定EPOLLET事件類型opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;} else {opcode = (events != 0) ? EPOLL_CTL_ADD : 0;}if (opcode != 0) {// 熟悉的epoll_ctlepollCtl(epfd, opcode, fd, events);if (opcode == EPOLL_CTL_ADD) {registered.set(fd);} else if (opcode == EPOLL_CTL_DEL) {registered.clear(fd);}}}j++;}updateCount = 0;}private native void epollCtl(int epfd, int opcode, int fd, int events);

    可以看到epollCtl調(diào)用的native方法,我們進(jìn)入EpollArrayWrapper.c

    JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,jint opcode, jint fd, jint events) {struct epoll_event event;int res;event.events = events;event.data.fd = fd;// epoll_ctl這里就不用多說了吧RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);/** A channel may be registered with several Selectors. When each Selector* is polled a EPOLL_CTL_DEL op will be inserted into its pending update* list to remove the file descriptor from epoll. The "last" Selector will* close the file descriptor which automatically unregisters it from each* epoll descriptor. To avoid costly synchronization between Selectors we* allow pending updates to be processed, ignoring errors. The errors are* harmless as the last update for the file descriptor is guaranteed to* be EPOLL_CTL_DEL.*/if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");} }

    在doSelect方法poll執(zhí)行后,會更新EpollSelectorImpl.java里的 updateSelectedKeys,就是Selector里的三個(gè)set集合,具體可看前面。

    /** ? *更新已被epoll選擇fd的鍵。 ? *將就緒興趣集添加到就緒隊(duì)列。 ? */ private int updateSelectedKeys() {int entries = pollWrapper.updated;int numKeysUpdated = 0;for (int i=0; i<entries; i++) {int nextFD = pollWrapper.getDescriptor(i);SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));// ski is null in the case of an interruptif (ski != null) {int rOps = pollWrapper.getEventOps(i);if (selectedKeys.contains(ski)) {if (ski.channel.translateAndSetReadyOps(rOps, ski)) {numKeysUpdated++;}} else {ski.channel.translateAndSetReadyOps(rOps, ski);if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {selectedKeys.add(ski);numKeysUpdated++;}}}}return numKeysUpdated;}

    總結(jié)

    通過本文,你應(yīng)該知道Channel、Selector基本原理和在Java中怎么使用Epoll的。 (包括更細(xì)節(jié)的fd與channel和socket之間的轉(zhuǎn)換關(guān)系)掌握這些基礎(chǔ)知識,再去看NIO、netty網(wǎng)絡(luò)框架的源碼可能就沒有那么吃力了。在接下來的文章里我會跟進(jìn)關(guān)于Netty的文章,畢竟這已成為分布式網(wǎng)絡(luò)通信框架的主流了!

    感謝

    https://zh.wikipedia.org/wiki/Epoll 維基百科

    https://baike.baidu.com/item/epoll/10738144?fr=aladdin

    https://juejin.im/entry/5b51546df265da0f70070b93

    https://www.jianshu.com/p/f26f1eaa7c8e

    來自:微信公眾號(作者:汀雨筆記),著作權(quán)屬于:本文和汀雨

    總結(jié)

    以上是生活随笔為你收集整理的epoll监听文件_【原创】万字长文浅析:Epoll与Java Nio的那些事儿的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。