让事件飞——Linux eventfd 原理
讓事件飛——Linux eventfd 原理
讓事件飛 ——Linux eventfd 原理與實踐
原文作者:楊陽
目前越來越多的應(yīng)用程序采用事件驅(qū)動的方式實現(xiàn)功能,如何高效地利用系統(tǒng)資源實現(xiàn)通知的管理和送達(dá)就愈發(fā)變得重要起來。在Linux系統(tǒng)中,eventfd是一個用來通知事件的文件描述符,timerfd是的定時器事件的文件描述符。二者都是內(nèi)核向用戶空間的應(yīng)用發(fā)送通知的機(jī)制,可以有效地被用來實現(xiàn)用戶空間的事件/通知驅(qū)動的應(yīng)用程序。
簡而言之,就是eventfd用來觸發(fā)事件通知,timerfd用來觸發(fā)將來的事件通知。
開發(fā)者使用eventfd相關(guān)的系統(tǒng)調(diào)用,需要包含頭文件;對于timerfd,則是。
系統(tǒng)調(diào)用eventfd/timerfd自linux 2.6.22版本加入內(nèi)核,由Davide Libenzi最初實現(xiàn)和維護(hù)。
eventfd
對于eventfd,只有一個系統(tǒng)調(diào)用接口
1int eventfd(unsigned int initval, int flags);
創(chuàng)建一個eventfd對象,或者說打開一個eventfd的文件,類似普通文件的open操作。
該對象是一個內(nèi)核維護(hù)的無符號的64位整型計數(shù)器。初始化為initval的值。
flags可以以下三個標(biāo)志位的OR結(jié)果:
EFD_CLOEXEC:FD_CLOEXEC,簡單說就是fork子進(jìn)程時不繼承,對于多線程的程序設(shè)上這個值不會有錯的。
EFD_NONBLOCK:文件會被設(shè)置成O_NONBLOCK,一般要設(shè)置。
EFD_SEMAPHORE:(2.6.30以后支持)支持semophore語義的read,簡單說就值遞減1。
這個新建的fd的操作很簡單:
read(): 讀操作就是將counter值置0,如果是semophore就減1。
write(): 設(shè)置counter的值。
注意,還支持epoll/poll/select操作,當(dāng)然,以及每種fd都都必須實現(xiàn)的close。
timerfd
對于timerfd,有三個涉及的系統(tǒng)調(diào)用接口
1int timerfd_create(int clockid, int flags);int timerfd_settime(int fd, int flags,
2 const struct itimerspec *new_value,
3 struct itimerspec *old_value);int timerfd_gettime(int fd, struct itimerspec *curr_value);
timerfd_create就是用來創(chuàng)建新的timerfd對象,clockid可以指定時鐘的種類,比較常用的有兩種:CLOCK_REALTIME(實時時鐘)或 CLOCK_MONOTONIC(單調(diào)遞增時鐘)。實時時鐘是指系統(tǒng)的時鐘,它可以被手工修改。而后者單調(diào)遞增時鐘則是不會被系統(tǒng)時鐘的人為設(shè)置的不連續(xù)所影響的。通常選擇后者。而flags的選擇,TFD_CLOEXEC和TFD_NONBLOCK的意義就比較直接了。
timerfd_settime函數(shù)用來設(shè)置定時器的過期時間expiration。itmerspec結(jié)構(gòu)定義如下:
1struct timespec {
2 time_t tv_sec; /* Seconds /
3 long tv_nsec; / Nanoseconds /};struct itimerspec {
4 struct timespec it_interval; / Interval for periodic timer /
5 struct timespec it_value; / Initial expiration */};
該結(jié)構(gòu)包含兩個時間間隔:it_value是指第一次過期時間,it_interval是指第一次到期之后的周期性觸發(fā)到期的間隔時間,(設(shè)為0的話就是到期第一次)。
old_value如果不為NULL,將會用調(diào)用時間來更新old_value所指的itimerspec結(jié)構(gòu)對象。
timerfd_gettime():返回當(dāng)前timerfd對象的設(shè)置值到curr_value指針?biāo)傅膶ο蟆?/p>
read():讀操作的語義是:如果定時器到期了,返回到期的次數(shù),結(jié)果存在一個8字節(jié)的整數(shù)(uint64_6);如果沒有到期,則阻塞至到期,或返回EAGAIN(取決于是否設(shè)置了NONBLOCK)。
另外,支持epoll,同eventfd。
生產(chǎn)者-消費者設(shè)計模式是常見的后臺架構(gòu)模式。本實例將實現(xiàn)多個生產(chǎn)者和多個消費者的事件通知框架,用以闡釋eventfd/timerfd在線程通信中作為通知實現(xiàn)的典型場景。
本實例采用以下設(shè)計:生產(chǎn)者創(chuàng)建eventfd/timerfd并在事件循環(huán)中注冊事件;消費者線程池中的線程共用一個epoll對象,每個消費者線程并行地進(jìn)行針對eventfd或timerfd觸發(fā)的事件循環(huán)的輪詢(epoll_wait)。
eventfd對應(yīng)實現(xiàn)
1typedef struct thread_info {
2 pthread_t thread_id;
3 int rank;
4 int epfd;} thread_info_t;static void *consumer_routine(void *data) {
5 struct thread_info *c = (struct thread_info *)data;
6 struct epoll_event *events;
7 int epfd = c->epfd;
8 int nfds = -1;
9 int i = -1;
10 uint64_t result;
11
12 log(“Greetings from [consumer-%d]”, c->rank);
13 events = calloc(MAX_EVENTS_SIZE, sizeof(struct epoll_event));
14 if (events == NULL) handle_error(“calloc epoll events\n”);
15
16 for (;😉 {
17 nfds = epoll_wait(epfd, events, MAX_EVENTS_SIZE, 1000); // poll every second
18 for (i = 0; i < nfds; i++) {
19 if (events[i].events & EPOLLIN) {
20 log("[consumer-%d] got event from fd-%d", c->rank, events[i].data.fd);
21 // consume events (reset eventfd)
22 read(events[i].data.fd, &result, sizeof(uint64_t));
23 close(events[i].data.fd); // NOTE: need to close here
24 }
25 }
26 }}static void *producer_routine(void *data) {
27 struct thread_info *p = (struct thread_info *)data;
28 struct epoll_event event;
29 int epfd = p->epfd;
30 int efd = -1;
31 int ret = -1;
32
33 log(“Greetings from [producer-%d]”, p->rank);
34 while (1) {
35 sleep(1);
36 // create eventfd (no reuse, create new every time)
37 efd = eventfd(1, EFD_CLOEXEC|EFD_NONBLOCK);
38 if (efd == -1) handle_error(“eventfd create: %s”, strerror(errno));
39 // register to poller
40 event.data.fd = efd;
41 event.events = EPOLLIN | EPOLLET; // Edge-Triggered
42 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &event);
43 if (ret != 0) handle_error(“epoll_ctl”);
44 // trigger (repeatedly)
45 write(efd, (void *)0xffffffff, sizeof(uint64_t));
46 }}int main(int argc, char *argv[]) {
47 struct thread_info *p_list = NULL, *c_list = NULL;
48 int epfd = -1;
49 int ret = -1, i = -1;
50 // create epoll fd
51 epfd = epoll_create1(EPOLL_CLOEXEC);
52 if (epfd == -1) handle_error(“epoll_create1: %s”, strerror(errno));
53 // producers
54 p_list = calloc(NUM_PRODUCERS, sizeof(struct thread_info));
55 if (!p_list) handle_error(“calloc”);
56 for (i = 0; i < NUM_PRODUCERS; i++) {
57 p_list[i].rank = i;
58 p_list[i].epfd = epfd;
59 ret = pthread_create(&p_list[i].thread_id, NULL, producer_routine, &p_list[i]);
60 if (ret != 0) handle_error(“pthread_create”);
61 }
62 // consumers
63 c_list = calloc(NUM_CONSUMERS, sizeof(struct thread_info));
64 if (!c_list) handle_error(“calloc”);
65 for (i = 0; i < NUM_CONSUMERS; i++) {
66 c_list[i].rank = i;
67 c_list[i].epfd = epfd;
68 ret = pthread_create(&c_list[i].thread_id, NULL, consumer_routine, &c_list[i]);
69 if (ret != 0) handle_error(“pthread_create”);
70 }
71 // join and exit
72 for (i = 0; i < NUM_PRODUCERS; i++) {
73 ret = pthread_join(p_list[i].thread_id, NULL);
74 if (ret != 0) handle_error(“pthread_join”);
75 }
76 for (i = 0; i < NUM_CONSUMERS; i++) {
77 ret = pthread_join(c_list[i].thread_id, NULL);
78 if (ret != 0) handle_error(“pthread_join”);
79 }
80 free(p_list);
81 free(c_list);
82 return EXIT_SUCCESS;}
執(zhí)行過程(2個生產(chǎn)者,4個消費者):
1[1532099804] Greetings from [producer-0]
2[1532099804] Greetings from [producer-1]
3[1532099804] Greetings from [consumer-0]
4[1532099804] Greetings from [consumer-1]
5[1532099804] Greetings from [consumer-2]
6[1532099804] Greetings from [consumer-3]
7[1532099805] [consumer-3] got event from fd-4
8[1532099805] [consumer-3] got event from fd-5
9[1532099806] [consumer-0] got event from fd-4
10[1532099806] [consumer-0] got event from fd-4
11[1532099807] [consumer-1] got event from fd-4
12[1532099807] [consumer-1] got event from fd-5
13[1532099808] [consumer-3] got event from fd-4
14[1532099808] [consumer-3] got event from fd-5
15^C
結(jié)果符合預(yù)期(附:源碼鏈接)
注意,推薦在eventfd在打開時設(shè)置NON_BLOCKING,并在注冊至epoll監(jiān)聽對象時設(shè)為EPOLLET(盡管一次8字節(jié)的read就可以讀完整個計數(shù)器到用戶空間),因為畢竟,只有采用了非阻塞IO和邊沿觸發(fā),epoll的并發(fā)能力才能完全發(fā)揮極致。
另外,本實例中的eventfd消費地非常高效,fd號幾乎不會超過5(前四個分別為stdin/stdout/stderr/eventpoll),但實際應(yīng)用中往往在close前會執(zhí)行一些事務(wù),隨著消費者線程的增加,eventfd打開的文件也會增加(這個數(shù)值得上限由系統(tǒng)的ulimit -n決定)。然而,eventfd打開、讀寫和關(guān)閉都效非常高,因為它本質(zhì)并不是文件,而是kernel在內(nèi)核空間(內(nèi)存中)維護(hù)的一個64位計數(shù)器而已。
timerfd對應(yīng)實現(xiàn)
main函數(shù)和consumer線程實現(xiàn)幾乎一致,而producer線程創(chuàng)建timerfd,并注冊到事件循環(huán)中。
timer的it_value設(shè)為1秒,即第一次觸發(fā)為1秒以后;it_interval設(shè)為3秒,即后續(xù)每3秒再次觸發(fā)一次。
注意,timerfd_settime函數(shù)的位置與之前eventfd的write的相同,二者達(dá)到了類似的設(shè)置事件的作用,只不過這次是定時器事件。
1static void *producer_routine(void *data) {
2 struct thread_info *p = (struct thread_info *)data;
3 struct epoll_event event;
4 int epfd = p->epfd;
5 int tfd = -1;
6 int ret = -1;
7 struct itimerspec its;
8 its.it_value.tv_sec = 1; // initial expiration
9 its.it_value.tv_nsec = 0;
10 its.it_interval.tv_sec = 3; // interval
11 its.it_interval.tv_nsec = 0;
12
13 log(“Greetings from [producer-%d]”, p->rank);
14 // create timerfd
15 tfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC|TFD_NONBLOCK);
16 if (tfd == -1) handle_error(“timerfd create: %s”, strerror(errno));
17 // register to poller
18 event.data.fd = tfd;
19 event.events = EPOLLIN | EPOLLET; // Edge-Triggered
20 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, tfd, &event);
21 if (ret != 0) handle_error(“epoll_ctl”);
22 // register timer expired in future
23 ret = timerfd_settime(tfd, 0, &its, NULL);
24 if (ret != 0) handle_error(“timerfd settime”);
25 return (void *)0;}
執(zhí)行過程(2個生產(chǎn)者,4個消費者):
1[1532099143] Greetings from [producer-1]
2[1532099143] Greetings from [consumer-1]
3[1532099143] Greetings from [consumer-2]
4[1532099143] Greetings from [consumer-3]
5[1532099143] Greetings from [consumer-0]
6[1532099143] Greetings from [producer-0]
7[1532099144] [consumer-3] got event from fd-4
8[1532099144] [consumer-3] got event from fd-5
9[1532099147] [consumer-3] got event from fd-4
10[1532099147] [consumer-3] got event from fd-5
11[1532099150] [consumer-0] got event from fd-4
12[1532099150] [consumer-0] got event from fd-5
13[1532099153] [consumer-1] got event from fd-4
14[1532099153] [consumer-1] got event from fd-5
15^C
從上圖可以看出,運行時打開的fd-4和fd-5兩個文件描述符即是timerfd。
結(jié)果符合預(yù)期(附:源碼鏈接)
引用eventfs的Manual中NOTE段落的第一句話:
Applications can use an eventfd file descriptor instead of a pipe in all cases where a pipe is used simply to signal events.
在信號通知的場景下,相比pipe有非常大的資源和性能優(yōu)勢。其根本在于counter(計數(shù)器)和channel(數(shù)據(jù)信道)的區(qū)別。
第一,是打開文件數(shù)量的巨大差別。由于pipe是半雙工的傳統(tǒng)IPC方式,所以兩個線程通信需要兩個pipe文件,而用eventfd只要打開一個文件。眾所周知,文件描述符可是系統(tǒng)中非常寶貴的資源,linux的默認(rèn)值也只有1024而已。那開發(fā)者可能會說,1相比2也只節(jié)省了一半嘛。要知道pipe只能在兩個進(jìn)程/線程間使用,并且是面向連接(類似TCP socket)的,即需要之前準(zhǔn)備好兩個pipe;而eventfd是廣播式的通知,可以多對多的。如上面的NxM的生產(chǎn)者-消費者例子,如果需要完成全雙工的通信,需要NxMx2個的pipe,而且需要提前建立并保持打開,作為通知信號實在太奢侈了,但如果用eventfd,只需要在發(fā)通知的時候瞬時創(chuàng)建、觸發(fā)并關(guān)閉一個即可。
第二,是內(nèi)存使用的差別。eventfd是一個計數(shù)器,內(nèi)核維護(hù)幾乎成本忽略不計,大概是自旋鎖+喚醒隊列(后續(xù)詳細(xì)介紹),8個字節(jié)的傳輸成本也微乎其微。但pipe可就完全不是了,一來一回數(shù)據(jù)在用戶空間和內(nèi)核空間有多達(dá)4次的復(fù)制,而且更糟糕的是,內(nèi)核還要為每個pipe分配至少4K的虛擬內(nèi)存頁,哪怕傳輸?shù)臄?shù)據(jù)長度為0。
第三,對于timerfd,還有精準(zhǔn)度和實現(xiàn)復(fù)雜度的巨大差異。由內(nèi)核管理的timerfd底層是內(nèi)核中的hrtimer(高精度時鐘定時器),可以精確至納秒(1e-9秒)級,完全勝任實時任務(wù)。而用戶態(tài)要想實現(xiàn)一個傳統(tǒng)的定時器,通常是基于優(yōu)先隊列/二叉堆,不僅實現(xiàn)復(fù)雜維護(hù)成本高,而且運行時效率低,通常只能到達(dá)毫秒級。
所以,第一個最佳實踐法則:當(dāng)pipe只用來發(fā)送通知(傳輸控制信息而不是實際數(shù)據(jù)),放棄pipe,放心地用eventfd/timerfd,“in all cases”。
另外一個重要優(yōu)勢就是eventfd/timerfd被設(shè)計成與epoll完美結(jié)合,比如支持非阻塞的讀取等。事實上,二者就是為epoll而生的(但是pipe就不是,它在Unix的史前時代就有了,那時不僅沒有epoll連Linux都還沒誕生)。應(yīng)用程序可以在用epoll監(jiān)控其他文件描述符的狀態(tài)的同時,可以“順便“”一起監(jiān)控實現(xiàn)了eventfd的內(nèi)核通知機(jī)制,何樂而不為呢?
所以,第二個最佳實踐法則:eventfd配上epoll才更搭哦。
eventfd在內(nèi)核源碼中,作為syscall實現(xiàn)在內(nèi)核源碼的 fs/eventfd.c下。從Linux 2.6.22版本引入內(nèi)核,在2.6.27版本以后加入對flag的支持。以下分析參考Linux 2.6.27源碼。
內(nèi)核中的數(shù)據(jù)結(jié)構(gòu):eventfd_ctx
該結(jié)構(gòu)除了包括之前所介紹的一個64位的計數(shù)器,還包括了等待隊列頭節(jié)點(較新的kernel中還加上了一個kref)。
定義和初始化過程核心代碼如下,比較直接:內(nèi)核malloc,設(shè)置count值,創(chuàng)建eventfd的anon_inode。
1struct eventfd_ctx {
2 wait_queue_head_t wqh;
3 __u64 count;};
以下為創(chuàng)建eventfd的函數(shù)的片段,比較直接。
1SYSCALL_DEFINE2(eventfd2, unsigned int, count, int, flags) {
2 // …
3 ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
4 if (!ctx)
5 return -ENOMEM;
6 init_waitqueue_head(&ctx->wqh);
7 ctx->count = count;
8 fd = anon_inode_getfd("[eventfd]", &eventfd_fops, ctx,
9 flags & (O_CLOEXEC | O_NONBLOCK));
10 // …}
稍提一下,等待隊列是內(nèi)核中的重要數(shù)據(jù)結(jié)構(gòu),在進(jìn)程調(diào)度、異步通知等多種場景都有很多的應(yīng)用。其節(jié)點結(jié)構(gòu)并不復(fù)雜,即自帶自旋鎖的雙向循環(huán)鏈表的節(jié)點,如下:
1struct __wait_queue_head {
2 spinlock_t lock;
3 struct list_head task_list;};typedef struct __wait_queue_head wait_queue_head_t;
等待隊列中存放的是task(內(nèi)存中對線程的抽象)的結(jié)構(gòu)。
操作等待隊列的函數(shù)主要是和調(diào)度相關(guān)的函數(shù),如:wake_up和schedule,它們位于sched.c中,前者即喚醒當(dāng)前等待隊列中的task,后者為當(dāng)前task主動讓出CPU時間給等待隊列中的其他task。這樣,便通過等待隊列實現(xiàn)了多個task在運行中(TASK_RUNNING)和IO等待(TASK_INTERRUPTABLE)中的狀態(tài)切換。
讓我們一起復(fù)習(xí)下,系統(tǒng)中進(jìn)程的狀態(tài)轉(zhuǎn)換:
TASK_RUNNING: 正在在CPU上運行,或者在執(zhí)行隊列(run queue)等待被調(diào)度執(zhí)行。
TASK_INTERRUPTIBLE: 睡眠中等待默寫事件出現(xiàn),task可以被信號打斷,一旦接收到信號或顯示調(diào)用了wake-up,轉(zhuǎn)為TASK_RUNNING狀態(tài)。常見于IO等待中。
清楚了task的兩種狀態(tài)以及run queue / wait queue原理,read函數(shù)就不難理解了。
以下是read函數(shù)的實現(xiàn):
1static ssize_t eventfd_read(struct file *file, char __user *buf, size_t count,
2 loff_t *ppos){
3 struct eventfd_ctx *ctx = file->private_data;
4 ssize_t res;
5 __u64 ucnt;
6 DECLARE_WAITQUEUE(wait, current);
7
8 if (count < sizeof(ucnt))
9 return -EINVAL;
10 spin_lock_irq(&ctx->wqh.lock);
11 res = -EAGAIN;
12 ucnt = ctx->count;
13 if (ucnt > 0)
14 res = sizeof(ucnt);
15 else if (!(file->f_flags & O_NONBLOCK)) {
16 __add_wait_queue(&ctx->wqh, &wait);
17 for (res = 0;😉 {
18 set_current_state(TASK_INTERRUPTIBLE);
19 if (ctx->count > 0) {
20 ucnt = ctx->count;
21 res = sizeof(ucnt);
22 break;
23 }
24 if (signal_pending(current)) {
25 res = -ERESTARTSYS;
26 break;
27 }
28 spin_unlock_irq(&ctx->wqh.lock);
29 schedule();
30 spin_lock_irq(&ctx->wqh.lock);
31 }
32 __remove_wait_queue(&ctx->wqh, &wait);
33 __set_current_state(TASK_RUNNING);
34 }
35 if (res > 0) {
36 ctx->count = 0;
37 if (waitqueue_active(&ctx->wqh))
38 wake_up_locked(&ctx->wqh);
39 }
40 spin_unlock_irq(&ctx->wqh.lock);
41 if (res > 0 && put_user(ucnt, (__u64 __user *) buf))
42 return -EFAULT;
43
44 return res;}
read操作目的是要將count值返回用戶空間并清零。ctx中的count值是共享數(shù)據(jù),通過加irq自旋鎖實現(xiàn)對其的獨占安全訪問,spin_lock_irq函數(shù)可以禁止本地中斷和搶占,在SMP體系中也是安全的。從源碼可以看出,如果是對于(通常的epoll中的,也是上面實例中的)非阻塞讀,count大于0則直接返回并清零,count等于0則直接返回EAGAIN。
對于阻塞讀,如果count值為0則加入等待隊列并阻塞,直到值不為0時(被其他線程更新)返回。阻塞是如何實現(xiàn)的呢?是通過TASK_INTERRUPTABLE狀態(tài)下的循環(huán)加schedule。注意,schedule前釋放了自旋鎖,意味著允許其他線程更新值,只要值被更新大于0且又再次獲得cpu時間,那么就可以跳出循環(huán)繼續(xù)執(zhí)行而返回了。
考慮一個情景,兩個線程幾乎同時read請求,那么:兩個都會被加入到等待隊列中,當(dāng)?shù)谝粋€搶到自旋鎖,返回了大于1的res并重置了count為0,此時它會(在倒數(shù)第二個if那里) 第一時間喚醒等待隊列中的其他線程,此時第二個線程被調(diào)度到,于是開始了自己的循環(huán)等待。即實現(xiàn)了:事件只會通知到第一個接收到的線程。
那么問題來了:我們知道在其他線程write后,阻塞的read線程是馬上返回的。那么如何能在count置一旦不為0時,等待的調(diào)度的阻塞讀線程可以盡快地再次獲得cpu時間,從而繼續(xù)執(zhí)行呢?關(guān)鍵在于write函數(shù)也有當(dāng)確認(rèn)可以成功返回時,主動調(diào)用wakeup_locked的過程,這樣就能實現(xiàn)write后立即向等待隊列通知的效果了。
write操作與read操作過程非常相似,不在此展開。
關(guān)于poll操作的核心代碼如下:
1// …
2 spin_lock_irqsave(&ctx->wqh.lock, flags);
3 if (ctx->count > 0)
4 events |= POLLIN;
5 if (ctx->count == ULLONG_MAX)
6 events |= POLLERR;
7 if (ULLONG_MAX - 1 > ctx->count)
8 events |= POLLOUT;
9 spin_unlock_irqrestore(&ctx->wqh.lock, flags);
在count值大于0時,返回了設(shè)置POLLIN標(biāo)志的事件,使得用戶層的應(yīng)用可以通過epoll監(jiān)控 eventfd的可讀事件狀態(tài)。
本篇小結(jié)
通過對eventfd/timerfd的接口和實現(xiàn)的了解,可以看出其不僅功能實用,而且調(diào)用方式簡單。另外,其實現(xiàn)是非常精巧高效的,構(gòu)建于內(nèi)核眾多系統(tǒng)基礎(chǔ)核心功能之上,為用戶態(tài)的應(yīng)用封裝了十分高效簡單的事件通知機(jī)制。
參考資料
Linux 內(nèi)核源碼 https://elixir.bootlin.com/linux/latest/source/fs/eventfd.c
Linux Programmer’s Manual eventfd(2) - Linux manual page
總結(jié)
以上是生活随笔為你收集整理的让事件飞——Linux eventfd 原理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 摄像模组中光学相关知识(四)
- 下一篇: linux 服务状态命令,Linux 查