【-----------------------------------原創文章 違者必究--------------------------------】node
在瞭解epoll以前,咱們先看下複用技術的概念和IO複用到底在說什麼?linux
複用技術 multiplexing 並非新技術而是一種設計思想,在通訊和硬件設計中存在頻分複用、時分複用、波分複用、碼分複用等,在平常生活中複用的場景也很是多,所以不要被專業術語所迷惑。git
從本質上來講,複用就是爲了解決有限資源和過多使用者的不平衡問題,從而實現最大的利用率,處理更多的問題。github
舉個例子:
不可釋放場景:ICU 病房的呼吸機做爲有限資源,病人一旦佔用且在未脫離危險以前是沒法放棄佔用的,所以不可能幾個狀況同樣的病人輪流使用。面試
可釋放場景:對於一些其餘資源好比醫護人員就能夠實現對多個病人的同時監護,理論上不存在一個病人佔用醫護人員資源不釋放的場景。編程
因此咱們能夠想一下,多個 IO 共用的資源(處理線程)是否具有可釋放性?api
I/O的含義:在計算機領域常說的IO包括磁盤 IO 和網絡 IO,咱們所說的IO複用主要是指網絡 IO ,在Linux中一切皆文件,所以網絡IO也常常用文件描述符 FD 來表示。數組
複用的含義:那麼這些文件描述符 FD 要複用什麼呢?在網絡場景中複用的就是任務處理線程,因此簡單理解就是多個IO共用1個處理線程。安全
IO複用的可行性:IO請求的基本操做包括read和write,因爲網絡交互的本質性,必然存在等待,換言之就是整個網絡鏈接中FD的讀寫是交替出現的,時而可讀可寫,時而空閒,因此IO複用是可用實現的。bash
綜上認爲:IO複用技術就是協調多個可釋放資源的FD交替共享任務處理線程完成通訊任務,實現多個fd對應1個任務處理線程的複用場景。
現實生活中IO複用就像一隻邊牧管理幾百只綿羊同樣:
畫外音:上面的一段話可能讀起來有些繞,樸素的說法就是讓任務處理線程以更小的資源消耗來協調更多的網絡請求鏈接,IO複用工具也是逐漸演進的,通過先後對比就能夠發現這個原則一直貫穿其中。
/* According to POSIX.1-2001 */ #include <sys/select.h> /* According to earlier standards */ #include <sys/time.h> #include <sys/types.h> #include <unistd.h> int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); void FD_CLR(int fd, fd_set *set); int FD_ISSET(int fd, fd_set *set); void FD_SET(int fd, fd_set *set); void FD_ZERO(fd_set *set);複製代碼
Macro: int FD_SETSIZE
The value of this macro is the maximum number of file descriptors that a fd_set object can hold information about. On systems with a fixed maximum number, FD_SETSIZE is at least that number. On some systems, including GNU, there is no absolute limit on the number of descriptors open, but this macro still has a constant value which controls the number of bits in an fd_set; if you get a file descriptor with a value as high as FD_SETSIZE, you cannot put that descriptor into an fd_set.
//用戶數據載體
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
//fd裝載入內核的載體
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
//三板斧api
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);複製代碼
epoll_create
該接口是在內核區建立一個epoll相關的一些列結構,而且將一個句柄fd返回給用戶態,後續的操做都是基於此fd的,參數size是告訴內核這個結構的元素的大小,相似於stl的vector動態數組,若是size不合適會涉及複製擴容,不過貌似4.1.2內核以後size已經沒有太大用途了;
epoll_ctl
該接口是將fd添加/刪除於epoll_create返回的epfd中,其中epoll_event是用戶態和內核態交互的結構,定義了用戶態關心的事件類型和觸發時數據的載體epoll_data;
epoll_wait
該接口是阻塞等待內核返回的可讀寫事件,epfd仍是epoll_create的返回值,events是個結構體數組指針存儲epoll_event,也就是將內核返回的待處理epoll_event結構都存儲下來,maxevents告訴內核本次返回的最大fd數量,這個和events指向的數組是相關的;
#define MAX_EVENTS 10 struct epoll_event ev, events[MAX_EVENTS]; int listen_sock, conn_sock, nfds, epollfd; /* Set up listening socket, 'listen_sock' (socket(), bind(), listen()) */ epollfd = epoll_create(10); if(epollfd == -1) { perror("epoll_create"); exit(EXIT_FAILURE); } ev.events = EPOLLIN; ev.data.fd = listen_sock; if(epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) { perror("epoll_ctl: listen_sock"); exit(EXIT_FAILURE); } for(;;) { nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1); if (nfds == -1) { perror("epoll_pwait"); exit(EXIT_FAILURE); } for (n = 0; n < nfds; ++n) { if (events[n].data.fd == listen_sock) { //主監聽socket有新鏈接 conn_sock = accept(listen_sock, (struct sockaddr *) &local, &addrlen); if (conn_sock == -1) { perror("accept"); exit(EXIT_FAILURE); } setnonblocking(conn_sock); ev.events = EPOLLIN | EPOLLET; ev.data.fd = conn_sock; if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock, &ev) == -1) { perror("epoll_ctl: conn_sock"); exit(EXIT_FAILURE); } } else { //已創建鏈接的可讀寫句柄 do_use_fd(events[n].data.fd); } } }複製代碼
特別注意: 在epoll_wait時須要區分是主監聽線程fd的新鏈接事件仍是已鏈接事件的讀寫請求,進而單獨處理。
紅黑樹節點定義:
#ifndef _LINUX_RBTREE_H #define _LINUX_RBTREE_H #include <linux/kernel.h> #include <linux/stddef.h> #include <linux/rcupdate.h> struct rb_node { unsigned long __rb_parent_color; struct rb_node *rb_right; struct rb_node *rb_left; } __attribute__((aligned(sizeof(long)))); /* The alignment might seem pointless, but allegedly CRIS needs it */ struct rb_root { struct rb_node *rb_node; }; 複製代碼
epitem定義:
struct epitem {
struct rb_node rbn;
struct list_head rdllink;
struct epitem *next;
struct epoll_filefd ffd;
int nwait;
struct list_head pwqlist;
struct eventpoll *ep;
struct list_head fllink;
struct epoll_event event;
}複製代碼
eventpoll定義:
struct eventpoll {
spin_lock_t lock;
struct mutex mtx;
wait_queue_head_t wq;
wait_queue_head_t poll_wait;
struct list_head rdllist; //就緒鏈表
struct rb_root rbr; //紅黑樹根節點
struct epitem *ovflist;
}複製代碼
建立並初始化一個strut epitem類型的對象,完成該對象和被監控事件以及epoll對象eventpoll的關聯;
將struct epitem類型的對象加入到epoll對象eventpoll的紅黑樹中管理起來;
將struct epitem類型的對象加入到被監控事件對應的目標文件的等待列表中,並註冊事件就緒時會調用的回調函數,在epoll中該回調函數就是ep_poll_callback();
ovflist主要是暫態處理,調用ep_poll_callback()回調函數的時候發現eventpoll的ovflist成員不等於EP_UNACTIVE_PTR,說明正在掃描rdllist鏈表,這時將就緒事件對應的epitem加入到ovflist鏈表暫存起來,等rdllist鏈表掃描完再將ovflist鏈表中的元素移動到rdllist鏈表;
一種普遍流傳的錯誤觀點:
epoll_wait返回時,對於就緒的事件,epoll使用的是共享內存的方式,即用戶態和內核態都指向了就緒鏈表,因此就避免了內存拷貝消耗
revents = ep_item_poll(epi, &pt);//獲取就緒事件 if (revents) { if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) { list_add(&epi->rdllink, head);//處理失敗則從新加入鏈表 ep_pm_stay_awake(epi); return eventcnt ? eventcnt : -EFAULT; } eventcnt++; uevent++; if (epi->event.events & EPOLLONESHOT) epi->event.events &= EP_PRIVATE_BITS;//EPOLLONESHOT標記的處理 else if (!(epi->event.events & EPOLLET)) { list_add_tail(&epi->rdllink, &ep->rdllist);//LT模式處理 ep_pm_stay_awake(epi); } }複製代碼
使用Linux epoll模型的LT水平觸發模式,當socket可寫時,會不停的觸發socket可寫的事件,如何處理?
你在廣場喂鴿子,你只投餵了一份食物,卻引來一羣鴿子爭搶,最終仍是隻有一隻鴿子搶到了食物,對於其餘鴿子來講是徒勞的。