Understanding how to use epoll through a complete example
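Network servers are traditionally implemented with a separate process or thread per connection. High-performance applications must handle a very large number of clients simultaneously, and there this approach does not work well: memory use and context-switching time limit how many clients can be served at once. An alternative is to perform non-blocking I/O in a single thread, combined with a readiness notification mechanism that tells you when you can read or write more data on a socket.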
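This article introduces Linux's epoll(7) facility, generally considered the most scalable readiness notification mechanism on Linux. We write sample code in C that implements a complete TCP server. I assume you have some C programming experience, know how to compile and run programs on Linux, and can read the manual pages for the various C functions used.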
epoll was introduced in Linux 2.6 and is not available on other UNIX-like operating systems. It provides functionality similar to select(2) and poll(2):
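- select(2) can only monitor descriptors numbered below FD_SETSIZE, a small constant fixed when libc is compiled (1024 in glibc).
- poll(2) has no fixed limit on the number of descriptors it can monitor, but, among other drawbacks, every call has to linearly scan all the descriptors passed in to check readiness, which is O(n) and slow when n is large.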
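epoll has no such fixed limits and performs no linear scans: descriptors are registered once, and epoll_wait(2) returns only those that are ready. As a result it performs better and can handle a much larger number of events.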
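An epoll instance is created with epoll_create(2) or epoll_create1(2) (they take different arguments); both return a file descriptor referring to the new instance. epoll_ctl(2) adds, modifies, or removes descriptors watched by the instance. epoll_wait(2) waits for events on the watched descriptors, blocking until events are available. See the respective manual pages for details.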
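To see the three calls working together before the full server, here is a minimal sketch that is not part of the original program. The helper name epoll_pipe_demo is hypothetical, and a pipe(2) stands in for a socket so the example needs no network: it creates an instance, registers the pipe's read end for EPOLLIN, makes that end readable, and collects the ready event.

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical helper: returns the number of ready events epoll_wait(2)
   reports for a pipe that has one byte pending, or -1 on error. */
static int
epoll_pipe_demo (void)
{
  int pfd[2];
  int efd, n = -1;
  struct epoll_event ev, ready[4];

  if (pipe (pfd) == -1)
    return -1;

  efd = epoll_create1 (0);              /* 1. create the instance */
  if (efd == -1)
    goto out_pipe;

  ev.events = EPOLLIN;                  /* watch for readability */
  ev.data.fd = pfd[0];
  if (epoll_ctl (efd, EPOLL_CTL_ADD, pfd[0], &ev) == -1)   /* 2. register */
    goto out_epoll;

  if (write (pfd[1], "x", 1) != 1)      /* make the read end readable */
    goto out_epoll;

  /* 3. wait; a timeout of 0 returns immediately instead of blocking */
  n = epoll_wait (efd, ready, 4, 0);
  if (n == 1 && ready[0].data.fd != pfd[0])
    n = -1;                             /* not the descriptor we registered */

out_epoll:
  close (efd);
out_pipe:
  close (pfd[0]);
  close (pfd[1]);
  return n;
}
```

A timeout of 0 makes epoll_wait(2) return immediately; the server in this article passes -1 so that it blocks until something happens.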
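A descriptor can be added to an epoll instance in one of two modes: level-triggered or edge-triggered (the terms are borrowed from digital electronics). In level-triggered mode, epoll_wait(2) reports the descriptor as ready for as long as data is available to read. If you have not read all the data and call epoll_wait(2) on the instance again, it returns the event again, because data is still available. In edge-triggered mode, you are notified only when the readiness state changes, for example when new data arrives. If you have not read all the data and call epoll_wait(2) on the instance again, it blocks (until more data arrives), because the readiness event for the existing data has already been delivered.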
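The difference can be observed directly. In the sketch below (a hypothetical helper that again uses a pipe instead of a socket), two bytes are written, epoll_wait(2) reports the read end, and only one byte is consumed. A second epoll_wait(2) with a zero timeout then shows whether the leftover byte is reported again.

```c
#include <stdint.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical helper: puts 2 bytes into a pipe, lets epoll_wait(2) report
   the read end once, consumes only 1 byte, then returns what a second
   epoll_wait(2) reports (1 = ready, 0 = nothing, -1 = error).
   mode is 0 for level-triggered or EPOLLET for edge-triggered. */
static int
second_wait_reports (uint32_t mode)
{
  int pfd[2], efd, n = -1;
  char c;
  struct epoll_event ev, out;

  if (pipe (pfd) == -1)
    return -1;
  efd = epoll_create1 (0);
  if (efd == -1)
    goto out_pipe;

  ev.events = EPOLLIN | mode;
  ev.data.fd = pfd[0];
  if (epoll_ctl (efd, EPOLL_CTL_ADD, pfd[0], &ev) == -1
      || write (pfd[1], "ab", 2) != 2)
    goto out_epoll;

  /* First wait: both modes report the newly arrived data. */
  if (epoll_wait (efd, &out, 1, 0) != 1)
    goto out_epoll;

  /* Read only part of it; one byte stays in the pipe. */
  if (read (pfd[0], &c, 1) != 1)
    goto out_epoll;

  /* Second wait (timeout 0, so it never blocks): level-triggered reports
     the leftover byte again; edge-triggered stays silent because no new
     data has arrived since the last notification. */
  n = epoll_wait (efd, &out, 1, 0);

out_epoll:
  close (efd);
out_pipe:
  close (pfd[0]);
  close (pfd[1]);
  return n;
}
```

second_wait_reports(0) should return 1 and second_wait_reports(EPOLLET) should return 0. In a real edge-triggered server the second call would use a timeout of -1 and simply hang, which is why the server below drains every descriptor until read(2) reports EAGAIN.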
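The epoll event structure passed to epoll_ctl(2) is shown below. With each watched descriptor you can associate an integer or a pointer as user data (data is a union, so only one of them at a time).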
```c
typedef union epoll_data
{
  void        *ptr;
  int          fd;
  __uint32_t   u32;
  __uint64_t   u64;
} epoll_data_t;

struct epoll_event
{
  __uint32_t   events; /* Epoll events */
  epoll_data_t data;   /* User data variable */
};
```
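Instead of the bare fd, a common choice is to store a pointer to per-connection state in data.ptr, so the event loop gets that state back without a lookup table. The sketch below assumes a hypothetical struct conn and helper names of my own; the server in this article sticks to data.fd.

```c
#include <stddef.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical per-connection state; not part of the original server. */
struct conn
{
  int fd;
  size_t bytes_seen;
};

/* Registers c->fd with the epoll instance, storing the struct pointer
   (instead of the bare fd) in the event's data union. */
static int
watch_conn (int efd, struct conn *c)
{
  struct epoll_event ev;

  ev.events = EPOLLIN;
  ev.data.ptr = c;
  return epoll_ctl (efd, EPOLL_CTL_ADD, c->fd, &ev);
}

/* Demo: makes a pipe readable and checks that epoll_wait(2) hands back the
   very same struct conn pointer.  Returns 1 on success, 0 otherwise. */
static int
ptr_roundtrip_demo (void)
{
  int pfd[2], efd, ok = 0;
  struct conn c;
  struct epoll_event out;

  if (pipe (pfd) == -1)
    return 0;
  efd = epoll_create1 (0);
  if (efd != -1)
    {
      c.fd = pfd[0];
      c.bytes_seen = 0;
      if (watch_conn (efd, &c) == 0 && write (pfd[1], "x", 1) == 1
          && epoll_wait (efd, &out, 1, 0) == 1)
        {
          struct conn *got = out.data.ptr;   /* no fd-to-state lookup needed */
          ok = (got == &c && got->fd == pfd[0]);
        }
      close (efd);
    }
  close (pfd[0]);
  close (pfd[1]);
  return ok;
}
```

Because the struct must outlive its registration, a real server would allocate it per connection and free it only after the descriptor is closed.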
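Now let's write some code. We will implement a small TCP server that prints everything sent to it to standard output. We start with a function, create_and_bind(), which creates and binds a TCP socket: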
```c
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>

static int
create_and_bind (const char *port)
{
  struct addrinfo hints;
  struct addrinfo *result, *rp;
  int s, sfd = -1;

  memset (&hints, 0, sizeof (struct addrinfo));
  hints.ai_family = AF_UNSPEC;     /* Return IPv4 and IPv6 choices */
  hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */
  hints.ai_flags = AI_PASSIVE;     /* All interfaces */

  s = getaddrinfo (NULL, port, &hints, &result);
  if (s != 0)
    {
      fprintf (stderr, "getaddrinfo: %s\n", gai_strerror (s));
      return -1;
    }

  for (rp = result; rp != NULL; rp = rp->ai_next)
    {
      sfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
      if (sfd == -1)
        continue;

      s = bind (sfd, rp->ai_addr, rp->ai_addrlen);
      if (s == 0)
        {
          /* We managed to bind successfully! */
          break;
        }

      close (sfd);
      sfd = -1;   /* Remember that this candidate failed */
    }

  /* Free the list on every path; sfd alone tells us whether binding worked. */
  freeaddrinfo (result);

  if (sfd == -1)
    {
      fprintf (stderr, "Could not bind\n");
      return -1;
    }

  return sfd;
}
```
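create_and_bind() contains a standard block of code for portably obtaining an IPv4 or IPv6 socket. It takes the port as a string argument, which can be passed in from argv[1]. getaddrinfo(3) returns a linked list of addrinfo structures in result, each matching the criteria given in hints. The addrinfo structure looks like this: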
```c
struct addrinfo
{
  int              ai_flags;
  int              ai_family;
  int              ai_socktype;
  int              ai_protocol;
  socklen_t        ai_addrlen;
  struct sockaddr *ai_addr;
  char            *ai_canonname;
  struct addrinfo *ai_next;
};
```
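We walk through these structures one by one, trying to create a socket from each, until we manage to both create and bind one. On success, create_and_bind() returns the socket descriptor; on failure, it returns -1.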
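The same traversal can be used just to inspect what getaddrinfo(3) offers. This hypothetical helper (list_passive_candidates is not part of the original program) walks the ai_next chain with the same hints as create_and_bind() and prints each wildcard address in numeric form.

```c
#include <netdb.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>

/* Hypothetical helper: lists the wildcard addresses getaddrinfo(3) offers
   for a passive TCP socket on `port`, following the ai_next chain the same
   way create_and_bind() does.  Returns the number of candidates, or -1. */
static int
list_passive_candidates (const char *port)
{
  struct addrinfo hints, *result, *rp;
  int count = 0;

  memset (&hints, 0, sizeof hints);
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE;

  if (getaddrinfo (NULL, port, &hints, &result) != 0)
    return -1;

  for (rp = result; rp != NULL; rp = rp->ai_next)
    {
      char host[NI_MAXHOST];

      /* Only the host part is wanted, so no service buffer is passed. */
      if (getnameinfo (rp->ai_addr, rp->ai_addrlen, host, sizeof host,
                       NULL, 0, NI_NUMERICHOST) == 0)
        printf ("candidate %d: family=%s addr=%s\n", count,
                rp->ai_family == AF_INET6 ? "AF_INET6" : "AF_INET", host);
      count++;
    }

  freeaddrinfo (result);
  return count;
}
```

On a typical dual-stack system this prints an IPv4 wildcard (0.0.0.0) and an IPv6 one (::), though the number and order of entries depend on the system configuration; create_and_bind() simply keeps the first one that binds.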
Next, we write a function that puts a socket into non-blocking mode. make_socket_non_blocking() sets the O_NONBLOCK flag on the descriptor passed in sfd:
```c
#include <fcntl.h>
#include <stdio.h>

static int
make_socket_non_blocking (int sfd)
{
  int flags, s;

  /* Read the current file status flags... */
  flags = fcntl (sfd, F_GETFL, 0);
  if (flags == -1)
    {
      perror ("fcntl");
      return -1;
    }

  /* ...and write them back with O_NONBLOCK added. */
  flags |= O_NONBLOCK;
  s = fcntl (sfd, F_SETFL, flags);
  if (s == -1)
    {
      perror ("fcntl");
      return -1;
    }

  return 0;
}
```
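To see what O_NONBLOCK changes, the sketch below reads from an empty descriptor. It is hypothetical: it uses a pipe and inlines the same F_GETFL/F_SETFL sequence so that it is self-contained. Instead of sleeping, read(2) fails at once with EAGAIN, which is exactly the signal the event loop later uses to know a descriptor has been drained.

```c
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical demo: after setting O_NONBLOCK (same F_GETFL/F_SETFL pattern
   as make_socket_non_blocking()), reading an empty pipe fails immediately
   with EAGAIN instead of putting the thread to sleep.
   Returns 1 if that happened, 0 otherwise. */
static int
nonblocking_read_demo (void)
{
  int pfd[2], flags, result = 0;
  char c;

  if (pipe (pfd) == -1)
    return 0;

  flags = fcntl (pfd[0], F_GETFL, 0);
  if (flags != -1 && fcntl (pfd[0], F_SETFL, flags | O_NONBLOCK) != -1)
    {
      /* The pipe is empty and its write end is still open, so a blocking
         read would wait forever; a non-blocking one returns at once. */
      ssize_t r = read (pfd[0], &c, 1);
      result = (r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK));
    }

  close (pfd[0]);
  close (pfd[1]);
  return result;
}
```

On Linux the flag can also be requested at creation time, with SOCK_NONBLOCK in the type argument of socket(2) or via accept4(2); the article uses fcntl(2), which is the more portable route.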
Now on to main(), which contains the program's event loop. This is the bulk of the code:
```c
#include <errno.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/* create_and_bind() and make_socket_non_blocking() are defined above. */

#define MAXEVENTS 64

int
main (int argc, char *argv[])
{
  int sfd, s;
  int efd;
  struct epoll_event event;
  struct epoll_event *events;

  if (argc != 2)
    {
      fprintf (stderr, "Usage: %s [port]\n", argv[0]);
      exit (EXIT_FAILURE);
    }

  sfd = create_and_bind (argv[1]);
  if (sfd == -1)
    abort ();

  s = make_socket_non_blocking (sfd);
  if (s == -1)
    abort ();

  s = listen (sfd, SOMAXCONN);
  if (s == -1)
    {
      perror ("listen");
      abort ();
    }

  efd = epoll_create1 (0);
  if (efd == -1)
    {
      perror ("epoll_create1");
      abort ();
    }

  event.data.fd = sfd;
  event.events = EPOLLIN | EPOLLET;
  s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
  if (s == -1)
    {
      perror ("epoll_ctl");
      abort ();
    }

  /* Buffer where events are returned */
  events = calloc (MAXEVENTS, sizeof event);
  if (events == NULL)
    {
      perror ("calloc");
      abort ();
    }

  /* The event loop */
  while (1)
    {
      int n, i;

      n = epoll_wait (efd, events, MAXEVENTS, -1);
      if (n == -1)
        {
          if (errno == EINTR)
            continue;   /* Interrupted by a signal; just wait again */
          perror ("epoll_wait");
          abort ();
        }

      for (i = 0; i < n; i++)
        {
          if ((events[i].events & EPOLLERR) ||
              (events[i].events & EPOLLHUP) ||
              (!(events[i].events & EPOLLIN)))
            {
              /* An error has occurred on this fd, or the socket is not
                 ready for reading (why were we notified then?) */
              fprintf (stderr, "epoll error\n");
              close (events[i].data.fd);
              continue;
            }
          else if (sfd == events[i].data.fd)
            {
              /* We have a notification on the listening socket, which
                 means one or more incoming connections. */
              while (1)
                {
                  /* sockaddr_storage is large enough for IPv4 and IPv6
                     peers; a plain struct sockaddr is too small for IPv6. */
                  struct sockaddr_storage in_addr;
                  socklen_t in_len;
                  int infd;
                  char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

                  in_len = sizeof in_addr;
                  infd = accept (sfd, (struct sockaddr *) &in_addr, &in_len);
                  if (infd == -1)
                    {
                      if ((errno == EAGAIN) ||
                          (errno == EWOULDBLOCK))
                        {
                          /* We have processed all incoming
                             connections. */
                          break;
                        }
                      else
                        {
                          perror ("accept");
                          break;
                        }
                    }

                  s = getnameinfo ((struct sockaddr *) &in_addr, in_len,
                                   hbuf, sizeof hbuf,
                                   sbuf, sizeof sbuf,
                                   NI_NUMERICHOST | NI_NUMERICSERV);
                  if (s == 0)
                    {
                      printf ("Accepted connection on descriptor %d "
                              "(host=%s, port=%s)\n", infd, hbuf, sbuf);
                    }

                  /* Make the incoming socket non-blocking and add it to the
                     list of fds to monitor. */
                  s = make_socket_non_blocking (infd);
                  if (s == -1)
                    abort ();

                  event.data.fd = infd;
                  event.events = EPOLLIN | EPOLLET;
                  s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);
                  if (s == -1)
                    {
                      perror ("epoll_ctl");
                      abort ();
                    }
                }
              continue;
            }
          else
            {
              /* We have data on the fd waiting to be read. Read and
                 display it. We must read whatever data is available
                 completely, as we are running in edge-triggered mode
                 and won't get a notification again for the same
                 data. */
              int done = 0;

              while (1)
                {
                  ssize_t count;
                  char buf[512];

                  count = read (events[i].data.fd, buf, sizeof buf);
                  if (count == -1)
                    {
                      /* If errno == EAGAIN, that means we have read all
                         data. So go back to the main loop. */
                      if (errno != EAGAIN && errno != EWOULDBLOCK)
                        {
                          perror ("read");
                          done = 1;
                        }
                      break;
                    }
                  else if (count == 0)
                    {
                      /* End of file. The remote has closed the
                         connection. */
                      done = 1;
                      break;
                    }

                  /* Write the buffer to standard output */
                  s = write (1, buf, count);
                  if (s == -1)
                    {
                      perror ("write");
                      abort ();
                    }
                }

              if (done)
                {
                  printf ("Closed connection on descriptor %d\n",
                          events[i].data.fd);

                  /* Closing the descriptor will make epoll remove it
                     from the set of descriptors which are monitored. */
                  close (events[i].data.fd);
                }
            }
        }
    }

  free (events);
  close (sfd);

  return EXIT_SUCCESS;
}
```
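main() first calls create_and_bind() to set up the socket. It then makes the socket non-blocking and calls listen(2). Next it creates an epoll instance, efd, and adds the listening socket sfd to it, watching for input events in edge-triggered mode.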
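The outer while loop is the main event loop. It calls epoll_wait(2), which blocks the thread until events arrive. When events are ready, epoll_wait(2) stores them in the events argument, an array of epoll_event structures, and returns how many it stored.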
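The epoll instance efd is continually updated inside the event loop, as we add newly accepted connections to the watched set and remove connections that have terminated.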
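Two properties of this loop can be checked in isolation: epoll_wait(2) returns only the descriptors that are ready, not the whole watched set, and the set can change between calls. The hypothetical sketch below watches eight pipes in level-triggered mode, makes three of them readable, and then removes one with EPOLL_CTL_DEL. Note that the server itself never calls EPOLL_CTL_DEL; it relies on close(2) instead.

```c
#include <sys/epoll.h>
#include <unistd.h>

#define NPIPES 8

/* Hypothetical demo: watches NPIPES pipes (level-triggered), makes 3 of them
   readable, then removes one with EPOLL_CTL_DEL.  Stores the two
   epoll_wait(2) results in *before and *after; returns 0 on success. */
static int
ready_set_demo (int *before, int *after)
{
  int pfd[NPIPES][2], efd, i, rc = -1, opened = 0;
  struct epoll_event ev, out[NPIPES];

  efd = epoll_create1 (0);
  if (efd == -1)
    return -1;

  for (i = 0; i < NPIPES; i++)
    {
      if (pipe (pfd[i]) == -1)
        goto out;
      opened++;
      ev.events = EPOLLIN;
      ev.data.fd = pfd[i][0];
      if (epoll_ctl (efd, EPOLL_CTL_ADD, pfd[i][0], &ev) == -1)
        goto out;
    }

  /* Only pipes 0, 3 and 5 receive data. */
  if (write (pfd[0][1], "a", 1) != 1 || write (pfd[3][1], "b", 1) != 1
      || write (pfd[5][1], "c", 1) != 1)
    goto out;

  /* Only the 3 ready descriptors come back, not all NPIPES. */
  *before = epoll_wait (efd, out, NPIPES, 0);

  /* Stop watching pipe 3: its byte is still unread, yet it is no longer
     reported once it has left the watched set. */
  if (epoll_ctl (efd, EPOLL_CTL_DEL, pfd[3][0], NULL) == -1)
    goto out;
  *after = epoll_wait (efd, out, NPIPES, 0);
  rc = 0;

out:
  for (i = 0; i < opened; i++)
    {
      close (pfd[i][0]);
      close (pfd[i][1]);
    }
  close (efd);
  return rc;
}
```

The expected values are 3 for *before and 2 for *after.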
When events are available, they fall into one of three categories:
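- Errors: When an error condition occurs, or the event is not a notification that data is available for reading, we simply close the associated descriptor. Closing it automatically removes it from the watched set of the epoll instance efd, provided no other descriptor (for example one created by dup(2) or inherited across fork(2)) still refers to the same open file.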
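- New connections: When the listening descriptor sfd is ready for reading, one or more new connections have arrived. We accept(2) each one in turn until accept(2) fails with EAGAIN, printing a message for it, making the new socket non-blocking, and adding it to the watched set of the epoll instance efd.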
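- Client data: When data is available for reading on a client descriptor, we read it with read(2) in 512-byte chunks inside an inner while loop. We must read everything that is available right now, because the descriptor is watched in edge-triggered mode and we will not get another event for the same data. The data read is written to standard output (fd 1) with write(2). If read(2) returns 0, that means EOF, and we can close the client connection. If it returns -1 with errno set to EAGAIN, all the data for this event has been read, and we can go back to the main loop.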
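The client-data case is the heart of edge-triggered handling. Below, the server's inner read loop is pulled out into a hypothetical drain_fd() helper so it can be exercised on a pipe: it keeps reading 512-byte chunks until read(2) reports EAGAIN (nothing more for now) or returns 0 (EOF). Counting bytes instead of writing them out, and retrying on EINTR, are additions not present in the original loop.

```c
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper distilled from the server's inner while loop: reads a
   non-blocking descriptor in 512-byte chunks until read(2) reports EAGAIN
   (drained for now) or returns 0 (peer closed).  Returns the number of
   bytes consumed, or -1 on a real error; *eof is set to 1 on EOF. */
static ssize_t
drain_fd (int fd, int *eof)
{
  char buf[512];
  ssize_t total = 0;

  *eof = 0;
  for (;;)
    {
      ssize_t count = read (fd, buf, sizeof buf);
      if (count > 0)
        {
          total += count;       /* the server would write(2) buf here */
          continue;
        }
      if (count == 0)
        {
          *eof = 1;             /* the remote side closed the connection */
          return total;
        }
      if (errno == EINTR)
        continue;               /* interrupted by a signal: retry */
      if (errno == EAGAIN || errno == EWOULDBLOCK)
        return total;           /* everything available has been read */
      return -1;                /* genuine read error */
    }
}

/* Demo: 1300 bytes in a non-blocking pipe, drained once while the write end
   is open and once more after it has been closed.  Returns 0 on success. */
static int
drain_demo (ssize_t *first, int *eof1, ssize_t *second, int *eof2)
{
  int pfd[2], flags;
  char data[1300];

  if (pipe (pfd) == -1)
    return -1;
  memset (data, 'x', sizeof data);
  flags = fcntl (pfd[0], F_GETFL, 0);
  if (flags == -1 || fcntl (pfd[0], F_SETFL, flags | O_NONBLOCK) == -1
      || write (pfd[1], data, sizeof data) != (ssize_t) sizeof data)
    {
      close (pfd[0]);
      close (pfd[1]);
      return -1;
    }

  *first = drain_fd (pfd[0], eof1);    /* stops on EAGAIN */
  close (pfd[1]);
  *second = drain_fd (pfd[0], eof2);   /* stops on EOF */
  close (pfd[0]);
  return 0;
}
```

With 1300 bytes buffered and the writer still open, drain_fd() reads 512 + 512 + 276 bytes and stops on EAGAIN; after the write end is closed, the next call returns 0 bytes with *eof set.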
That's it. The program runs in a loop, adding descriptors to the watched set and removing them from it.
Download the epoll-example.c code.
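Update 1: The definitions of level-triggered and edge-triggered were accidentally swapped (although the code was correct). This was spotted by Reddit user bodski, and the article is now correct. I should have proofread it before publishing. Sorry, and thanks for pointing out the mistake. :)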
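Update 2: The code has been changed to keep calling accept(2) until it would block, so that if multiple connections arrive, we accept all of them. This was suggested by Reddit user pitchford. Thanks for the comment. :)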
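Looping matters because an edge-triggered listening socket produces one notification for a whole burst of connections. The sketch below is hypothetical: the helper name, the loopback address, and the kernel-chosen port are assumptions made for the demo. It queues three connections on a non-blocking listener before accepting any, then drains them in a single loop, as the updated server does.

```c
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#define NCLIENTS 3

/* Hypothetical demo of the "accept until it would block" pattern: a
   non-blocking listener on 127.0.0.1 (port chosen by the kernel) receives
   NCLIENTS connections, and one loop accepts all of them.
   Returns the number of connections accepted (0 if setup fails). */
static int
accept_all_demo (void)
{
  struct sockaddr_in addr;
  socklen_t len = sizeof addr;
  int lfd, clients[NCLIENTS], i, nclients = 0, accepted = 0, flags;

  lfd = socket (AF_INET, SOCK_STREAM, 0);
  if (lfd == -1)
    return 0;
  memset (&addr, 0, sizeof addr);
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
  addr.sin_port = 0;                      /* let the kernel pick a port */
  if (bind (lfd, (struct sockaddr *) &addr, sizeof addr) == -1
      || listen (lfd, SOMAXCONN) == -1
      || getsockname (lfd, (struct sockaddr *) &addr, &len) == -1
      || (flags = fcntl (lfd, F_GETFL, 0)) == -1
      || fcntl (lfd, F_SETFL, flags | O_NONBLOCK) == -1)
    goto out;

  /* Several connections arrive before the server looks at the listener.
     A blocking connect(2) returns after the handshake; on loopback the
     server side is then, in practice, already in the accept queue. */
  for (i = 0; i < NCLIENTS; i++)
    {
      int cfd = socket (AF_INET, SOCK_STREAM, 0);
      if (cfd == -1)
        goto out;
      clients[nclients++] = cfd;
      if (connect (cfd, (struct sockaddr *) &addr, sizeof addr) == -1)
        goto out;
    }

  /* One edge-triggered notification may cover the whole burst, so keep
     calling accept(2) until it fails (EAGAIN means the queue is empty). */
  for (;;)
    {
      int infd = accept (lfd, NULL, NULL);
      if (infd == -1)
        {
          if (errno == EINTR)
            continue;
          break;
        }
      accepted++;
      close (infd);
    }

out:
  for (i = 0; i < nclients; i++)
    close (clients[i]);
  close (lfd);
  return accepted;
}
```

Accepting only one connection per notification would leave the other two waiting in the queue with no further event to announce them.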