epoll机制
在linux的網(wǎng)絡(luò)編程中,很長的時(shí)間都在使用select來做事件觸發(fā)。在linux新的內(nèi)核中,有了一種替換它的機(jī)制,就是epoll。?
相比于select,epoll最大的好處在于它不會(huì)隨著監(jiān)聽fd數(shù)目的增長而降低效率。因?yàn)樵趦?nèi)核中的select實(shí)現(xiàn)中,它是采用輪詢來處理的,輪詢的fd數(shù)目越多,自然耗時(shí)越多。并且,在linux/posix_types.h頭文件有這樣的聲明:?
#define __FD_SETSIZE??? 1024?
表示select最多同時(shí)監(jiān)聽1024個(gè)fd,當(dāng)然,可以通過修改頭文件再重編譯內(nèi)核來擴(kuò)大這個(gè)數(shù)目,但這似乎并不治本。
epoll的接口非常簡單,一共就三個(gè)函數(shù):?
1. int epoll_create(int size);?
創(chuàng)建一個(gè)epoll的句柄,size用來告訴內(nèi)核這個(gè)監(jiān)聽的數(shù)目一共有多大。這個(gè)參數(shù)不同于select()中的第一個(gè)參數(shù),給出最大監(jiān)聽的fd+1的值。需要注意的是,當(dāng)創(chuàng)建好epoll句柄后,它就是會(huì)占用一個(gè)fd值,在linux下如果查看/proc/進(jìn)程id/fd/,是能夠看到這個(gè)fd的,所以在使用完epoll后,必須調(diào)用close()關(guān)閉,否則可能導(dǎo)致fd被耗盡。
2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);?
epoll的事件注冊(cè)函數(shù),它不同與select()是在監(jiān)聽事件時(shí)告訴內(nèi)核要監(jiān)聽什么類型的事件,而是在這里先注冊(cè)要監(jiān)聽的事件類型。第一個(gè)參數(shù)是epoll_create()的返回值,第二個(gè)參數(shù)表示動(dòng)作,用三個(gè)宏來表示:?
EPOLL_CTL_ADD:注冊(cè)新的fd到epfd中;?
EPOLL_CTL_MOD:修改已經(jīng)注冊(cè)的fd的監(jiān)聽事件;?
EPOLL_CTL_DEL:從epfd中刪除一個(gè)fd;?
第三個(gè)參數(shù)是需要監(jiān)聽的fd,第四個(gè)參數(shù)是告訴內(nèi)核需要監(jiān)聽什么事,struct epoll_event結(jié)構(gòu)如下:?
struct epoll_event {?
__uint32_t events; /* Epoll events */?
epoll_data_t data; /* User data variable */?
};
events可以是以下幾個(gè)宏的集合:?
EPOLLIN :表示對(duì)應(yīng)的文件描述符可以讀(包括對(duì)端SOCKET正常關(guān)閉);?
EPOLLOUT:表示對(duì)應(yīng)的文件描述符可以寫;?
EPOLLPRI:表示對(duì)應(yīng)的文件描述符有緊急的數(shù)據(jù)可讀(這里應(yīng)該表示有帶外數(shù)據(jù)到來);?
EPOLLERR:表示對(duì)應(yīng)的文件描述符發(fā)生錯(cuò)誤;?
EPOLLHUP:表示對(duì)應(yīng)的文件描述符被掛斷;?
EPOLLET: 將EPOLL設(shè)為邊緣觸發(fā)(Edge Triggered)模式,這是相對(duì)于水平觸發(fā)(Level Triggered)來說的。?
EPOLLONESHOT:只監(jiān)聽一次事件,當(dāng)監(jiān)聽完這次事件之后,如果還需要繼續(xù)監(jiān)聽這個(gè)socket的話,需要再次把這個(gè)socket加入到EPOLL隊(duì)列里
3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);?
等待事件的產(chǎn)生,類似于select()調(diào)用。參數(shù)events用來從內(nèi)核得到事件的集合,maxevents告之內(nèi)核這個(gè)events有多大,這個(gè)maxevents的值不能大于創(chuàng)建epoll_create()時(shí)的size,參數(shù)timeout是超時(shí)時(shí)間(毫秒,0會(huì)立即返回,-1將不確定,也有說法說是永久阻塞)。該函數(shù)返回需要處理的事件數(shù)目,如返回0表示已超時(shí)。
--------------------------------------------------------------------------------------------
從man手冊(cè)中,得到ET和LT的具體描述如下
EPOLL事件有兩種模型:?
Edge Triggered (ET)?
Level Triggered (LT)
假如有這樣一個(gè)例子:?
1. 我們已經(jīng)把一個(gè)用來從管道中讀取數(shù)據(jù)的文件句柄(RFD)添加到epoll描述符?
2. 這個(gè)時(shí)候從管道的另一端被寫入了2KB的數(shù)據(jù)?
3. 調(diào)用epoll_wait(2),并且它會(huì)返回RFD,說明它已經(jīng)準(zhǔn)備好讀取操作?
4. 然后我們讀取了1KB的數(shù)據(jù)?
5. 調(diào)用epoll_wait(2)......
Edge Triggered 工作模式:?
如果我們?cè)诘?步將RFD添加到epoll描述符的時(shí)候使用了EPOLLET標(biāo)志,那么在第5步調(diào)用epoll_wait(2)之后將有可能會(huì)掛起,因?yàn)槭S嗟臄?shù)據(jù)還存在于文件的輸入緩沖區(qū)內(nèi),而且數(shù)據(jù)發(fā)出端還在等待一個(gè)針對(duì)已經(jīng)發(fā)出數(shù)據(jù)的反饋信息。只有在監(jiān)視的文件句柄上發(fā)生了某個(gè)事件的時(shí)候 ET 工作模式才會(huì)匯報(bào)事件。因此在第5步的時(shí)候,調(diào)用者可能會(huì)放棄等待仍在存在于文件輸入緩沖區(qū)內(nèi)的剩余數(shù)據(jù)。在上面的例子中,會(huì)有一個(gè)事件產(chǎn)生在RFD句柄上,因?yàn)樵诘?步執(zhí)行了一個(gè)寫操作,然后,事件將會(huì)在第3步被銷毀。因?yàn)榈?步的讀取操作沒有讀空文件輸入緩沖區(qū)內(nèi)的數(shù)據(jù),因此我們?cè)诘?步調(diào)用 epoll_wait(2)完成后,是否掛起是不確定的。epoll工作在ET模式的時(shí)候,必須使用非阻塞套接口,以避免由于一個(gè)文件句柄的阻塞讀/阻塞寫操作把處理多個(gè)文件描述符的任務(wù)餓死。最好以下面的方式調(diào)用ET模式的epoll接口,在后面會(huì)介紹避免可能的缺陷。?
?? i??? 基于非阻塞文件句柄?
?? ii?? 只有當(dāng)read(2)或者write(2)返回EAGAIN時(shí)才需要掛起,等待。但這并不是說每次read()時(shí)都需要循環(huán)讀,直到讀到產(chǎn)生一個(gè)EAGAIN才認(rèn)為此次事件處理完成,當(dāng)read()返回的讀到的數(shù)據(jù)長度小于請(qǐng)求的數(shù)據(jù)長度時(shí),就可以確定此時(shí)緩沖中已沒有數(shù)據(jù)了,也就可以認(rèn)為此事讀事件已處理完成。
Level Triggered 工作模式?
相反的,以LT方式調(diào)用epoll接口的時(shí)候,它就相當(dāng)于一個(gè)速度比較快的poll(2),并且無論后面的數(shù)據(jù)是否被使用,因此他們具有同樣的職能。因?yàn)榧词故褂肊T模式的epoll,在收到多個(gè)chunk的數(shù)據(jù)的時(shí)候仍然會(huì)產(chǎn)生多個(gè)事件。調(diào)用者可以設(shè)定EPOLLONESHOT標(biāo)志,在 epoll_wait(2)收到事件后epoll會(huì)與事件關(guān)聯(lián)的文件句柄從epoll描述符中禁止掉。因此當(dāng)EPOLLONESHOT設(shè)定后,使用帶有 EPOLL_CTL_MOD標(biāo)志的epoll_ctl(2)處理文件句柄就成為調(diào)用者必須作的事情。
然后詳細(xì)解釋ET, LT:
LT(level triggered)是缺省的工作方式,并且同時(shí)支持block和no-block socket.在這種做法中,內(nèi)核告訴你一個(gè)文件描述符是否就緒了,然后你可以對(duì)這個(gè)就緒的fd進(jìn)行IO操作。如果你不作任何操作,內(nèi)核還是會(huì)繼續(xù)通知你的,所以,這種模式編程出錯(cuò)誤可能性要小一點(diǎn)。傳統(tǒng)的select/poll都是這種模型的代表.
ET(edge-triggered)是高速工作方式,只支持no-block socket。在這種模式下,當(dāng)描述符從未就緒變?yōu)榫途w時(shí),內(nèi)核通過epoll告訴你。然后它會(huì)假設(shè)你知道文件描述符已經(jīng)就緒,并且不會(huì)再為那個(gè)文件描述符發(fā)送更多的就緒通知,直到你做了某些操作導(dǎo)致那個(gè)文件描述符不再為就緒狀態(tài)了(比如,你在發(fā)送,接收或者接收請(qǐng)求,或者發(fā)送接收的數(shù)據(jù)少于一定量時(shí)導(dǎo)致了一個(gè)EWOULDBLOCK 錯(cuò)誤)。但是請(qǐng)注意,如果一直不對(duì)這個(gè)fd作IO操作(從而導(dǎo)致它再次變成未就緒),內(nèi)核不會(huì)發(fā)送更多的通知(only once),不過在TCP協(xié)議中,ET模式的加速效用仍需要更多的benchmark確認(rèn)(這句話不理解)。
在許多測試中我們會(huì)看到如果沒有大量的idle -connection或者dead-connection,epoll的效率并不會(huì)比select/poll高很多,但是當(dāng)我們遇到大量的idle- connection(例如WAN環(huán)境中存在大量的慢速連接),就會(huì)發(fā)現(xiàn)epoll的效率大大高于select/poll。(未測試)
?
另外,當(dāng)使用epoll的ET模型來工作時(shí),當(dāng)產(chǎn)生了一個(gè)EPOLLIN事件后,?
讀數(shù)據(jù)的時(shí)候需要考慮的是當(dāng)recv()返回的大小如果等于請(qǐng)求的大小,那么很有可能是緩沖區(qū)還有數(shù)據(jù)未讀完,也意味著該次事件還沒有處理完,所以還需要再次讀取:?
while(rs)?
{?
buflen = recv(activeevents[i].data.fd, buf, sizeof(buf), 0);?
if(buflen < 0)?
{?
??? // 由于是非阻塞的模式,所以當(dāng)errno為EAGAIN時(shí),表示當(dāng)前緩沖區(qū)已無數(shù)據(jù)可讀?
??? // 在這里就當(dāng)作是該次事件已處理處.?
??? if(errno == EAGAIN)?
???? break;?
??? else?
???? return;?
?? }?
?? else if(buflen == 0)?
?? {?
???? // 這里表示對(duì)端的socket已正常關(guān)閉.?
?? }?
?? if(buflen == sizeof(buf)?
???? rs = 1;?? // 需要再次讀取?
?? else?
???? rs = 0;?
}
還有,假如發(fā)送端流量大于接收端的流量(意思是epoll所在的程序讀比轉(zhuǎn)發(fā)的socket要快),由于是非阻塞的socket,那么send()函數(shù)雖然返回,但實(shí)際緩沖區(qū)的數(shù)據(jù)并未真正發(fā)給接收端,這樣不斷的讀和發(fā),當(dāng)緩沖區(qū)滿后會(huì)產(chǎn)生EAGAIN錯(cuò)誤(參考man send),同時(shí),不理會(huì)這次請(qǐng)求發(fā)送的數(shù)據(jù).所以,需要封裝socket_send()的函數(shù)用來處理這種情況,該函數(shù)會(huì)盡量將數(shù)據(jù)寫完再返回,返回-1表示出錯(cuò)。在socket_send()內(nèi)部,當(dāng)寫緩沖已滿(send()返回-1,且errno為EAGAIN),那么會(huì)等待后再重試.這種方式并不很完美,在理論上可能會(huì)長時(shí)間的阻塞在socket_send()內(nèi)部,但暫沒有更好的辦法.
ssize_t socket_send(int sockfd, const char* buffer, size_t buflen)?
{?
ssize_t tmp;?
size_t total = buflen;?
const char *p = buffer;
while(1)?
{?
??? tmp = send(sockfd, p, total, 0);?
??? if(tmp < 0)?
??? {?
????? // 當(dāng)send收到信號(hào)時(shí),可以繼續(xù)寫,但這里返回-1.?
????? if(errno == EINTR)?
??????? return -1;
????? // 當(dāng)socket是非阻塞時(shí),如返回此錯(cuò)誤,表示寫緩沖隊(duì)列已滿,?
????? // 在這里做延時(shí)后再重試.?
????? if(errno == EAGAIN)?
????? {?
??????? usleep(1000);?
??????? continue;?
????? }
????? return -1;?
??? }
??? if((size_t)tmp == total)?
????? return buflen;
??? total -= tmp;?
??? p += tmp;?
}
return tmp;?
}
epoll有兩種模式,Edge Triggered(簡稱ET) 和 Level Triggered(簡稱LT).在采用這兩種模式時(shí)要注意的是,如果采用ET模式,那么僅當(dāng)狀態(tài)發(fā)生變化時(shí)才會(huì)通知,而采用LT模式類似于原來的select/poll操作,只要還有沒有處理的事件就會(huì)一直通知.
以代碼來說明問題:?
首先給出server的代碼,需要說明的是每次accept的連接,加入可讀集的時(shí)候采用的都是ET模式,而且接收緩沖區(qū)是5字節(jié)的,也就是每次只接收5字節(jié)的數(shù)據(jù):
#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
using namespace std;
#define MAXLINE 5
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5000
#define INFTIM 1000
void setnonblocking(int sock)
{
??? int opts;
??? opts=fcntl(sock,F_GETFL);
??? if(opts<0)
??? {
??????? perror("fcntl(sock,GETFL)");
??????? exit(1);
??? }
??? opts = opts|O_NONBLOCK;
??? if(fcntl(sock,F_SETFL,opts)<0)
??? {
??????? perror("fcntl(sock,SETFL,opts)");
??????? exit(1);
??? }???
}
int main()
{
??? int i, maxi, listenfd, connfd, sockfd,epfd,nfds;
??? ssize_t n;
??? char line[MAXLINE];
??? socklen_t clilen;
??? //聲明epoll_event結(jié)構(gòu)體的變量,ev用于注冊(cè)事件,數(shù)組用于回傳要處理的事件
??? struct epoll_event ev,events[20];
??? //生成用于處理accept的epoll專用的文件描述符
??? epfd=epoll_create(256);
??? struct sockaddr_in clientaddr;
??? struct sockaddr_in serveraddr;
??? listenfd = socket(AF_INET, SOCK_STREAM, 0);
??? //把socket設(shè)置為非阻塞方式
??? //setnonblocking(listenfd);
??? //設(shè)置與要處理的事件相關(guān)的文件描述符
??? ev.data.fd=listenfd;
??? //設(shè)置要處理的事件類型
??? ev.events=EPOLLIN|EPOLLET;
??? //ev.events=EPOLLIN;
??? //注冊(cè)epoll事件
??? epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
??? bzero(&serveraddr, sizeof(serveraddr));
??? serveraddr.sin_family = AF_INET;
??? char *local_addr="127.0.0.1";
??? inet_aton(local_addr,&(serveraddr.sin_addr));//htons(SERV_PORT);
??? serveraddr.sin_port=htons(SERV_PORT);
??? bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));
??? listen(listenfd, LISTENQ);
??? maxi = 0;
??? for ( ; ; ) {
??????? //等待epoll事件的發(fā)生
??????? nfds=epoll_wait(epfd,events,20,500);
??????? //處理所發(fā)生的所有事件?????
??????? for(i=0;i<nfds;++i)
??????? {
??????????? if(events[i].data.fd==listenfd)
??????????? {
??????????????? connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen);
??????????????? if(connfd<0){
??????????????????? perror("connfd<0");
??????????????????? exit(1);
??????????????? }
??????????????? //setnonblocking(connfd);
??????????????? char *str = inet_ntoa(clientaddr.sin_addr);
??????????????? cout << "accapt a connection from " << str << endl;
??????????????? //設(shè)置用于讀操作的文件描述符
??????????????? ev.data.fd=connfd;
??????????????? //設(shè)置用于注測的讀操作事件
??????????????? ev.events=EPOLLIN|EPOLLET;
??????????????? //ev.events=EPOLLIN;
??????????????? //注冊(cè)ev
??????????????? epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
??????????? }
??????????? else if(events[i].events&EPOLLIN)
??????????? {
??????????????? cout << "EPOLLIN" << endl;
??????????????? if ( (sockfd = events[i].data.fd) < 0)?
??????????????????? continue;
??????????????? if ( (n = read(sockfd, line, MAXLINE)) < 0) {
??????????????????? if (errno == ECONNRESET) {
??????????????????????? close(sockfd);
??????????????????????? events[i].data.fd = -1;
??????????????????? } else
??????????????????????? std::cout<<"readline error"<<std::endl;
??????????????? } else if (n == 0) {
??????????????????? close(sockfd);
??????????????????? events[i].data.fd = -1;
??????????????? }
??????????????? line[n] = '/0';
??????????????? cout << "read " << line << endl;
??????????????? //設(shè)置用于寫操作的文件描述符
??????????????? ev.data.fd=sockfd;
??????????????? //設(shè)置用于注測的寫操作事件
??????????????? ev.events=EPOLLOUT|EPOLLET;
??????????????? //修改sockfd上要處理的事件為EPOLLOUT
??????????????? //epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
??????????? }
??????????? else if(events[i].events&EPOLLOUT)
??????????? {???
??????????????? sockfd = events[i].data.fd;
??????????????? write(sockfd, line, n);
??????????????? //設(shè)置用于讀操作的文件描述符
??????????????? ev.data.fd=sockfd;
??????????????? //設(shè)置用于注測的讀操作事件
??????????????? ev.events=EPOLLIN|EPOLLET;
??????????????? //修改sockfd上要處理的事件為EPOLIN
??????????????? epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
??????????? }
??????? }
??? }
??? return 0;
}
下面給出測試所用的Perl寫的client端,在client中發(fā)送10字節(jié)的數(shù)據(jù),同時(shí)讓client在發(fā)送完數(shù)據(jù)之后進(jìn)入死循環(huán), 也就是在發(fā)送完之后連接的狀態(tài)不發(fā)生改變--既不再發(fā)送數(shù)據(jù), 也不關(guān)閉連接,這樣才能觀察出server的狀態(tài):?
#!/usr/bin/perl
use IO::Socket;
my $host = "127.0.0.1";?
my $port = 5000;
my $socket = IO::Socket::INET->new("$host:$port") or die "create socket error $@";?
my $msg_out = "1234567890";?
print $socket $msg_out;?
print "now send over, go to sleep/n";
while (1)?
{?
??? sleep(1);?
}?
運(yùn)行server和client發(fā)現(xiàn),server僅僅讀取了5字節(jié)的數(shù)據(jù),而client其實(shí)發(fā)送了10字節(jié)的數(shù)據(jù),也就是說,server僅當(dāng)?shù)谝淮伪O(jiān)聽到了EPOLLIN事件,由于沒有讀取完數(shù)據(jù),而且采用的是ET模式,狀態(tài)在此之后不發(fā)生變化,因此server再也接收不到EPOLLIN事件了.
如果我們把client改為這樣:?
#!/usr/bin/perl
use IO::Socket;
my $host = "127.0.0.1";?
my $port = 5000;
my $socket = IO::Socket::INET->new("$host:$port") or die "create socket error $@";?
my $msg_out = "1234567890";?
print $socket $msg_out;?
print "now send over, go to sleep/n";?
sleep(5);?
print "5 second gonesend another line/n";?
print $socket $msg_out;
while (1)?
{?
??? sleep(1);?
}
可以發(fā)現(xiàn),在server接收完5字節(jié)的數(shù)據(jù)之后一直監(jiān)聽不到client的事件,而當(dāng)client休眠5秒之后重新發(fā)送數(shù)據(jù),server再次監(jiān)聽到了變化,只不過因?yàn)橹皇亲x取了5個(gè)字節(jié),仍然有10個(gè)字節(jié)的數(shù)據(jù)(client第二次發(fā)送的數(shù)據(jù))沒有接收完.
如果上面的實(shí)驗(yàn)中,對(duì)accept的socket都采用的是LT模式,那么只要還有數(shù)據(jù)留在buffer中,server就會(huì)繼續(xù)得到通知,讀者可以自行改動(dòng)代碼進(jìn)行實(shí)驗(yàn).
基于這兩個(gè)實(shí)驗(yàn),可以得出這樣的結(jié)論:ET模式僅當(dāng)狀態(tài)發(fā)生變化的時(shí)候才獲得通知,這里所謂的狀態(tài)的變化并不包括緩沖區(qū)中還有未處理的數(shù)據(jù),也就是說,如果要采用ET模式,需要一直read/write直到出錯(cuò)為止,很多人反映為什么采用ET模式只接收了一部分?jǐn)?shù)據(jù)就再也得不到通知了,大多因?yàn)檫@樣;而LT模式是只要有數(shù)據(jù)沒有處理就會(huì)一直通知下去的.
補(bǔ)充說明一下這里一直強(qiáng)調(diào)的"狀態(tài)變化"是什么:
1)對(duì)于監(jiān)聽可讀事件時(shí),如果是socket是監(jiān)聽socket,那么當(dāng)有新的主動(dòng)連接到來為狀態(tài)發(fā)生變化;對(duì)一般的socket而言,協(xié)議棧中相應(yīng)的緩 沖區(qū)有新的數(shù)據(jù)為狀態(tài)發(fā)生變化.但是,如果在一個(gè)時(shí)間同時(shí)接收了N個(gè)連接(N>1),但是監(jiān)聽socket只accept了一個(gè)連接,那么其它未 accept的連接將不會(huì)在ET模式下給監(jiān)聽socket發(fā)出通知,此時(shí)狀態(tài)不發(fā)生變化;對(duì)于一般的socket,就如例子中而言,如果對(duì)應(yīng)的緩沖區(qū)本身 已經(jīng)有了N字節(jié)的數(shù)據(jù),而只取出了小于N字節(jié)的數(shù)據(jù),那么殘存的數(shù)據(jù)不會(huì)造成狀態(tài)發(fā)生變化.
2)對(duì)于監(jiān)聽可寫事件時(shí),同理可推,不再詳述.
而不論是監(jiān)聽可讀還是可寫,對(duì)方關(guān)閉socket連接都將造成狀態(tài)發(fā)生變化,比如在例子中,如果強(qiáng)行中斷client腳本,也就是主動(dòng)中斷了socket連接,那么都將造成server端發(fā)生狀態(tài)的變化,從而server得到通知,將已經(jīng)在本方緩沖區(qū)中的數(shù)據(jù)讀出.
把前面的描述可以總結(jié)如下:僅當(dāng)對(duì)方的動(dòng)作(發(fā)出數(shù)據(jù),關(guān)閉連接等)造成的事件才能導(dǎo)致狀態(tài)發(fā)生變化,而本方協(xié)議棧中已經(jīng)處理的事件(包括接收了對(duì)方的數(shù) 據(jù),接收了對(duì)方的主動(dòng)連接請(qǐng)求)并不是造成狀態(tài)發(fā)生變化的必要條件,狀態(tài)變化一定是對(duì)方造成的.所以在ET模式下的,必須一直處理到出錯(cuò)或者完全處理完 畢,才能進(jìn)行下一個(gè)動(dòng)作,否則可能會(huì)發(fā)生錯(cuò)誤.
另外,從這個(gè)例子中,也可以闡述一些基本的網(wǎng)絡(luò)編程概念.首先,連接的兩端中,一端發(fā)送成功并不代表著對(duì)方上層應(yīng)用程序接收成功, 就拿上面的client測試程序來說,10字節(jié)的數(shù)據(jù)已經(jīng)發(fā)送成功,但是上層的server并沒有調(diào)用read讀取數(shù)據(jù),因此發(fā)送成功僅僅說明了數(shù)據(jù)被對(duì)方的協(xié)議棧接收存放在了相應(yīng)的buffer中,而上層的應(yīng)用程序是否接收了這部分?jǐn)?shù)據(jù)不得而知;同樣的,讀取數(shù)據(jù)時(shí)也只代表著本方協(xié)議棧的對(duì)應(yīng)buffer中有數(shù)據(jù)可讀,而此時(shí)時(shí)候在對(duì)端是否在發(fā)送數(shù)據(jù)也不得而知.
epoll為什么這么快
epoll是多路復(fù)用IO(I/O Multiplexing)中的一種方式,但是僅用于linux2.6以上內(nèi)核,在開始討論這個(gè)問題之前,先來解釋一下為什么需要多路復(fù)用IO.
以一個(gè)生活中的例子來解釋.
假設(shè)你在大學(xué)中讀書,要等待一個(gè)朋友來訪,而這個(gè)朋友只知道你在A號(hào)樓,但是不知道你具體住在哪里,于是你們約好了在A號(hào)樓門口見面.
如果你使用的阻塞IO模型來處理這個(gè)問題,那么你就只能一直守候在A號(hào)樓門口等待朋友的到來,在這段時(shí)間里你不能做別的事情,不難知道,這種方式的效率是低下的.
現(xiàn)在時(shí)代變化了,開始使用多路復(fù)用IO模型來處理這個(gè)問題.你告訴你的朋友來了A號(hào)樓找樓管大媽,讓她告訴你該怎么走.這里的樓管大媽扮演的就是多路復(fù)用IO的角色.
進(jìn)一步解釋select和epoll模型的差異.
select版大媽做的是如下的事情:比如同學(xué)甲的朋友來了,select版大媽比較笨,她帶著朋友挨個(gè)房間進(jìn)行查詢誰是同學(xué)甲,你等的朋友來了,于是在實(shí)際的代碼中,select版大媽做的是以下的事情:
int n = select(&readset,NULL,NULL,100);
for (int i = 0; n > 0; ++i)?
{?
?? if (FD_ISSET(fdarray[i], &readset))?
?? {?
????? do_something(fdarray[i]);?
????? --n;?
?? }?
}
epoll版大媽就比較先進(jìn)了,她記下了同學(xué)甲的信息,比如說他的房間號(hào),那么等同學(xué)甲的朋友到來時(shí),只需要告訴該朋友同學(xué)甲在哪個(gè)房間即可,不用自己親自帶著人滿大樓的找人了.于是epoll版大媽做的事情可以用如下的代碼表示:?
n=epoll_wait(epfd,events,20,500);?
for(i=0;i<n;++i)?
{?
??? do_something(events[n]);?
}
在epoll中,關(guān)鍵的數(shù)據(jù)結(jié)構(gòu)epoll_event定義如下:?
typedef union epoll_data {?
??????????????? void *ptr;?
??????????????? int fd;?
??????????????? __uint32_t u32;?
??????????????? __uint64_t u64;?
??????? } epoll_data_t;
??????? struct epoll_event {?
??????????????? __uint32_t events;????? /* Epoll events */?
??????????????? epoll_data_t data;????? /* User data variable */?
??????? };?
可以看到,epoll_data是一個(gè)union結(jié)構(gòu)體,它就是epoll版大媽用于保存同學(xué)信息的結(jié)構(gòu)體,它可以保存很多類型的信息:fd,指針,等等.有了這個(gè)結(jié)構(gòu)體,epoll大媽可以不用吹灰之力就可以定位到同學(xué)甲.
別小看了這些效率的提高,在一個(gè)大規(guī)模并發(fā)的服務(wù)器中,輪詢IO是最耗時(shí)間的操作之一.再回到那個(gè)例子中,如果每到來一個(gè)朋友樓管大媽都要全樓的查詢同學(xué),那么處理的效率必然就低下了,過不久樓底就有不少的人了.
對(duì)比最早給出的阻塞IO的處理模型, 可以看到采用了多路復(fù)用IO之后, 程序可以自由的進(jìn)行自己除了IO操作之外的工作, 只有到IO狀態(tài)發(fā)生變化的時(shí)候由多路復(fù)用IO進(jìn)行通知, 然后再采取相應(yīng)的操作, 而不用一直阻塞等待IO狀態(tài)發(fā)生變化了.
從上面的分析也可以看出,epoll比select的提高實(shí)際上是一個(gè)用空間換時(shí)間思想的具體應(yīng)用.
多進(jìn)程服務(wù)器中,epoll的創(chuàng)建應(yīng)該在創(chuàng)建子進(jìn)程之后
看我的測試代碼,似乎應(yīng)該是在創(chuàng)建子進(jìn)程之后創(chuàng)建epoll的fd,否則程序?qū)?huì)有問題,試將代碼中兩個(gè)CreateWorker函數(shù)的調(diào)用位置分別調(diào)用,一個(gè)在創(chuàng)建epoll fd之前,一個(gè)在之后,在調(diào)用在創(chuàng)建之前的代碼會(huì)出問題,在我的機(jī)器上(linux內(nèi)核2.6.26)表現(xiàn)的癥狀就是所有進(jìn)程的epoll_wait函數(shù)返回0, 而客戶端似乎被阻塞了:
服務(wù)器端:
#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/wait.h>
using namespace std;
#define MAXLINE 5
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5000
#define INFTIM 1000
typedef struct task_t
{
??? int fd;
??? char buffer[100];
??? int n;
}task_t;
int CreateWorker(int nWorker)
{
??? if (0 < nWorker)
??? {
??????? bool bIsChild;
??????? pid_t nPid;
??????? while (!bIsChild)
??????? {
??????????? if (0 < nWorker)
??????????? {
??????????????? nPid = ::fork();
??????????????? if (nPid > 0)
??????????????? {
??????????????????? bIsChild = false;
??????????????????? --nWorker;
??????????????? }
??????????????? else if (0 == nPid)
??????????????? {
??????????????????? bIsChild = true;
??????????????????? printf("create worker %d success!/n", ::getpid());
??????????????? }
??????????????? else
??????????????? {
??????????????????? printf("fork error: %s/n", ::strerror(errno));
??????????????????? return -1;
??????????????? }
??????????? }
??????????? else?
??????????? {
??????????????? int nStatus;
??????????????? if (-1 == ::wait(&nStatus))
??????????????? {
??????????????????? ++nWorker;
??????????????? }
??????????? }
??????? }
??? }
??? return 0;
}
void setnonblocking(int sock)
{
??? int opts;
??? opts=fcntl(sock,F_GETFL);
??? if(opts<0)
??? {
??????? perror("fcntl(sock,GETFL)");
??????? exit(1);
??? }
??? opts = opts|O_NONBLOCK;
??? if(fcntl(sock,F_SETFL,opts)<0)
??? {
??????? perror("fcntl(sock,SETFL,opts)");
??????? exit(1);
??? }???
}
int main()
{
??? int i, maxi, listenfd, connfd, sockfd,epfd,nfds;
??? ssize_t n;
??? char line[MAXLINE];
??? socklen_t clilen;
??? struct epoll_event ev,events[20];
??? struct sockaddr_in clientaddr;
??? struct sockaddr_in serveraddr;
??? listenfd = socket(AF_INET, SOCK_STREAM, 0);
?????? bzero(&serveraddr, sizeof(serveraddr));
??? serveraddr.sin_family = AF_INET;
??? char *local_addr="127.0.0.1";
??? inet_aton(local_addr,&(serveraddr.sin_addr));//htons(SERV_PORT);
??? serveraddr.sin_port=htons(SERV_PORT);
????? // 地址重用
??? int nOptVal = 1;
??? socklen_t nOptLen = sizeof(int);
??? if (-1 == ::setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &nOptVal, nOptLen))
??? {
??????? return -1;
??? }????
??? setnonblocking(listenfd);
??? bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));
??? listen(listenfd, LISTENQ);????
????
??? CreateWorker(5);
????
??? //把socket設(shè)置為非阻塞方式
????
??? //生成用于處理accept的epoll專用的文件描述符
??? epfd=epoll_create(256);????
??? //設(shè)置與要處理的事件相關(guān)的文件描述符
??? ev.data.fd=listenfd;
??? //設(shè)置要處理的事件類型
??? ev.events=EPOLLIN|EPOLLET;
??? //ev.events=EPOLLIN;
??? //注冊(cè)epoll事件
??? epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
?
???? //CreateWorker(5);
?????
??? maxi = 0;
????
??? task_t task;?
??? task_t *ptask;
??? while(true)?
??? {
??????? //等待epoll事件的發(fā)生
??????? nfds=epoll_wait(epfd,events,20,500);
??????? //處理所發(fā)生的所有事件?????
??????? for(i=0;i<nfds;++i)
??????? {
??????????? if(events[i].data.fd==listenfd)
??????????? {????????????????
??????????????? connfd = accept(listenfd,NULL, NULL);
??????????????? if(connfd<0){????????????????????
??????????????????? printf("connfd<0, listenfd = %d/n", listenfd);
??????????????????? printf("error = %s/n", strerror(errno));
??????????????????? exit(1);
??????????????? }
??????????????? setnonblocking(connfd);
???????????????
??????????????? //設(shè)置用于讀操作的文件描述符
??????????????? memset(&task, 0, sizeof(task));
??????????????? task.fd = connfd;
??????????????? ev.data.ptr = &task;
??????????????? //設(shè)置用于注冊(cè)的讀操作事件
??????????????? ev.events=EPOLLIN|EPOLLET;
??????????????? //ev.events=EPOLLIN;
??????????????? //注冊(cè)ev
??????????????? epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
??????????? }
??????????? else if(events[i].events&EPOLLIN)
??????????? {
??????????????? cout << "EPOLLIN" << endl;
??????????????? ptask = (task_t*)events[i].data.ptr;
??????????????? sockfd = ptask->fd;
????????????????
??????????????? if ( (ptask->n = read(sockfd, ptask->buffer, 100)) < 0) {
??????????????????? if (errno == ECONNRESET) {
??????????????????????? close(sockfd);
??????????????????????? events[i].data.ptr = NULL;
??????????????????? } else
??????????????????????? std::cout<<"readline error"<<std::endl;
??????????????? } else if (ptask->n == 0) {
??????????????????? close(sockfd);
??????????????????? events[i].data.ptr = NULL;
??????????????? }
??????????????? ptask->buffer[ptask->n] = '/0';
??????????????? cout << "read " << ptask->buffer << endl;
????????????????
??????????????? //設(shè)置用于寫操作的文件描述符????????????????????????????????
??????????????? ev.data.ptr = ptask;
??????????????? //設(shè)置用于注測的寫操作事件
??????????????? ev.events=EPOLLOUT|EPOLLET;
????????????????????????????????
??????????????? //修改sockfd上要處理的事件為EPOLLOUT
??????????????? epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
??????????? }
??????????? else if(events[i].events&EPOLLOUT)
??????????? {???
??????????????? cout << "EPOLLOUT" << endl;
??????????????? ptask = (task_t*)events[i].data.ptr;
??????????????? sockfd = ptask->fd;
????????????????
??????????????? write(sockfd, ptask->buffer, ptask->n);
????????????????
??????????????? //設(shè)置用于讀操作的文件描述符??????????????
??????????????? ev.data.ptr = ptask;
????????????????
??????????????? //修改sockfd上要處理的事件為EPOLIN
??????????????? epoll_ctl(epfd,EPOLL_CTL_DEL,sockfd,&ev);
??????????????? cout << "write " << ptask->buffer;
??????????????? memset(ptask, 0, sizeof(*ptask));
??????????????? close(sockfd);
??????????? }
??????? }
??? }
??? return 0;
}測試客戶端:
#!/usr/bin/perl
use strict;
use Socket;
use IO::Handle;
sub echoclient
{
??? my $host = "127.0.0.1";
??? my $port = 5000;
??? my $protocol = getprotobyname("TCP");
??? $host = inet_aton($host);
??? socket(SOCK, AF_INET, SOCK_STREAM, $protocol) or die "socket() failed: $!";
??? my $dest_addr = sockaddr_in($port, $host);
??? connect(SOCK, $dest_addr) or die "connect() failed: $!";
??? SOCK->autoflush(1);
??? my $msg_out = "hello world/n";
??? print "out = ", $msg_out;
??? print SOCK $msg_out;
??? my $msg_in = <SOCK>;
??? print "in = ", $msg_in;
??? close SOCK;
}
#&echoclient;
#exit(0);
for (my $i = 0; $i < 9999; $i++)
{
??? echoclient;
}
我查看了lighttpd的實(shí)現(xiàn),也是在創(chuàng)建完子進(jìn)程之后才創(chuàng)建的epoll的fd.
請(qǐng)問誰知道哪里有講解這個(gè)的文檔?
假如fd1是由A進(jìn)程加入epfd的,而且用的是ET模式,那么加入通知的是進(jìn)程B,顯然B進(jìn)程不會(huì)對(duì)fd1進(jìn)行處理,所以以后fd1的事件再不會(huì)通知,所以 經(jīng)過幾次循環(huán)之后,所有的fd都沒有事件通知了,所以epoll_wait在timeout之后就返回0了。而在客戶端的結(jié)果可想而知,只能是被阻塞。
也就是說, 這是一種發(fā)生在epoll fd上面的類似于"驚群"的現(xiàn)象.
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀
總結(jié)
- 上一篇: 关于Linux Kernel中的宏定义l
- 下一篇: select机制