I/O多路复用与socket
前言
簡單來講I/O多路復用就是用一個進程來監(jiān)聽多個文件描述符(fd),我們將監(jiān)聽的fd通過系統(tǒng)調(diào)用注冊到內(nèi)核中,如果有一個或多個fd可讀或可寫,內(nèi)核會通知應用程序來對這些fd做讀寫操作,select、poll、epoll都是用于處理此類問題的系統(tǒng)API,只不過注冊和調(diào)用的方式略有不同。
例如telnet命令的操作,telnet命令從shell讀入數(shù)據(jù)然后寫到socket fd上,同時也需要從socket fd上讀數(shù)據(jù)寫到shell上。telnet server需要從socket讀出命令并發(fā)送給shell,再將命令執(zhí)行結果返回給telnet客戶端。此時對于telnet命令來說,需要接收用戶輸入和sockfd的輸入,也需要輸出給用戶和socket fd,這兩種輸入和輸出是無序的,不能單純的阻塞某一個讀操作,如何處理這種場景?
- 將兩個read fd設置為非阻塞,然后輪詢兩個read fd,如果第一個收到數(shù)據(jù),則處理,之后再看第二個read fd是否有數(shù)據(jù)需要讀取,如此往復。
- 使用多進程或者多線程,將用戶輸入和輸出到sockfd作為一條通道。將sockfd輸入和輸出給用戶作為一條通道。
這樣父進程讀入用戶數(shù)據(jù)后會發(fā)送給socketfd到telenetd,子進程讀入telnetd數(shù)據(jù)后發(fā)送給用。當用戶終止父進程時,需要發(fā)送信號給子進程。當子進I/O結束終止時,父進程也需要接收子進程的結束信號。使用多線程同樣需要一些復雜的線程間同步操作。
- 異步I/O的方式,對兩個read fd使用不同的信號,使用不同的處理函數(shù)處理。
以上三種方法在讀寫連接少的時候沒什么問題,當一個server進程需要維護成千上萬條通信連接時就會出問題。第1種會無端浪費cpu,第2種就算使用線程\進程池來避免上下文切換的開銷,當連接數(shù)量過多的時候,會占用大量的內(nèi)存,第3種使用異步I/O顯然信號類型肯定是不夠用的。所以為了應對此類問題,有了I/O多路復用的技術。
- 使用select、poll、epoll,將兩個read fd注冊到內(nèi)核,I/O多路復用會阻塞直到有read請求過來,然后返回通知應用,應用針對不同的描述符進行不同的操作。這樣可以做到在一個進程中監(jiān)聽并處理多個描述符,再搭配線程池使用,則可以盡量的減少cpu和內(nèi)存的使用,自然可以維護更多的連接。
select
先看一下select的創(chuàng)建函數(shù)
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
// 監(jiān)聽描述符數(shù)目
// readfds、writefds、exceptfds表示可讀、可寫、異常事件對應的fd
// timeout表示select阻塞多長時間后返回,NULL為一直阻塞、0為立即返回、或指定超時時間
/*
返回值:
0表示超時時間內(nèi)沒有就緒的fds
成功時返回就緒fds總數(shù)(讀、寫、異常)
失敗返回-1并設置errno,如果select等待期間被信號中斷則立即返回-1并設置errno為EINTR
*/
-
fd_set是一個字節(jié)數(shù)組,每一位標識一個fd。所以通常nfds設置為最大的fd的值+1,在sys/selct.h中可以找到/* Number of descriptors that can fit in anfd_set'. */值為#define __FD_SETSIZE 1024,系統(tǒng)默認單個進程打開最大fd數(shù)量ulimit -n`為1024,所以select默認最大只能監(jiān)聽1024個fd。
select通過以下四個宏來對fd_set置位:
void FD_CLR(int fd, fd_set *set); // 清除fd_set中的fd位
int FD_ISSET(int fd, fd_set *set); // 確認fd是否在fd_set中開啟,非0值為開啟,0為關閉
void FD_SET(int fd, fd_set *set); // 開啟fd在fd_set中的位
void FD_ZERO(fd_set *set); // 清除fd_set的所有位
demo
我們可以使用select的read_fds和exception_fds來接收普通數(shù)據(jù)和帶外數(shù)據(jù)
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>
#include <cstring>
#include <iostream>
#define BUFFERSIZE 1024
using namespace std;
int main(int argc, char *argv[]) {
if (argc < 3) {
cout << "usage: " << argv[0] << " ip port" << endl;
return 1;
}
// 設置TCP socket server
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(atoi(argv[2]));
const char *ip = argv[1];
inet_pton(AF_INET, ip, &server_addr.sin_addr);
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd < 0) {
cout << "error in create socket" << endl;
return 1;
}
int ret =
bind(listenfd, (struct sockaddr *)&server_addr, sizeof(server_addr));
assert(ret != -1);
ret = listen(listenfd, 6);
assert(ret != -1);
// 接收客戶端連接
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int connfd =
accept(listenfd, (struct sockaddr *)&client_addr, &client_addr_len);
if (connfd < 0) {
close(listenfd);
cout << "accept connect error" << endl;
return 1;
}
// 初始化要用到的select fd集
fd_set readfds;
fd_set exceptionfds;
FD_ZERO(&readfds);
FD_ZERO(&exceptionfds);
char buffer[BUFFERSIZE];
while (true) {
// 如果是普通數(shù)據(jù)則觸發(fā)readfds, 如果是oob數(shù)據(jù)觸發(fā)exceptionfds
FD_SET(connfd, &readfds);
FD_SET(connfd, &exceptionfds);
// 注冊select, 不關心寫fds設置為NULL,timeout NULL為阻塞
ret = select(connfd + 1, &readfds, NULL, &exceptionfds, NULL);
if (ret < 0) {
cout << "select error" << endl;
break;
}
memset(buffer, '\0', BUFFERSIZE);
if (FD_ISSET(connfd, &readfds)) {
// 接收普通數(shù)據(jù)
int number = recv(connfd, buffer, BUFFERSIZE - 1, 0);
if (number < 0) {
cout << "recv normal data error" << endl;
break;
} else if (number == 0) {
cout << "connection closed" << endl;
break;
}
cout << "recv normal data " << number << " bytes: " << buffer << endl;
}
memset(buffer, '\0', BUFFERSIZE);
if (FD_ISSET(connfd, &exceptionfds)) {
// 接收帶外數(shù)據(jù)
int number = recv(connfd, buffer, BUFFERSIZE - 1, MSG_OOB);
if (number < 0) {
cout << "recv oob data error" << endl;
break;
} else if (number == 0) {
cout << "connection closed" << endl;
break;
}
cout << "recv oob data " << number << " bytes: " << buffer << endl;
}
}
close(listenfd);
close(connfd);
return 0;
}
客戶端截取部分發(fā)送內(nèi)容
const char *oob_data = "abc";
const char *normal_data = "123";
send(sockfd, normal_data, strlen(normal_data), 0);
send(sockfd, oob_data, strlen(oob_data), MSG_OOB);
send(sockfd, normal_data, strlen(normal_data), 0);
send(sockfd, normal_data, strlen(normal_data), 0);
send(sockfd, normal_data, strlen(normal_data), 0);
運行結果如下,成功的接收到帶外數(shù)據(jù)并處理:
socket與I/O事件觸發(fā)
socket fd可讀事件
- 內(nèi)核接收緩沖區(qū)中字節(jié)數(shù)大于等于SO_RCVLOWAT值(通過
getsockopt和setsockopt獲取設置),socket可讀,recv大于0。對端關閉連接,recv等于0。如果沒有資源這次讀取不成功recv返回小于0,并且錯誤碼為EAGIN或EWOULDBLOCK errno,這種不算是錯誤,或許下次讀取就可以成功。 - socket listenfd有新的連接請求
- socket上有未處理的錯誤,通過getsockopt讀取和清除錯誤
socket fd可寫事件
- 內(nèi)核發(fā)送緩沖區(qū)空間大于等于SO_SNDLOWAT可無阻塞寫,send返回大于0
- 如果該socket fd已經(jīng)關閉,再執(zhí)行寫會觸發(fā)SIGPIPE信號
- connect連接成功或超時失敗
- socket上有未處理的錯誤,通過getsockopt讀取和清除錯誤
socket fd異常事件
- socket上接收到帶外數(shù)據(jù)
poll
poll較select做出了改進,select使用bitmap來監(jiān)視fds,而poll使用pollfd結構的數(shù)組來監(jiān)視fds,突破了fds數(shù)量的限制,通過結構體將fd與events綁定,可以監(jiān)視更多類型的事件
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events 注冊的事件*/
short revents; /* returned events 實際發(fā)生的事件*/
};
常用事件類型
- POLLIN:數(shù)據(jù)可讀
- POLLOUT:數(shù)據(jù)可寫
- POLLRDHUP:TCP連接被對端關閉,或者對端關閉了寫操作
- POLLERR:poll發(fā)生錯誤
- POLLHUP:管道寫端關閉,讀端fd收到POLLHUP事件
- POLLINVAL:fd沒有打開
poll的創(chuàng)建函數(shù)
int poll(struct pollfd *fds, nfds_t nfds, int timeout)
// fds 是pollfd結構類型的數(shù)組
// nfds 指定fds的大小
// timeout 超時時間,-1阻塞,0立即返回
/*
返回值:
0表示超時時間內(nèi)沒有就緒的fds
成功時返回就緒fds總數(shù)(讀、寫、異常)
失敗返回-1并設置errno,如果select等待期間被信號中斷則立即返回-1并設置errno為EINTR
*/
demo
監(jiān)聽兩個文件的寫入,輸出到標準輸出
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>
#include <iostream>
#define BUFFERSIZE 1024
using namespace std;
// 存放pollfd結構數(shù)組
pollfd fds[2];
void setnonblocking(int fd) {
int old_fd_option = fcntl(fd, F_GETFL);
int new_fd_option = O_NONBLOCK | old_fd_option;
fcntl(fd, F_SETFL, new_fd_option);
}
int main(int argc, char *argv[]) {
if (argc < 2) {
cout << "usage: " << argv[0] << "filename1 filename2" << endl;
return 1;
}
// 打開創(chuàng)建好的文件
int fd1 = open(argv[1], O_RDONLY);
int fd2 = open(argv[2], O_RDONLY);
// 設置pollfd結構
fds[0].fd = fd1;
fds[0].events = POLLIN | POLLERR;
fds[0].revents = 0;
fds[1].fd = fd2;
fds[1].events = POLLIN | POLLERR;
fds[1].revents = 0;
// 設置fd為非阻塞,方便看讀取的效果,否則會阻塞在read調(diào)用上
setnonblocking(fd1);
setnonblocking(fd2);
char buffer[BUFFERSIZE];
int number = 0;
while (true) {
// 創(chuàng)建poll
int ret = poll(fds, 2, -1);
if (ret < 0) {
cout << "poll error" << endl;
break;
}
for (int i = 0; i < 2; ++i) {
pollfd fd = fds[i];
if (fd.revents & POLLERR) {
cout << "poll error fd: " << fd.fd << endl;
continue;
// 如果fd可讀
} else if (fd.revents & POLLIN) {
// 每次poll事件清空緩沖區(qū)
bzero(buffer, BUFFERSIZE);
while ((number = read(fd.fd, buffer, BUFFERSIZE)) > 0) {
cout << "read " << number << " bytes from file " << argv[i + 1]
<< " content: " << buffer << endl;
}
}
}
}
close(fd1);
return 0;
}
- 新建文件1.txt和2.txt
- 運行server,另起終端隨機在1.txt和2.txt上使用echo追加寫入內(nèi)容
server端輸出
epoll
epoll與select和poll有很大的差異,epoll將需要監(jiān)視的fd放入內(nèi)核的紅黑樹表中,通過epoll_ctl函數(shù)來添加或刪除該表中需要監(jiān)視的fd,只復制已經(jīng)就緒的fd集合返回給應用。
- 一方面無需像使用select/poll每次調(diào)用都將整個fd集傳遞給它們。
- 另一方面在使用的時候應用遍歷的都是事件就緒的fd。
創(chuàng)建epoll:
int epoll_create(int size);
// size:提示內(nèi)核事件表的大小,不是硬限制
// 返回一個fd,所有其他的函數(shù)都操作該fd
操作事件:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// epfd:epoll_create返回的fd
/* op:
EPOLL_CTL_ADD 添加fd到epfd,事件集合為event
EPOLL_CTL_MOD 修改epfd中的fd事件,事件集合為event
EPOLL_CTL_DEL 從epfd中刪除fd,忽略event參數(shù),一般設為NULL
*/
// 返回值:成功返回0,失敗返回-1設置errno
獲取就緒的事件集
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
// epfd:epoll_create返回的fd
// events:就緒的事件數(shù)組,應用遍歷它
// maxevents:指定最大監(jiān)聽的事件數(shù)目
// timeout:超時時間,-1阻塞,0立即返回
// 返回值:成功返回就緒fd的數(shù)目,失敗返回-1設置errno
LT和ET模式
epoll支持兩個模式LT(Level Trigger)和ET(Edge Trigger)
- LT模式可以認為是高效一點的poll,只要fd上有事件發(fā)生就會不斷的喚醒通知,拿讀來說,應用不需要每次都將fd的緩存讀完,epoll會不斷的通知應用來讀取
- ET模式當觸發(fā)事件時,只進行一次喚醒通知,不管此次應用是否將fd緩存讀完,后續(xù)都不會再喚醒,直到新的事件被觸發(fā),這樣大大減少了同一個事件觸發(fā)喚醒的次數(shù),減少了
epoll_wait系統(tǒng)調(diào)用的次數(shù)(上下文切換),所以這種模式也被稱為高效的epoll模式
EPOLLONESHOT事件
我們說ET模式對于一個事件只會觸發(fā)一次,如果是多線程的并發(fā)場景下,當前線程在讀完socket上的數(shù)據(jù)后開始處理這些數(shù)據(jù),在處理期間有新的數(shù)據(jù)到來,此時喚醒新的線程來處理新到來的數(shù)據(jù),出現(xiàn)了兩個線程操作同一fd的情況,可能會出現(xiàn)未知錯誤。EPOLLONESHOT事件可以保證,操作系統(tǒng)對該fd只觸發(fā)一種事件,并且只觸發(fā)一次,這樣任何時刻只能有一個線程操作該fd。這樣也會導致下次該事件無法觸發(fā),所以線程處理完畢后應當使用epoll_ctl重置EPOLLONESHOT。
demo
server的主線程與客戶端建立TCP連接,建立好連接后將連接fd注冊到epoll,如果該鏈接有請求數(shù)據(jù)就啟動新的線程來處理。使用telnet作為客戶端對比不使用EPOLLONESHOT和使用EPOLLONESHOT后server的行為
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>
#include <cstring>
#include <iostream>
using namespace std;
#define MAX_EVENT_NUMBER 1024
#define BUFFERSIZE 1024
static int epollfd = 0;
void setnonblocking(int fd) {
int old_fd_option = fcntl(fd, F_GETFL);
int new_fd_option = old_fd_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_fd_option);
}
void register_epoll(int epollfd, int fd, bool newfd = false,
bool oneshot = false) {
epoll_event events;
events.data.fd = fd;
events.events = EPOLLIN | EPOLLET; // 讀事件、ET工作模式
if (oneshot) {
events.events |= EPOLLONESHOT; // 使用EPOLLONESHOT
}
if (newfd) {
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &events);
} else {
epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &events);
}
setnonblocking(fd);
}
void *handle_connect(void *arg) {
pid_t tid = gettid();
int connfd = *((int *)arg);
cout << "use thread " << tid << " to handle connect " << connfd << endl;
char buffer[BUFFERSIZE];
memset(buffer, '\0', BUFFERSIZE);
while (true) {
int bytes = recv(connfd, buffer, BUFFERSIZE - 1, 0);
if (bytes == 0) {
cout << "the other peer close connection" << endl;
close(connfd);
break;
} else if (bytes < 0) {
if (errno == EAGAIN) {
cout << connfd << " Temporarily unavailable, read later" << endl;
register_epoll(epollfd, connfd, false,
true); // 重置該連接fd的EPOLLONESHOT
break;
} else {
cout << "read " << connfd << " failure" << endl;
close(connfd);
}
} else {
cout << "thread " << tid << " recve " << bytes
<< " bytes from connection " << connfd << ", content: " << buffer
<< endl;
sleep(10);
}
}
cout << "thread " << tid << " end handle connect " << connfd << endl;
}
int main(int argc, char *argv[]) {
if (argc < 3) {
cout << "usage: " << argv[0] << " ip port" << endl;
return 1;
}
// 創(chuàng)建server端socket
const char *ip = argv[1];
struct sockaddr_in serv_addr;
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(atoi(argv[2]));
inet_pton(AF_INET, ip, &serv_addr.sin_addr);
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd < 0) {
cout << "create socket error" << endl;
return 1;
}
int ret = bind(listenfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
// epoll_event數(shù)組,用來接收返回的就緒fd
epoll_event events[MAX_EVENT_NUMBER];
// 創(chuàng)建epoll
epollfd = epoll_create(5);
if (epollfd < 0) {
cout << "create epoll error" << endl;
close(listenfd);
return 1;
}
// listenfd 無需使用EPOLLONESHOT
register_epoll(epollfd, listenfd, true, false);
while (true) {
// 等待事件觸發(fā)
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
if ((sockfd == listenfd) && (events[i].events & EPOLLIN)) {
// 接收客戶端連接
struct sockaddr cli_addr;
socklen_t cli_addr_len = sizeof(cli_addr);
int connfd =
accept(sockfd, (struct sockaddr *)&cli_addr, &cli_addr_len);
if (connfd < 0) {
cout << "accept connect failure" << endl;
continue;
}
// 新的連接使用EPOLLONESHOT屬性
register_epoll(epollfd, connfd, true, true);
// 新的連接不使用EPOLLONESHOT屬性
// register_epoll(epollfd, connfd, true, false);
} else if (events[i].events & EPOLLIN) {
// 已建立的連接有數(shù)據(jù)請求
pthread_t thread;
// 創(chuàng)建線程處理連接數(shù)據(jù),傳入sockfd參數(shù)以便重置EPOLLONESHOT
pthread_create(&thread, NULL, handle_connect, &sockfd);
} else {
cout << "other errors" << endl;
}
}
}
close(listenfd);
close(epollfd);
return 0;
}
使用EPOLLONESHOT事件:
- telnet1發(fā)送c1 h1, 發(fā)送c1 h2
- telnet2發(fā)送c2 h1
- telnet3發(fā)送c1 h3
server使用線程102108逐個處理 connect5的請求,對于connect6使用線程102109單獨處理
不使用EPOLLONESHOT事件:
修改代碼
注釋掉
// register_epoll(epollfd, connfd, false,
// true); // 重置該連接fd的EPOLLONESHOT
不給connfd使用EPOLLONESHOT
// 新的連接使用EPOLLONESHOT屬性
// register_epoll(epollfd, connfd, true, true);
// 新的連接不使用EPOLLONESHOT屬性
register_epoll(epollfd, connfd, true, false);
編譯運行
- telnet1發(fā)送c1 h1, 發(fā)送c1 h2
- telnet2發(fā)送c2 h1
- telnet3發(fā)送c1 h3
線程102137處理connfd 5,sleep的期間內(nèi),connfd5有新的請求到來,可以看到新起了線程來處理connfd5的新消息
對比總結
select
- 事件集的傳入與使用:select沒有fd與event的綁定結構,只是給可讀、可寫、異常傳遞一個fd集合,不能處理更多的事件類型,將fd_set拷貝到內(nèi)核中,內(nèi)核遍歷fd_set,如果有事件發(fā)生,內(nèi)核對fd_set直接修改,將沒有事件的fd位置空,拷貝到應用,因此每次調(diào)用select都需要重新設置fd_set。應用需要再次完全遍歷fd_set,通過
FD_ISSET判斷事件是否就緒。(兩次fd_set拷貝,兩次fd_set遍歷) - 效率:內(nèi)核處理事件集時間復雜度為O(n),應用索引就緒文件描述符的時間復雜度為O(n)
- 工作模式:LT
- 最大可監(jiān)視fd數(shù):受限于
__FD_SETSIZE 1024宏,可修改該值重新編譯內(nèi)核來增加select可監(jiān)視fd的數(shù)目 - 可移植性:支持windows、linux
poll
- 事件集的傳入與使用:poll將fd與event綁定在pollfd結構中,將pollfd數(shù)組復制到內(nèi)核,觸發(fā)事件時內(nèi)核會修改revents,再將數(shù)組復制回用戶態(tài),因此無需重置需要監(jiān)視的成員。但是用戶使用遍歷的時候仍然需要遍歷整個數(shù)組成員,判斷傳入的events是否與返回的revents相同
- 效率:內(nèi)核處理事件集時間復雜度為O(n),應用索引就緒文件描述符的時間復雜度為O(n)
- 工作模式:LT
- 最大可監(jiān)視fd數(shù):系統(tǒng)支持的最大fd數(shù)目,
/proc/sys/fs/file-max/ - 可移植性:支持windows、linux
epoll
- 事件集的傳入與使用:epoll在內(nèi)核中維護一個紅黑樹結構的事件表,綁定fd與events,這個事件表通過
epoll_create創(chuàng)建,返回一個fd來使用,維護著所有需要監(jiān)視的fd。通過系統(tǒng)調(diào)用epoll_ctl對fd對應的事件進行增、刪、改。應用代碼調(diào)用epoll_wait來獲取已經(jīng)觸發(fā)事件的fd,epoll會將就緒的epoll_event結構的fd放入數(shù)組并拷貝到用戶態(tài),應用直接遍歷該數(shù)組即可拿到每一個觸發(fā)事件的fd - 效率:內(nèi)核處理事件集時間復雜度為O(logn)(操作紅黑樹),應用索引就緒文件描述符的時間復雜度為O(1)
- 工作模式:LT或者ET
- 最大可監(jiān)視fd數(shù):系統(tǒng)支持的最大fd數(shù)目,
/proc/sys/fs/file-max/ - 可移植性:僅支持linux
學習自:
《Linux高性能服務器編程》
《UNIX環(huán)境高級編程》
《UNIX系統(tǒng)編程》
總結
以上是生活随笔為你收集整理的I/O多路复用与socket的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java中的日期计算
- 下一篇: yakit的web fuzzer功能的使