關鍵詞:fasync_helper、kill_async、sigsuspend、sigaction、fcntl、F_SETOWN_EX、F_SETSIG、select()、poll()、poll_wait()等。html
《Linux/UNIX系統編程手冊》第63章主要介紹了select()/poll()、信號驅動IO、epoll三方面,以及他們之間異同、優劣點。node
這裏準備結合項目中遇到的問題,分兩個方向進行概括總結。一是一個IO模型從測試程序、API、內核實現進行縱向分析;二是橫向不一樣IO模型的優缺點對比。linux
IO多路複用容許進程同時檢查多個文件描述符以找出它們中的任何一個是否可執行IO操做。系統調用select()和poll()用來執行IO多路複用。shell
信號驅動IO是指當有輸入或者數據能夠寫到指定的文件描述符上時,內核向請求數據的進程發送一個信號。進程能夠處理其餘的任務,當IO操做可執行時經過接收信號來得到通知。當同時檢查大量的文件描述符時,信號驅動IO相比select()和poll()有顯著的性能提高。編程
epoll API是Linux專有的特性,首次出如今Linux 2.6中。同IO多路複用API同樣,epoll API容許進程同時檢查多個文件描述符,看其中任意一個是否能執行IO操做。通訊號驅動IO同樣,當同時檢查大量文件描述符時,epoll能提供更好的性能。數組
在實際應用中使用那種技術,下面是一些要點:緩存
討論多種IO機制以前,首先區分兩種文件描述符準備就緒的通知模式。數據結構
水平觸發通知:若是文件描述符上能夠非阻塞地執行IO系統調用,此時認爲它已經就緒。多線程
邊緣觸發通知:若是文件描述符自上次狀態檢查以來有了新的IO活動,此時須要觸發通知。app
IO模式 | 水平觸發 | 邊緣觸發 |
select()/poll() | √ | |
信號驅動IO | √ | |
epoll | √ | √ |
當採用水平觸發通知時,能夠在任意時刻檢查文件描述符的就緒狀態。表示當文件描述符處於就緒態時,就能夠對其執行一些IO操做;而後重複檢查文件描述符。看看是否仍然處於就緒態,此時能夠執行更多的IO。因爲水平觸發模式容許咱們在任意時刻重複檢查IO狀態,沒有必要每次當文件描述符就緒後須要儘量多地址性IO。
當採用邊緣觸發時,只有當IO事件發生時纔會收到通知,所以:
信號驅動IO中,當文件描述符上可執行IO操做時,進程請求內核爲本身發送一個信號。
進程能夠執行其餘任何任務直到IO就緒位置,此時內核會發送信號給進程。
信號驅動IO的使用遵循必定的步驟:
1.爲內核發送通知信號安裝一個信號處理例程;一般是SIGIO,但在多線程環境下能夠指定自定義的實時信號。
2.設定文件描述符屬主,也就是當文件描述符上可執行IO操做時會接收通知信號的進程或進程組。經過fcntl()命令F_SETOWN或者F_SETOWN_EX來制定。
3.經過設定O_NONBLOCK標誌是能非阻塞IO。
4.經過O_ASYNC標誌是能信號驅動IO。
5.而後進程能夠處理其餘任務,當有信號到達時對應的信號處理函數就會被調用
6.一般狀況下,若是須要等待IO操做,能夠經過sigsuspend()進行等待,此時進程會放棄
首先構造內核驅動和用戶空間測試程序,而後針對測試程序進行詳細的分析。
建立signal_kernel.c做爲模組插入內核,signal_user.c做爲用戶空間測試程序。
建立/dev/signal0設備,經過wirte可讓內核發送信號到用戶空間;wirte以後當即進入sigsuspend()等待。
#include <linux/kernel.h> #include <linux/module.h> #include <linux/init.h> #include <linux/cdev.h> #include <linux/device.h> #include <linux/fs.h> #include <linux/uaccess.h> static struct class *signal_class; static int signal_major; #define SIGNAL_NAME "signal" struct timer_list signal_timer; static struct fasync_struct *signal_async; int signal_count = 0; static unsigned char signal_text[30]; static int signal_release(struct inode *inode, struct file *file) { return 0; } static void dummy_timer(unsigned long data) { //printk("Send SIGIO signal.\n"); kill_fasync (&signal_async, SIGIO, POLL_IN);----------------------------------------發送異步信號給fa綁定的進程。 signal_count++; } static int signal_open(struct inode *inode, struct file *file) { return 0; } static ssize_t signal_read(struct file * file, char __user * buf, size_t count, loff_t *ppos) { int size; size = sprintf(signal_text, "%d", signal_count); copy_to_user(buf, signal_text, size); return size; } static ssize_t signal_write(struct file * file, const char __user * buf, size_t count, loff_t *ppos) { char str[30]; copy_from_user(str, buf,count); printk("Receive %s from user.\n",str); dummy_timer(0); return 0; } static int signal_fasync (int fd, struct file *filp, int on) { return fasync_helper(fd, filp, on, &signal_async);----------------------------------建立fasync_struct並插入到當前fasync鏈表中。 } static const struct file_operations signal_fops = { .owner = THIS_MODULE, .open = signal_open, .release = signal_release, .read = signal_read, .write = signal_write, .fasync = signal_fasync,----------------------------------------------------------當用戶空間設置FASYNC的時候調用fasync函數。 }; static int __init signal_test_init(void) { struct device *signal_device; signal_major = register_chrdev(0, SIGNAL_NAME, &signal_fops); if (signal_major < 0) { pr_err("register_chrdev failed\n"); goto err; } signal_class = class_create(THIS_MODULE, SIGNAL_NAME); if (IS_ERR(signal_class)) { pr_err("device class file already in use\n"); goto err_class; } signal_device = device_create(signal_class, NULL, MKDEV(signal_major, 0), NULL, "%s%d", SIGNAL_NAME, 0); if (IS_ERR(signal_device)) { pr_err("failed to create device\n"); goto err_device; } return 0; err_device: class_destroy(signal_class); err_class: unregister_chrdev(signal_major, SIGNAL_NAME); err: return 0; } static void __exit signal_test_exit(void) { del_timer(&signal_timer); device_destroy(signal_class, MKDEV(signal_major, 0)); class_destroy(signal_class); unregister_chrdev(signal_major, SIGNAL_NAME); } module_init(signal_test_init); module_exit(signal_test_exit); MODULE_LICENSE("GPL");
對應的Makefile以下:
CONFIG_MODULE_SIG=n EXTRA_CFLAGS += -D_GNU_SOURCE---------------------------------------------針對F_SETOWN_EX等擴展命令須要定義此宏。 obj-m := signal_kernel.o KERN_DIR := /lib/modules/$(shell uname -r)/build PWD := $(shell pwd) all: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules gcc $(EXTRA_CFLAGS) signal_user.c -o signal_user -pthread clean: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules clean rm signal_user modules_install: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules_install
用戶空間測試程序signal_user.c以下。
測試程序建立一個sigio線程專門處理信號,重點是進行信號處理前進行一系列設置;而後調用sigsuspend()進入睡眠,等待內核信號喚醒。
在實際應用中,有多是中斷調用kill_fasync()發送信號。
#include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdio.h> #include <poll.h> #include <signal.h> #include <sys/types.h> #include <unistd.h> #include <fcntl.h> #include <time.h> #include <pthread.h> #include <sys/prctl.h> #include <sys/syscall.h> #include <memory.h> int fd; char *filename = "/dev/signal0"; char signal_response[] = "OK"; unsigned int handle_count = 0; #define LOOP_COUNT 10 #define SIGTEST (SIGRTMIN+1) void sig_handler(int sig, siginfo_t *si, void *uc) { unsigned char signal_count[30]; memset(signal_count, 30, 0); read(fd, &signal_count, 0); handle_count ++; printf(">>>>>PID %ld Receive sig=%d count=%d.\n", syscall(SYS_gettid), sig, handle_count); //Trigger a SIGTEST signal. if(handle_count < LOOP_COUNT) write(fd, signal_response, sizeof(signal_response));------------------重複觸發內核發送SIGTEST信號。 //if(atoi(signal_count) != handle_count) //printf("%s: receive_count=%d, handle_count=%d\n", __func__, atoi(signal_count), handle_count); } void *pthread_func(void *arg) { int Oflags; struct timespec time1, time2; struct sigaction sa; sigset_t set, oldset; unsigned long int duration; struct f_owner_ex owner_ex; //Set thread name. prctl(PR_SET_NAME,"sigio"); printf("main=%d, sigio=%ld.\n", getpid(), syscall(SYS_gettid)); //Set SIGTEST actiong. memset(&sa, 0, sizeof(sa)); sa.sa_sigaction = sig_handler; sa.sa_flags |= SA_RESTART | SA_SIGINFO;------------------------------------這裏必須使用sa_sigaction和SA_SIGINFO。 sigaction(SIGTEST, &sa, NULL);---------------------------------------------設置SIGTEST信號處理函數。 //Set proc mask. sigemptyset(&set); sigprocmask(SIG_SETMASK, &set, NULL); sigaddset(&set, SIGTEST); fd = open(filename, O_RDWR); if (fd < 0) { printf("Can't open %s!\n", filename); } owner_ex.pid = syscall(SYS_gettid); owner_ex.type = F_OWNER_TID; fcntl(fd, F_SETOWN_EX, &owner_ex);------------------------------------------將fd文件句柄和當前線程綁定;若是設置F_SETOWN則是和線程組綁定,這二者區別後續重點介紹。 Oflags = fcntl(fd, F_GETFL); fcntl(fd, F_SETFL, Oflags | FASYNC);----------------------------------------對應內核驅動file_operations的fasync成員,給當前fd建立一個fasync_struct。 fcntl(fd, F_SETSIG, SIGTEST);-----------------------------------------------設置SIGTEST替代SIGIO做爲信號發送。 clock_gettime(CLOCK_REALTIME, &time1); write(fd, signal_response, sizeof(signal_response));------------------------觸發內核發送SIGTEST信號。 //pthread_sigmask(SIG_BLOCK, &set, &oldset); sigprocmask(SIG_BLOCK, &set, &oldset); while(handle_count < LOOP_COUNT) { //Will suspend here, and later code will be executed after SIGTEST handler. sigsuspend(&oldset);----------------------------------------------------在此處睡眠,等待信號來喚醒。 } //pthread_sigmask(SIG_UNBLOCK, &set, NULL); sigprocmask(SIG_UNBLOCK, &set, NULL); clock_gettime(CLOCK_REALTIME, &time2); duration =(time2.tv_sec-time1.tv_sec)*1000000000 + (time2.tv_nsec-time1.tv_nsec); printf("End time %ld.%ld\n", duration/1000000000, duration%1000000000); close(fd); pthread_exit(0); } void main(int argc, char **argv) { pthread_t tidp; sigset_t set; sigemptyset(&set); sigaddset(&set, SIGTEST); sigprocmask(SIG_BLOCK, &set, NULL); if(pthread_create(&tidp, NULL, pthread_func, NULL) == -1) { printf("Create pthread error.\n"); return; } if(pthread_join(tidp, NULL)) { printf("Join pthread error.\n"); return; } printf("Main exit.\n"); return; }
將編譯的module signal_kernel.ko插入內核,而後用戶空間執行sudo ./signal_user結果以下。
main=10485, sigio=10486. >>>>>PID 10486 Receive sig=35 count=1. >>>>>PID 10486 Receive sig=35 count=2. >>>>>PID 10486 Receive sig=35 count=3. >>>>>PID 10486 Receive sig=35 count=4. >>>>>PID 10486 Receive sig=35 count=5. >>>>>PID 10486 Receive sig=35 count=6. >>>>>PID 10486 Receive sig=35 count=7. >>>>>PID 10486 Receive sig=35 count=8. >>>>>PID 10486 Receive sig=35 count=9. >>>>>PID 10486 Receive sig=35 count=10. End time 0.103711 Main exit.
首先整個流程在用戶空間配置;而後由內核發起,用戶空間處理。
除了signal(),sigaction()是另外一種設置信號處置的選擇。sigaction()更加複雜,但也根據靈活性。
#include <signal.h> int sigaction(int sig, const struct sigaction *act, struct sigaction *oldact);
sig參數標識想要獲取或改變的信號編號,可使處SIGKILL和SIGSTOP以外任何信號。
act指向描述信號新處置數據結構,oldact用於返回以前信號處置相關信息。二者均可以設置爲NULL。
struct sigaction { void (*sa_handler)(int); /* Address of handler */ sigset_t sa_mask; /* Signals blocked during handler invocation */ int sa_flags; /* Flags controlling handler invocation */ void (*sa_restorer)(void); /* Not for application use */ };
sa_handler指向sig對應的處理函數。
sa_mask定義一組新號,不容許它們中斷此處理程序的執行。調用信號處理程序以前,將該組信號中其餘尚未處於進程掩碼之列的信號添加到進程掩碼中;在信號處理返回以後再將,以前添加的掩碼移除。這就保證,在處理程序執行期間,sa_mask全部信號不會中斷此處理程序。
引起處理程序自身的信號將自動添加到進程掩碼中,這意味着信號處理程序不容許嵌套。當正在執行處理程序時,若是同一信號第二次抵達,將不會遞歸中斷本身。
因爲標準信號(sig < SIGRTMIN)信號處理程序不會被隊列化,因此標準信號處理程序執行期間第二次達到的同一信號將會被忽略。
可是事實信號不會存在這種現象。
struct sighand_struct { atomic_t count; struct k_sigaction action[_NSIG];------------------------------------------------一共_NSIG,即64個信號1-31是標準信號,32-64是實時信號。 spinlock_t siglock; wait_queue_head_t signalfd_wqh; }; int do_sigaction(int sig, struct k_sigaction *act, struct k_sigaction *oact) { struct task_struct *p = current, *t; struct k_sigaction *k; sigset_t mask; if (!valid_signal(sig) || sig < 1 || (act && sig_kernel_only(sig))) return -EINVAL; k = &p->sighand->action[sig-1];------------------------------------------------------將k指向當前進程sig對應的k_sigaction結構。 spin_lock_irq(&p->sighand->siglock); if (oact) *oact = *k; sigaction_compat_abi(act, oact); if (act) { sigdelsetmask(&act->sa.sa_mask, sigmask(SIGKILL) | sigmask(SIGSTOP)); *k = *act;-----------------------------------------------------------------------將act和k關聯。 ... } spin_unlock_irq(&p->sighand->siglock); return 0; }
F_SETOWN是標準fcntl;F_SETOWN_EX是Linux擴展命令,若是要使用須要加上_D_GNU_SOURCE。
這兩個命令都經過fcntl()系統調用,設置文件控制操做。
struct fown_struct是內核中關聯文件句柄和其擁有者信息的結構體:
struct fown_struct { rwlock_t lock; /* protects pid, uid, euid fields */ struct pid *pid; /* pid or -pgrp where SIGIO should be sent */----------------------擁有此文件SIGIO信號的進程。 enum pid_type pid_type; /* Kind of process group SIGIO should be sent to */---------進程類型。 kuid_t uid, euid; /* uid/euid of process setting the owner */ int signum; /* posix.1b rt signal to be delivered on IO */----------------------kill_fasync()發送的信號。 };
下面就來分析這兩命令的區別,以及他們是如何影響信號處理行爲的。
enum pid_type { PIDTYPE_PID, PIDTYPE_PGID, PIDTYPE_SID, PIDTYPE_MAX, /* only valid to __task_pid_nr_ns() */ __PIDTYPE_TGID }; static long do_fcntl(int fd, unsigned int cmd, unsigned long arg, struct file *filp) { long err = -EINVAL; switch (cmd) { ... case F_GETFL:------------------------------------------------獲取、設置文件句柄的f_flags。 err = filp->f_flags; break; case F_SETFL: err = setfl(fd, filp, arg); break; ...
case F_GETOWN:-----------------------------------------------獲取、設置文件句柄的擁有者進程pid。 err = f_getown(filp); force_successful_syscall_return(); break; case F_SETOWN: f_setown(filp, arg, 1); err = 0; break; case F_GETOWN_EX:--------------------------------------------進階獲取、設置文件句柄擁有者進程pid。 err = f_getown_ex(filp, arg); break; case F_SETOWN_EX: err = f_setown_ex(filp, arg); break; ... case F_GETSIG:-----------------------------------------------設置文件句柄和其擁有者之間異步通訊的信號。 err = filp->f_owner.signum; break; case F_SETSIG: if (!valid_signal(arg)) { break; } err = 0; filp->f_owner.signum = arg; break; ... } return err; } pid_t f_getown(struct file *filp) { pid_t pid; read_lock(&filp->f_owner.lock); pid = pid_vnr(filp->f_owner.pid); if (filp->f_owner.pid_type == PIDTYPE_PGID)-----------------若是是進程組返回其負值。 pid = -pid; read_unlock(&filp->f_owner.lock); return pid; } void f_setown(struct file *filp, unsigned long arg, int force) { enum pid_type type; struct pid *pid; int who = arg; type = PIDTYPE_PID; if (who < 0) { type = PIDTYPE_PGID;-----------------------------------默認類型是進程,若是who爲負,則是進程組。 who = -who; } rcu_read_lock(); pid = find_vpid(who); __f_setown(filp, pid, type, force); rcu_read_unlock(); } static int f_getown_ex(struct file *filp, unsigned long arg) { struct f_owner_ex __user *owner_p = (void __user *)arg; struct f_owner_ex owner; int ret = 0; read_lock(&filp->f_owner.lock); owner.pid = pid_vnr(filp->f_owner.pid); switch (filp->f_owner.pid_type) {-------------------------相對於F_SETOWN,多了pid_type類型設置。其中PIDTYPE_MAX表示該文件句柄被一個線程所擁有。而不是進程或者進程組。這點對於信號究竟發給誰,有着很是重要的做用。 case PIDTYPE_MAX: owner.type = F_OWNER_TID; break; case PIDTYPE_PID: owner.type = F_OWNER_PID; break; case PIDTYPE_PGID: owner.type = F_OWNER_PGRP; break; default: WARN_ON(1); ret = -EINVAL; break; } read_unlock(&filp->f_owner.lock); if (!ret) { ret = copy_to_user(owner_p, &owner, sizeof(owner)); if (ret) ret = -EFAULT; } return ret; } static int f_setown_ex(struct file *filp, unsigned long arg) { struct f_owner_ex __user *owner_p = (void __user *)arg; struct f_owner_ex owner; struct pid *pid; int type; int ret; ret = copy_from_user(&owner, owner_p, sizeof(owner)); if (ret) return -EFAULT; switch (owner.type) { case F_OWNER_TID: type = PIDTYPE_MAX; break; case F_OWNER_PID: type = PIDTYPE_PID; break; case F_OWNER_PGRP: type = PIDTYPE_PGID; break; default: return -EINVAL; } rcu_read_lock(); pid = find_vpid(owner.pid); if (owner.pid && !pid) ret = -ESRCH; else __f_setown(filp, pid, type, 1); rcu_read_unlock(); return ret; } static void f_modown(struct file *filp, struct pid *pid, enum pid_type type, int force) { write_lock_irq(&filp->f_owner.lock); if (force || !filp->f_owner.pid) { put_pid(filp->f_owner.pid); filp->f_owner.pid = get_pid(pid); filp->f_owner.pid_type = type; if (pid) { const struct cred *cred = current_cred(); filp->f_owner.uid = cred->uid; filp->f_owner.euid = cred->euid; } } write_unlock_irq(&filp->f_owner.lock); } void __f_setown(struct file *filp, struct pid *pid, enum pid_type type, int force)----------------------------------------------------------------不管是F_SETOWN仍是F_SETOWN_EX二者的force都是1。 { security_file_set_fowner(filp); f_modown(filp, pid, type, force); }
因此F_SETOWN和F_SETOWN_EX主要區別就在於,F_SETOWN_EX能夠設置進程的類型。這對於後續信號的發送,有重要做用。
F_SETOWN_EX更加細緻,能夠指定只發送給某個線程;而F_SETOWN優先發送給線程。若是線程被阻塞,則選擇同一進程中的其餘線程接收。
相關問題調試詳見:《sigsuspend()阻塞:異步信號SIGIO爲何會被截胡?》。
若是要使用實時信號替代SIGIO做爲kill_fasync()做爲信號發送,能夠設置F_SETSIG。
參照do_fcntl(),也即經過設置fown_struct的signum值。
若是須要在信號處理函數中獲取更多信息,還須要在sa.sa_flags增長SA_SIGINFO標誌。
struct sigaction { union { void (*sa_handler)(int); void (*sa_sigaction)(int, siginfo_t *, void *); } __sigaction_handler; sigset_t sa_mask; int sa_flags; void (*sa_restorer)(void); };
在定義sa_handler是就須要定義以下類型函數:
void handler(int sig, siginfo_t *siginfo, void *ucontext);
其中第二個參數siginfo包含的字段標識出了在哪一個文件描述上發生了事件。
使用F_SETSIG有兩個優勢:
1.指定不一樣信號做爲信號驅動IO通知信號,解決了在同一進程範圍內,多線程使用信號驅動IO衝突問題。由於默認都是發送SIGIO,而信號處理是進程範圍的。
2.默認的SIGIO是非排隊信號,若是有對個IO時間發送了信號,而SIGIO被阻塞了,除了第一個通知外,其餘後續的通知都會丟失。使用實時信號就不會存在這個問題,信號的處理會被排隊;除非信號隊列溢出。
此表示是經過fcntl進行設置的,因此看一些setfl()。
static int setfl(int fd, struct file * filp, unsigned long arg) { struct inode * inode = file_inode(filp); int error = 0; ... /* * ->fasync() is responsible for setting the FASYNC bit. */ if (((arg ^ filp->f_flags) & FASYNC) && filp->f_op->fasync) {----------------------能夠看出主要當前文件具備FASYNC標誌位,而且f_op->fasync()定義了就會執行相關函數。 error = filp->f_op->fasync(fd, filp, (arg & FASYNC) != 0); if (error < 0) goto out; if (error > 0) error = 0; } spin_lock(&filp->f_lock); filp->f_flags = (arg & SETFL_MASK) | (filp->f_flags & ~SETFL_MASK); spin_unlock(&filp->f_lock); out: return error; }
咱們看到對應的驅動中調用fasync_helper()進行fasync_struct建立,並插入到fasync列表中。
int fasync_helper(int fd, struct file * filp, int on, struct fasync_struct **fapp) { if (!on) return fasync_remove_entry(filp, fapp); return fasync_add_entry(fd, filp, fapp); } static int fasync_add_entry(int fd, struct file *filp, struct fasync_struct **fapp) { struct fasync_struct *new; new = fasync_alloc(); if (!new) return -ENOMEM; if (fasync_insert_entry(fd, filp, fapp, new)) { fasync_free(new); return 0; } return 1; }
當設備IO就需後,經過kill_fasync()發送信號。
void kill_fasync(struct fasync_struct **fp, int sig, int band) { if (*fp) { rcu_read_lock(); kill_fasync_rcu(rcu_dereference(*fp), sig, band); rcu_read_unlock(); } } static void kill_fasync_rcu(struct fasync_struct *fa, int sig, int band) { while (fa) {-------------------------------------------------------------------遍歷當前文件句柄的全部fasync列表,並進行信號發送操做。 struct fown_struct *fown; unsigned long flags; if (fa->magic != FASYNC_MAGIC) { printk(KERN_ERR "kill_fasync: bad magic number in " "fasync_struct!\n"); return; } spin_lock_irqsave(&fa->fa_lock, flags); if (fa->fa_file) { fown = &fa->fa_file->f_owner; if (!(sig == SIGURG && fown->signum == 0))-----------------------------只有在signum爲0,並sig等於SIGURQ放棄發送信號,由於SIGURQ有本身的特殊處理。 send_sigio(fown, fa->fa_fd, band); } spin_unlock_irqrestore(&fa->fa_lock, flags); fa = rcu_dereference(fa->fa_next); } } void send_sigio(struct fown_struct *fown, int fd, int band) { struct task_struct *p; enum pid_type type; struct pid *pid; int group = 1; read_lock(&fown->lock); type = fown->pid_type; if (type == PIDTYPE_MAX) {----------------------------------------------------這裏體現了F_SETOWN_EX因爲F_SETOWN的地方,若是設置爲PIDTYPE_MAX,那麼group則爲0,只發送給線程,不會選擇其餘線程做爲替代。 group = 0; type = PIDTYPE_PID; } pid = fown->pid; if (!pid) goto out_unlock_fown; read_lock(&tasklist_lock); do_each_pid_task(pid, type, p) { send_sigio_to_task(p, fown, fd, band, group); } while_each_pid_task(pid, type, p); read_unlock(&tasklist_lock); out_unlock_fown: read_unlock(&fown->lock); } static void send_sigio_to_task(struct task_struct *p, struct fown_struct *fown, int fd, int reason, int group) { /* * F_SETSIG can change ->signum lockless in parallel, make * sure we read it once and use the same value throughout. */ int signum = ACCESS_ONCE(fown->signum);--------------------------------------------------------在使用F_SETSIG設置信號後最好很差改變,不然可能形成發送和信號處理函數不匹配。 if (!sigio_perm(p, fown, signum)) return; switch (signum) { siginfo_t si; default:-----------------------------------------------------------------------------------其餘狀況發送一個RT信號,提供更加豐富的返回信息。 si.si_signo = signum; si.si_errno = 0; si.si_code = reason; BUG_ON((reason & __SI_MASK) != __SI_POLL); if (reason - POLL_IN >= NSIGPOLL) si.si_band = ~0L; else si.si_band = band_table[reason - POLL_IN]; si.si_fd = fd; if (!do_send_sig_info(signum, &si, p, group))------------------------------------------使用signum代提SIGIO做爲信號發送。 break; case 0:------------------------------------------------------------------------------------在沒有經過F_SETSIG設置signum的狀況下,默認發送SIGIO信號。 do_send_sig_info(SIGIO, SEND_SIG_PRIV, p, group); } } int do_send_sig_info(int sig, struct siginfo *info, struct task_struct *p, bool group) { unsigned long flags; int ret = -ESRCH; if (lock_task_sighand(p, &flags)) { ret = send_signal(sig, info, p, group); unlock_task_sighand(p, &flags); } return ret; } static inline struct sighand_struct *lock_task_sighand(struct task_struct *tsk, unsigned long *flags) { struct sighand_struct *ret; ret = __lock_task_sighand(tsk, flags); (void)__cond_lock(&tsk->sighand->siglock, ret); return ret; } static inline void unlock_task_sighand(struct task_struct *tsk, unsigned long *flags) { spin_unlock_irqrestore(&tsk->sighand->siglock, *flags); }
實時信號相對於標準信號有以下優點:
標準信號和實時信號都經過seng_signal()發送。下面看一下二者是如何被區別對待的。
static int send_signal(int sig, struct siginfo *info, struct task_struct *t, int group) { int from_ancestor_ns = 0; #ifdef CONFIG_PID_NS from_ancestor_ns = si_fromuser(info) && !task_pid_nr_ns(current, task_active_pid_ns(t)); #endif return __send_signal(sig, info, t, group, from_ancestor_ns); } static int __send_signal(int sig, struct siginfo *info, struct task_struct *t, int group, int from_ancestor_ns) { struct sigpending *pending; struct sigqueue *q; int override_rlimit; int ret = 0, result; assert_spin_locked(&t->sighand->siglock); result = TRACE_SIGNAL_IGNORED; if (!prepare_signal(sig, t, from_ancestor_ns || (info == SEND_SIG_FORCED))) goto ret; pending = group ? &t->signal->shared_pending : &t->pending;----------------------------group起做用的地方,當group爲0,則將信號放在當前線程私有的pending列表上;若是group爲1,則將信號放在線程組共享的shared_pending列表上。 result = TRACE_SIGNAL_ALREADY_PENDING; if (legacy_queue(pending, sig))--------------------------------------------------------若是sig<SIGRTMIN,而且sig已經被處於pending中。也即標準信號sig已經在pending隊列中,則返回表示當前sig信號已經放入到隊列中。這也說明標準信號沒有被隊列化。 goto ret; result = TRACE_SIGNAL_DELIVERED; if (info == SEND_SIG_FORCED) goto out_set; if (sig < SIGRTMIN) override_rlimit = (is_si_special(info) || info->si_code >= 0);--------------------標準信號存在override_rlimit爲1狀況,那麼即便當前信號隊列達到了RLIMIT_SIGPENDING。仍然能夠建立信號隊列告訴緩存。 else override_rlimit = 0;--------------------------------------------------------------實時信號是不容許超過RLIMIT_SIGPENDING限制的。 q = __sigqueue_alloc(sig, t, GFP_ATOMIC | __GFP_NOTRACK_FALSE_POSITIVE, override_rlimit);-----------------------------------------------------------------建立一個高速緩存sigqueue_cachep。 if (q) { list_add_tail(&q->list, &pending->list);------------------------------------------將新建立的信號加入到隊列中。 switch ((unsigned long) info) { case (unsigned long) SEND_SIG_NOINFO: q->info.si_signo = sig; q->info.si_errno = 0; q->info.si_code = SI_USER; q->info.si_pid = task_tgid_nr_ns(current, task_active_pid_ns(t)); q->info.si_uid = from_kuid_munged(current_user_ns(), current_uid()); break; case (unsigned long) SEND_SIG_PRIV: q->info.si_signo = sig; q->info.si_errno = 0; q->info.si_code = SI_KERNEL; q->info.si_pid = 0; q->info.si_uid = 0; break; default: copy_siginfo(&q->info, info); if (from_ancestor_ns) q->info.si_pid = 0; break; } userns_fixup_signal_uid(&q->info, t); } else if (!is_si_special(info)) { if (sig >= SIGRTMIN && info->si_code != SI_USER) { result = TRACE_SIGNAL_OVERFLOW_FAIL; ret = -EAGAIN; goto ret; } else { result = TRACE_SIGNAL_LOSE_INFO; } } out_set: signalfd_notify(t, sig); sigaddset(&pending->signal, sig); complete_signal(sig, t, group); ret: trace_signal_generate(sig, info, t, group, result); return ret; }
static struct sigqueue *
__sigqueue_alloc(int sig, struct task_struct *t, gfp_t flags, int override_rlimit)
{
struct sigqueue *q = NULL;
struct user_struct *user;
rcu_read_lock();
user = get_uid(__task_cred(t)->user);
atomic_inc(&user->sigpending);
rcu_read_unlock();
if (override_rlimit ||
atomic_read(&user->sigpending) <=
task_rlimit(t, RLIMIT_SIGPENDING)) {----------------------------------這裏面說明了override_rlimit爲1狀況能夠超出RLIMIT_SIGPENDING限制。實時信號在隊列超過RLIMIT_SIGPENDING後則不容許建立信號隊列。
q = kmem_cache_alloc(sigqueue_cachep, flags);
} else {
print_dropped_signal(sig);---------------------------------------------------當出現丟棄信號的狀況,打印信息。
}
if (unlikely(q == NULL)) {
atomic_dec(&user->sigpending);
free_uid(user);
} else {
INIT_LIST_HEAD(&q->list);
q->flags = 0;
q->user = user;
}
return q;
}
這裏詳細解釋了信號驅動IO的流程;但信號產生放入pending隊列以後,什麼時候被處理呢?能夠參考《信號什麼時候被處理》。
IO多路複用容許咱們同時檢查多個文件描述符,看其中任意一個是否可執行IO操做。能夠在普通文件、終端、管道等字符型設備上使用select()/poll()來檢查文件描述符。
下面分別對select()/poll()的API、內核流程、測試程序進行分析,而後對二者進行對比。
select()會一直阻塞,直到一個或多個文件描述符集合成爲就緒態。
#include <sys/time.h> /* For portability */ #include <sys/select.h> int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,struct timeval *timeout);
參數nfds、readfds、writefds、exceptfds指定了select()要檢查的文件描述符集合。
參數timeout用來設定select()阻塞的時間上限。
#include <sys/select.h> void FD_ZERO(fd_set *fdset);--------------------將fdset指向的集合初始化爲空。 void FD_SET(int fd, fd_set *fdset);-------------將描述符fd添加到fdset所指向的集合中。 void FD_CLR(int fd, fd_set *fdset);-------------將描述符fd從fdset指向的集合中移除。 int FD_ISSET(int fd, fd_set *fdset);------------判斷描述符fd是否在fdset結合中設置。
文件描述符集合有一個最大容量限制,由常量FD_SETSIZE來決定。Linux一般爲1024。
參數timeout控制着select()阻塞行爲,NULL時select()會一直阻塞。或者指向一個timeval結構體。
struct timeval { time_t tv_sec; /* Seconds */ suseconds_t tv_usec; /* Microseconds (long int) */ };
當timeout爲NULL或者指向結構體字段非0時,select()阻塞直到下列事件發生:
若是timeout非空,且一個或多個文件描述符就緒返回時,timeout所指向的結構體表示剩餘超時時間。
-1表示有錯誤發生。EBADF表示有一個文件描述符是非法的;EINTR表示該調用被信號處理例程中斷了。
0表示在任何文件描述符成爲就緒態前select()已經超時。
正整數表示一個或多個文件描述符已達到就緒態。返回值表示就緒態的文件描述符個數。
select系統調用路徑是select()->core_sys_select()->do_select(),最終是遍歷每一個文件句柄的poll成員,根據poll()返回的參數判斷當前文件狀態。
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp, fd_set __user *, exp, struct timeval __user *, tvp) { struct timespec64 end_time, *to = NULL; struct timeval tv; int ret; if (tvp) { if (copy_from_user(&tv, tvp, sizeof(tv))) return -EFAULT; to = &end_time; if (poll_select_set_timeout(to, tv.tv_sec + (tv.tv_usec / USEC_PER_SEC), (tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))------------------------------------從內核拷貝信息並轉換成內核數據結構struct timespec64。 return -EINVAL; } ret = core_sys_select(n, inp, outp, exp, to);------------------------------------------------和系統調用相似,只是to已經轉變成內核時間。 ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);------------------------------------計算end_time和當前時間的差值,並轉化成tvp返回給用戶空間。 return ret; } int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timespec64 *end_time) { fd_set_bits fds; void *bits; int ret, max_fds; size_t size, alloc_size; struct fdtable *fdt; /* Allocate small arguments on the stack to save memory and be faster */ long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];----------------------------------------------SELECT_STACK_ALLOC默認是256,32位系統stack_fds一共64成員。 ret = -EINVAL; if (n < 0) goto out_nofds; /* max_fds can increase, so grab it once to avoid race */ rcu_read_lock(); fdt = files_fdtable(current->files); max_fds = fdt->max_fds; rcu_read_unlock(); if (n > max_fds) n = max_fds; size = FDS_BYTES(n);------------------------------------------------------------------------一個文件描述符佔用一bit,size表示這些fd_set須要用掉多少個字節。 bits = stack_fds; if (size > sizeof(stack_fds) / 6) {---------------------------------------------------------若是stack空間足夠當前存放當前strcut fd_set_bits fds的時候,優先使用stack內存,好處是更加快而且節省內存。否則就須要kmalloc去申請內存。 /* Not enough space in on-stack array; must use kmalloc */------------------------------stack_fds大小爲256字節,因此要使用stack的最大size=256/6/sizeof(long)=10。 ret = -ENOMEM; if (size > (SIZE_MAX / 6)) goto out_nofds; alloc_size = 6 * size; bits = kmalloc(alloc_size, GFP_KERNEL|__GFP_NOWARN);-------------------------------------經過kmalloc來分配6個size大小的內存;大於一個頁面使用vmalloc。 if (!bits && alloc_size > PAGE_SIZE) bits = vmalloc(alloc_size); if (!bits) goto out_nofds; } fds.in = bits;--------------------------------------------------------------------------此時bits指向的內存都已經分配完畢,而且是6個一樣size大小的。 fds.out = bits + size; fds.ex = bits + 2*size; fds.res_in = bits + 3*size; fds.res_out = bits + 4*size; fds.res_ex = bits + 5*size; if ((ret = get_fd_set(n, inp, fds.in)) || (ret = get_fd_set(n, outp, fds.out)) || (ret = get_fd_set(n, exp, fds.ex))) goto out; zero_fd_set(n, fds.res_in); zero_fd_set(n, fds.res_out); zero_fd_set(n, fds.res_ex);------------------------------------------------------------------將用戶空間傳入的fds拷貝到fds中,並清空res_in、res_out、res_ex。 ret = do_select(n, &fds, end_time); if (ret < 0) goto out; if (!ret) { ret = -ERESTARTNOHAND; if (signal_pending(current)) goto out; ret = 0; } if (set_fd_set(n, inp, fds.res_in) || set_fd_set(n, outp, fds.res_out) || set_fd_set(n, exp, fds.res_ex))----------------------------------------------------------res_int/res_out/res_ex中包含了狀態變化的文件句柄,拷貝到in/out/ex返回給用戶空間。 ret = -EFAULT; out: if (bits != stack_fds) kvfree(bits); out_nofds: return ret; } int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time) { ktime_t expire, *to = NULL; struct poll_wqueues table; poll_table *wait; int retval, i, timed_out = 0; u64 slack = 0; unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;---------------------------這個參數影響下面一輪查詢全部文件描述符以後,才作busy loop仍是睡眠。這個很影響CPU佔用率,busy loop不會主動放棄CPU,睡眠則主動放棄CPU。 unsigned long busy_end = 0; rcu_read_lock(); retval = max_select_fd(n, fds); rcu_read_unlock(); if (retval < 0) return retval; n = retval; poll_initwait(&table);-----------------------------------------------------------------------將當前進程放入本身的等待隊列table,並將該等待隊列加入到測試表wait。 wait = &table.pt; if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {-----------------------------------這也是timeout參數爲0時候的特殊狀況,直接timed_out置1。 wait->_qproc = NULL; timed_out = 1; } if (end_time && !timed_out) slack = select_estimate_accuracy(end_time); retval = 0; for (;;) { unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp; bool can_busy_loop = false; inp = fds->in; outp = fds->out; exp = fds->ex; rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex; for (i = 0; i < n; ++rinp, ++routp, ++rexp) {--------------------------------------------遍歷全部的fd。 unsigned long in, out, ex, all_bits, bit = 1, mask, j; unsigned long res_in = 0, res_out = 0, res_ex = 0; in = *inp++; out = *outp++; ex = *exp++;---------------------------------------------先取出當前循環週期中的32個文件描述符對應的bitmaps。 all_bits = in | out | ex;------------------------------------------------------------in/out/ex三者組合,有的fd可能監測讀或寫或異常,或者都檢測。 if (all_bits == 0) {-----------------------------------------------------------------32個文件描述符不檢測任何狀態,調到下一個循環。 i += BITS_PER_LONG; continue; } for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) { struct fd f; if (i >= n) break; if (!(bit & all_bits))-----------------------------------------------------------bit每次循環左移一位,若是沒有監測當前爲則跳過進入下一次循環。 continue; f = fdget(i); if (f.file) { const struct file_operations *f_op; f_op = f.file->f_op; mask = DEFAULT_POLLMASK; if (f_op->poll) { wait_key_set(wait, in, out, bit, busy_flag);-----------------------------------------------設置當前fd待檢測的事件掩碼。 mask = (*f_op->poll)(f.file, wait);-------------------------------------調用每一個文件句柄的poll成員,返回文件的狀態mask。下面分別檢查POLLIN_SET、POLLOUT_SET、POLLEX_SET三種狀態。 } fdput(f); if ((mask & POLLIN_SET) && (in & bit)) { res_in |= bit; retval++; wait->_qproc = NULL; } if ((mask & POLLOUT_SET) && (out & bit)) { res_out |= bit; retval++; wait->_qproc = NULL; } if ((mask & POLLEX_SET) && (ex & bit)) { res_ex |= bit; retval++; wait->_qproc = NULL; } /* got something, stop busy polling */ if (retval) { can_busy_loop = false; busy_flag = 0; /* * only remember a returned * POLL_BUSY_LOOP if we asked for it */ } else if (busy_flag & mask) can_busy_loop = true; } } if (res_in)--------------------------------------------------------------------------根據poll結果寫回到輸出位圖裏。 *rinp = res_in; if (res_out) *routp = res_out; if (res_ex) *rexp = res_ex; cond_resched(); } wait->_qproc = NULL; if (retval || timed_out || signal_pending(current))-------------------------------------三種返回狀況,retval表示有文件句柄知足條件;timed_out表示在有timeout參數的狀況下超時;signal_pending()表示被信號中斷。 break; if (table.error) { retval = table.error; break; } if (can_busy_loop && !need_resched()) {--------------------------------------------------這裏面會一直佔用CPU。 if (!busy_end) { busy_end = busy_loop_end_time(); continue; } if (!busy_loop_timeout(busy_end)) continue; } busy_flag = 0; if (end_time && !to) { expire = timespec64_to_ktime(*end_time); to = &expire; } if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack))-------------------------------------------------------------進程進入睡眠等待喚醒,若是poll_schedule_timeout()返回0表示超時,下面timed_out被置1。to是根據end_time計算的超時時間。 timed_out = 1; } poll_freewait(&table); return retval; }
回顧一下select()大體流程以下:
1.進入系統調用。
2.進行時間轉換;數據準備,分紅in、out、exception三部分。
3.對全部文件句柄進行循環,調用對應文件句柄的poll()函數;分別查詢是那種類型文件句柄有狀態變化。
4.若是一輪循環下來沒有變化,則進入休眠等待,直到超時。
5.若是有數據則喚醒進程,將變化句柄數返回給用戶空間。返回值是三種狀態的綜合,具體哪一種狀態哪一個文件句柄變化,須要查看分別查看返回的文件句柄。
select()測試程序和poll()使用一樣的內核驅動,區別就是設置文件句柄和API參數。
#include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdio.h> #include <poll.h> #include <memory.h> #include <unistd.h> #include <stdlib.h> #define POLLTEST_NAME "/dev/polltest0" #define LOOP_COUNT 3 int main(int argc, char **argv) { int fd, loop_count = 0; unsigned char polltest_count[30]; int ret; fd_set rds; memset(polltest_count, 30, 0); fd = open(POLLTEST_NAME, O_RDWR); if (fd < 0) { printf("can't open!\n"); } FD_ZERO(&rds); FD_SET(fd, &rds);--------------------------------------------------------------將fd加入到rds句柄集中,在select()中對其進行監控。 while (1) { ret = select(fd+1, &rds, NULL, NULL, NULL); if (ret == 0) { printf("time out\n"); } else { if(FD_ISSET(fd, &rds))-------------------------------------------------判斷句柄fd是否有POLLIN事件發生。 { read(fd, &polltest_count, 0); loop_count = atoi(polltest_count); printf("key_val = %d\n", loop_count); if(loop_count >= LOOP_COUNT) break; } } } return 0; }
一樣的從認識poll() API開始;而後詳細分析poll系統調用內核是如何實現的;最後對poll()測試程序流程結合內核進行詳細分析。
poll()提供一列文件描述符,在每一個文件描述符上標明感興趣的事件。
#include <poll.h> int poll(struct pollfd fds[], nfds_t nfds, int timeout);
nfds指定了fds中的元素個數;參數fds列出了須要poll()來檢查的文件描述符:
struct pollfd {
int fd; /* File descriptor */
short events; /* Requested events bit mask */-------指定須要爲描述符fd作檢查的事件。
short revents; /* Returned events bit mask */-------表示fd上實際發生的事件。 };
timeout參數的單位是毫秒。
-1,poll()會一直阻塞直到fds數組中列出的有一個達到就緒態或者捕獲到一個信號。
0,poll()不會阻塞,只是執行一次檢查看看哪一個文件描述符處於就緒態。
>0,poll()至多阻塞timeout毫秒,直到fds列出的文件描述符中有一個達到就緒態,或者捕獲到一個信號爲止。
-1標識有錯誤發生,一種多是被信號中斷返回EINTR。
0表示調用在任意一個文件描述符就緒以前就超時了。
正整數表示一個或多個文件描述符處於就緒態。返回值表示數組fds中擁有非零revents字段的pollfd結構體數量。
SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds, int, timeout_msecs) { struct timespec64 end_time, *to = NULL; int ret; if (timeout_msecs >= 0) {-------------------------------------------從poll()參數timeout可知小於0表示poll()會一直阻塞;等於0表示只檢查一次;大於0表示等待一個超時時間。 to = &end_time; poll_select_set_timeout(to, timeout_msecs / MSEC_PER_SEC, NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC));------------根據timeout_msecs轉換成內核超時數據結構to。 } ret = do_sys_poll(ufds, nfds, to); if (ret == -EINTR) { struct restart_block *restart_block; restart_block = ¤t->restart_block; restart_block->fn = do_restart_poll; restart_block->poll.ufds = ufds; restart_block->poll.nfds = nfds; if (timeout_msecs >= 0) { restart_block->poll.tv_sec = end_time.tv_sec; restart_block->poll.tv_nsec = end_time.tv_nsec; restart_block->poll.has_timeout = 1; } else restart_block->poll.has_timeout = 0; ret = -ERESTART_RESTARTBLOCK; } return ret; } int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds, struct timespec64 *end_time) { struct poll_wqueues table; int err = -EFAULT, fdcount, len, size; long stack_pps[POLL_STACK_ALLOC/sizeof(long)]; struct poll_list *const head = (struct poll_list *)stack_pps; struct poll_list *walk = head; unsigned long todo = nfds; if (nfds > rlimit(RLIMIT_NOFILE))-----------------------------------nfds不能超過系統對打開文件數目限制。 return -EINVAL; len = min_t(unsigned int, nfds, N_STACK_PPS);-----------------------取nfds和棧能容納的fds最小值。 for (;;) {----------------------------------------------------------遍歷全部的ufds[],分配對應的struct poll_list,而且連接起來;每一個struct poll_list裏面又包含了若干個struct pollfd,他的大小經過len肯定。 walk->next = NULL; walk->len = len; if (!len) break; if (copy_from_user(walk->entries, ufds + nfds-todo, sizeof(struct pollfd) * walk->len))-----------------首先將使用棧提供的空間,將用戶空間struct pollfd拷貝到內核空間。 goto out_fds; todo -= walk->len;----------------------------------------------假設len==nfds,todo也等於walk->len;因此此處退出for(;;)。 if (!todo) break; len = min(todo, POLLFD_PER_PAGE); size = sizeof(struct poll_list) + sizeof(struct pollfd) * len; walk = walk->next = kmalloc(size, GFP_KERNEL);------------------若是佔空間不夠存放struct pollfd,那麼kmalloc()申請內存。 if (!walk) { err = -ENOMEM; goto out_fds; } } poll_initwait(&table);----------------------------------------------初始化一個struct poll_wqueues。 fdcount = do_poll(head, &table, end_time); poll_freewait(&table); for (walk = head; walk; walk = walk->next) { struct pollfd *fds = walk->entries; int j; for (j = 0; j < walk->len; j++, ufds++) if (__put_user(fds[j].revents, &ufds->revents))-------------將全部內核處理的revents返回給用戶空間。 goto out_fds; } err = fdcount; out_fds: walk = head->next; while (walk) { struct poll_list *pos = walk; walk = walk->next; kfree(pos);-----------------------------------------------------釋放申請的內存。 } return err; } static int do_poll(struct poll_list *list, struct poll_wqueues *wait, struct timespec64 *end_time) { poll_table* pt = &wait->pt; ktime_t expire, *to = NULL; int timed_out = 0, count = 0; u64 slack = 0; unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;-----------------------決定下面是循環仍是睡眠等待超時。 unsigned long busy_end = 0; /* Optimise the no-wait case */ if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {------------------------------對timeout爲0特殊狀況的處理,只查詢一次就退出。 pt->_qproc = NULL; timed_out = 1; } if (end_time && !timed_out) slack = select_estimate_accuracy(end_time); for (;;) { struct poll_list *walk; bool can_busy_loop = false; for (walk = list; walk != NULL; walk = walk->next) {---------------------------------遍歷全部的struct poll_list。 struct pollfd * pfd, * pfd_end; pfd = walk->entries; pfd_end = pfd + walk->len; for (; pfd != pfd_end; pfd++) { if (do_pollfd(pfd, pt, &can_busy_loop, busy_flag)) { count++; pt->_qproc = NULL; /* found something, stop busy polling */ busy_flag = 0; can_busy_loop = false; } } } pt->_qproc = NULL; if (!count) { count = wait->error; if (signal_pending(current)) count = -EINTR; } if (count || timed_out)-----------------------------------------------------------------count是全部的有狀態變化的描述符數量;timed_out表示是否超時。 break; /* only if found POLL_BUSY_LOOP sockets && not out of time */ if (can_busy_loop && !need_resched()) { if (!busy_end) { busy_end = busy_loop_end_time(); continue; } if (!busy_loop_timeout(busy_end)) continue; } busy_flag = 0; if (end_time && !to) { expire = timespec64_to_ktime(*end_time); to = &expire; } if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack)) timed_out = 1; } return count; } static inline unsigned int do_pollfd(struct pollfd *pollfd, poll_table *pwait, bool *can_busy_poll, unsigned int busy_flag) { unsigned int mask; int fd; mask = 0; fd = pollfd->fd;------------------------------------------------------------------------------根據struct pollfd找到對應的fd,而後調用所屬的poll()函數。 if (fd >= 0) { struct fd f = fdget(fd); mask = POLLNVAL; if (f.file) { mask = DEFAULT_POLLMASK; if (f.file->f_op->poll) { pwait->_key = pollfd->events|POLLERR|POLLHUP; pwait->_key |= busy_flag; mask = f.file->f_op->poll(f.file, pwait);----------------------------------------調用具體文件的poll()函數。 if (mask & busy_flag) *can_busy_poll = true; } /* Mask out unneeded events. */ mask &= pollfd->events | POLLERR | POLLHUP; fdput(f); } } pollfd->revents = mask; return mask; }
回顧一下,poll系統調用大體流程是:
1.進入poll系統調用。
2.進行timeout時間轉換;準備每一個文件句柄對應的數據,並將這些數據串聯起來。
3.對全部的struct pollfd進行循環,調用do_pollfd()查詢狀態。
4.do_pollfd()調用具體文件的file->f_op->poll()查詢狀態。
5.一次輪詢以後,將當前線程掛起等待超時獲喚醒。
6.有數據到達後退出循環,將數據返回給用戶空間。
首先建立polltest_kernel.c和Makefile文件。
#define DEBUG #include <linux/kernel.h> #include <linux/module.h> #include <linux/init.h> #include <linux/cdev.h> #include <linux/device.h> #include <linux/fs.h> #include <linux/uaccess.h> #include <linux/poll.h> #define POLL_EXPRIES 1 #define POLLTEST_NAME "polltest" static struct class *polltest_class; static int polltest_major; static struct hrtimer polltest_hrtimer; static volatile int ev_press = 0; int polltest_count = 0; static unsigned char polltest_text[30]; DECLARE_WAIT_QUEUE_HEAD(polltest_waitq); static enum hrtimer_restart hrtimer_func(struct hrtimer *timer) { wake_up_interruptible(&polltest_waitq);--------------------------------------------------調用隊列上全部等待項wait_queue_t->func()函數,這個函數主要用來喚醒等待的進程。 printk("%s line=%d\n", __func__, __LINE__); ev_press = 1;----------------------------------------------------------------------------只有ev_press置位,poll()纔會返回POLLIN狀態。其餘狀況就會阻塞。 polltest_count++; hrtimer_forward_now(&polltest_hrtimer, ktime_set(0, 10000000)); return HRTIMER_RESTART; } static int polltest_open(struct inode *inode, struct file *file) { hrtimer_init(&polltest_hrtimer, CLOCK_MONOTONIC, HRTIMER_MODE_REL); polltest_hrtimer.function = hrtimer_func; hrtimer_start(&polltest_hrtimer, ktime_set(0, 10000000), HRTIMER_MODE_REL); printk("%s line=%d\n", __func__, __LINE__); return 0; } static int polltest_release(struct inode *inode, struct file *file) { hrtimer_cancel(&polltest_hrtimer); printk("%s line=%d\n", __func__, __LINE__); return 0; } static ssize_t polltest_read(struct file * file, char __user * buf, size_t count, loff_t *ppos) { int size; size = snprintf(polltest_text, 30, "%d\n", polltest_count); copy_to_user(buf, polltest_text, size); return size; } static ssize_t polltest_write(struct file * file, const char __user * buf, size_t count, loff_t *ppos) { char str[30]; copy_from_user(str, buf,count); return 0; } static unsigned int polltest_poll(struct file *file, struct poll_table_struct *wait) { unsigned int mask = 0; poll_wait(file, &polltest_waitq, wait);-------------------------------------------------將當前struct poll_table_struct的wait加入到polltest_waitq隊列頭中。那麼這個wait合適被從polltest_waitq隊列頭移出呢?在poll_freewait()中。 printk("%s line=%d\n", __func__, __LINE__); if(ev_press)----------------------------------------------------------------------------在hrtimer超時以後,ev_press置位,纔會發送POLLIN狀態。
{
mask |= POLLIN | POLLRDNORM;--------------------------------------------------------若是返回POLLIN狀態,poll系統調用纔會返回;不然會進入睡眠狀態及poll_schedule_timeout()。
ev_press = 0;-----------------------------------------------------------------------恢復爲0,避免下次poll()返回POLLIN。
} return mask; } static const struct file_operations polltest_fops = { .owner = THIS_MODULE, .open = polltest_open, .release = polltest_release, .read = polltest_read, .write = polltest_write, .poll = polltest_poll, }; static int __init polltest_test_init(void) { struct device *polltest_device; polltest_major = register_chrdev(0, POLLTEST_NAME, &polltest_fops); if (polltest_major < 0) { pr_err("register_chrdev failed\n"); goto err; } polltest_class = class_create(THIS_MODULE, POLLTEST_NAME); if (IS_ERR(polltest_class)) { pr_err("device class file already in use\n"); goto err_class; } polltest_device = device_create(polltest_class, NULL, MKDEV(polltest_major, 0), NULL, "%s%d", POLLTEST_NAME, 0); if (IS_ERR(polltest_device)) { pr_err("failed to create device\n"); goto err_device; } return 0; err_device: class_destroy(polltest_class); err_class: unregister_chrdev(polltest_major, POLLTEST_NAME); err: return 0; } static void __exit polltest_test_exit(void) { device_destroy(polltest_class, MKDEV(polltest_major, 0)); class_destroy(polltest_class); unregister_chrdev(polltest_major, POLLTEST_NAME); } module_init(polltest_test_init); module_exit(polltest_test_exit); MODULE_LICENSE("GPL");
Makefile:
CONFIG_MODULE_SIG=n obj-m := polltest_kernel.o KERN_DIR := /lib/modules/$(shell uname -r)/build PWD := $(shell pwd) all: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules gcc polltest_user.c -o polltest_user gcc polltest_select.c -o polltest_select clean: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules clean rm polltest_user rm polltest_select modules_install: $(MAKE) -C $(KERN_DIR) M=$(PWD) modules_install
而後建立用戶空間測試程序:
#include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdio.h> #include <poll.h> #include <memory.h> #include <unistd.h> #include <stdlib.h> #define POLLTEST_NAME "/dev/polltest0" #define LOOP_COUNT 3 int main(int argc, char **argv) { int fd, loop_count = 0; unsigned char polltest_count[30]; int ret; struct pollfd fds[1]; memset(polltest_count, 30, 0); fd = open(POLLTEST_NAME, O_RDWR);--------------------------------打開/dev/polltest0此時啓動一個10ms的hrtimer。 if (fd < 0) { printf("can't open!\n"); } fds[0].fd = fd; fds[0].events = POLLIN; while (1) { ret = poll(fds, 1, 5000);------------------------------------調用/dev/polltest0的poll()函數。若是返回狀態有POLLIN則ret表示知足狀態的句柄數,應該爲1;若是返回0表示超時。 if (ret == 0) { printf("time out\n"); } else { if(fds[0].revents == POLLIN) { read(fd, &polltest_count, 0); loop_count = atoi(polltest_count); printf("key_val = %d\n", loop_count); if(loop_count >= LOOP_COUNT) break; } } } return 0; }
結果以下:
[ 2366.904469] polltest_open line=41---------------------------------打開/dev/polltest0,啓動10ms的hrtimer。 [ 2366.904475] polltest_poll line=78---------------------------------緊接着poll()系統調用,因爲ev_press爲0,沒有知足POLLIN條件;因此進程進入睡眠狀態。 [ 2366.914893] hrtimer_func line=29----------------------------------hrtimer 10ms超時,喚醒進程而且ev_press爲1。 [ 2366.914951] polltest_poll line=78---------------------------------進程被喚醒以後,從新調用/dev/polltest0的poll()檢查狀態;由於此時ev_press爲1,因此返回POLLIN。知足條件,poll()系統調用退出,用戶空間read()且置ev_press爲0。 [ 2366.915197] polltest_poll line=78---------------------------------再次調用poll()系統調用,此時ev_press爲0,因此進入睡眠狀態。 [ 2366.924575] hrtimer_func line=29 [ 2366.924584] polltest_poll line=78 [ 2366.924641] polltest_poll line=78 [ 2366.934632] hrtimer_func line=29 [ 2366.934683] polltest_poll line=78 [ 2366.934980] polltest_release line=49
從上面的分析能夠看出select()/poll()的內核實現其實大同小異,最終都是調用具體文見的poll()函數查詢狀態。
select()和poll()系統調用有以下幾個重要的結構體:struct poll_wqueues、struct poll_table_struct、struct poll_table_page、struct poll_table_entry,其中poll()還有兩個文件句柄相關結構體:struct poll_list、struct pollfd。
其中struct poll_wqueues是核心,用來統一輔助實現這個進程中全部監測fd的輪訓工做。
struct poll_list {-------------------------------------------------一次poll()的poll_list可能有多個,經過next連接起來。一個poll_list裏能夠有多個pollfd。 struct poll_list *next; int len;-------------------------------------------------------下面entries[]的數目。 struct pollfd entries[0]; }; struct pollfd {---------------------------------------------------和用戶空間一致的數據結構,表示一個poll句柄。 int fd; short events; short revents; }; struct poll_wqueues { poll_table pt; struct poll_table_page *table;----------------------------------指向poll_table_page類型頁面,多個頁面能夠互相連接起來。 struct task_struct *polling_task;-------------------------------保存當前調用select()/poll()的進程struct task_struct結構體。 int triggered;--------------------------------------------------當前用戶進程被喚醒後置位,以避免該進程接着睡眠。 int error;------------------------------------------------------錯誤碼。 int inline_index;-----------------------------------------------數組inline_entries[]的下標。 struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES]; };
struct poll_table_page {------------------------------------------申請的物理頁面都會將起始地址強制轉換成該結構體指針。
struct poll_table_page * next;---------------------------------指向下一個申請的物理頁面地址。
struct poll_table_entry * entry;-------------------------------指向entries[]中首個待分配poll_table_entry地址。
struct poll_table_entry entries[0];----------------------------該頁後面剩餘的空間都是待分配的poll_table_entry結構體。
};
typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *); typedef struct poll_table_struct { poll_queue_proc _qproc; unsigned long _key; } poll_table; struct poll_table_entry { struct file *filp;--------------------------------------------指向特定fd對應的file結構體。 unsigned long key;--------------------------------------------等待特定fd對應硬件設備的事件掩碼,如POLLIN、POLLOUT等。 wait_queue_t wait;--------------------------------------------表明調用select()/poll()的應用程序,等待在fd對應設備的特定時間的等待隊列頭上的等待隊列項。 wait_queue_head_t *wait_address;------------------------------設備驅動中特定時間的等待隊列頭。 };
一次select()/poll()調用對應一個strcut poll_wqueue結構體;struct poll_table_struct就是一個函數和一個key值。
一次select()/poll()可能包含一個或者多個struct poll_table_page,這個可能根據文件句柄的數量而變化。
一個fd對應一個struct poll_table_entry。
因此fd、struct poll_fd、strcut poll_table_entry是一一對應的;一次select()/poll()和struct poll_wqueues、strcut poll_table_strcut是一一對應的。
這兩個結構體都經過poll_initwait()初始化,而後在poll_wait()的時候調用poll_queue_proc函數。
poll_initwait()是select()/poll()中對相關結構體進行初始化的入口;poll_wait()是驅動中實現file_operations成員poll()函數的主要功能,它將這次調用轉換成一個wait_queue_t放入到驅動的等待隊列中。
而後wake_up_interruptible()是喚醒全部wait_queue_head_t上的等待項,這些等待項調用對應的func()函數;默認是用來喚醒default_wake_function()。
void poll_initwait(struct poll_wqueues *pwq) { init_poll_funcptr(&pwq->pt, __pollwait);-----------------------------------------------------------------初始化struct poll_table,函數是__pollwait。在後面poll_wait()會被調用。 pwq->polling_task = current;-----------------------------------------------------------------------------polling_task指向當前進程結構體。 pwq->triggered = 0; pwq->error = 0; pwq->table = NULL; pwq->inline_index = 0; } static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc) { pt->_qproc = qproc; pt->_key = ~0UL; /* all events enabled */ } static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)-------------wait_address是驅動程序提供的等待隊列頭,來容納後需等待該硬件設備就緒的進程對應的等待隊列項。p是系統調用傳下來的struct poll_table_strcut。 { if (p && p->_qproc && wait_address) p->_qproc(filp, wait_address, p);-----------------------------------------------------------------------調用__pollwait()函數。 } /* Add a new entry */ static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p) { struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt); struct poll_table_entry *entry = poll_get_entry(pwq);-------------------------------------------------------首先從struct poll_wqueues->inline_entries[]中獲取poll_table_entry;若是不夠用則經過盛情一個頁面轉換成struct poll_table_page類型指針。 if (!entry) return; entry->filp = get_file(filp); entry->wait_address = wait_address; entry->key = p->_key; init_waitqueue_func_entry(&entry->wait, pollwake);----------------------------------------------------------等待隊列項的操做函數指定爲pollwake(),這個函數做用就是喚醒polling_task對應的進程。 entry->wait.private = pwq;----------------------------------------------------------------------------------私有變量指向pwq,在__pollwake()中會使用到pwq->polling_task來喚醒對應進程。 add_wait_queue(wait_address, &entry->wait);-----------------------------------------------------------------將poll_table_entry對應的wait加入到驅動的wait_queue_head_t上。 }
static struct poll_table_entry *poll_get_entry(struct poll_wqueues *p)
{
struct poll_table_page *table = p->table;
if (p->inline_index < N_INLINE_POLL_ENTRIES)
return p->inline_entries + p->inline_index++;
if (!table || POLL_TABLE_FULL(table)) {
struct poll_table_page *new_table;
new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);--------------------------------------申請一個空閒頁面,轉換成struct poll_table_page類型指針;在poll_freewait()中被釋放。
if (!new_table) {
p->error = -ENOMEM;
return NULL;
}
new_table->entry = new_table->entries;
new_table->next = table;
p->table = new_table;
table = new_table;
}
return table->entry++;
}
static int pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key) { struct poll_table_entry *entry; entry = container_of(wait, struct poll_table_entry, wait);--------------------------------------------------根據wait找到struct poll_table_entry,進而得到關注的事件值key。 if (key && !((unsigned long)key & entry->key)) return 0; return __pollwake(wait, mode, sync, key); } static int __pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key) { struct poll_wqueues *pwq = wait->private;------------------------------------------------------------------在__pollwait()中設置變量,此處使用其polling_task成員。 DECLARE_WAITQUEUE(dummy_wait, pwq->polling_task);----------------------------------------------------------定義一個臨時的wait_queue_t,給下面的default_wake_functio作參數。 smp_wmb(); pwq->triggered = 1; return default_wake_function(&dummy_wait, mode, sync, key);------------------------------------------------喚醒pwq->polling_task對應的線程。 } int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags, void *key) { return try_to_wake_up(curr->private, mode, wake_flags); }
釋放poll_wqueues申請的內存,主要是struct poll_table_page對應的頁面。
poll_schedule_timeout()實現超時功能。
void poll_freewait(struct poll_wqueues *pwq) { struct poll_table_page * p = pwq->table; int i; for (i = 0; i < pwq->inline_index; i++) free_poll_entry(pwq->inline_entries + i);------------------------------------------這裏將poll_table_entry->wait從其所在隊列頭移出。在poll_wait()中被添加進去。 while (p) { struct poll_table_entry * entry; struct poll_table_page *old; entry = p->entry; do { entry--; free_poll_entry(entry); } while (entry > p->entries); old = p; p = p->next; free_page((unsigned long) old); } }
static void free_poll_entry(struct poll_table_entry *entry)
{
remove_wait_queue(entry->wait_address, &entry->wait);
fput(entry->filp);
}
int poll_schedule_timeout(struct poll_wqueues *pwq, int state, ktime_t *expires, unsigned long slack) { int rc = -EINTR; set_current_state(state); if (!pwq->triggered) rc = schedule_hrtimeout_range(expires, slack, HRTIMER_MODE_ABS);-------------------調用schedule_hrtimeout_range()來完成timeout功能。 __set_current_state(TASK_RUNNING); smp_store_mb(pwq->triggered, 0); return rc; }
wake_up_interruptible()用於執行等待隊列wake_queue_head_t上的全部wait_queue_t對應func函數,這裏對應的就是pollwake(),即喚醒對應的進程。
#define wake_up_interruptible(x) __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL) void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, void *key) { unsigned long flags; spin_lock_irqsave(&q->lock, flags); __wake_up_common(q, mode, nr_exclusive, 0, key); spin_unlock_irqrestore(&q->lock, flags); } static void __wake_up_common(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, int wake_flags, void *key) { wait_queue_t *curr, *next; list_for_each_entry_safe(curr, next, &q->task_list, task_list) {------------------------q->task_list是全部wait_queue_head_t上wait_queue_t對應的進程鏈表,遍歷鏈表上進程結構體,進而找到對應的wait_queue_t。而後調用wait_queue_t->func函數。 unsigned flags = curr->flags; if (curr->func(curr, mode, wake_flags, key) && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)------------------------------ break; } }
void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)------------------------------將wait_queue_t加入到wait_queue_head_t的時候經過將wait_queue_t->task_list加入到wait_queue_head_t->task_list鏈表上。
{
unsigned long flags;
wait->flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q->lock, flags);
__add_wait_queue(q, wait);
spin_unlock_irqrestore(&q->lock, flags);
}
static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new)
{
list_add(&new->task_list, &head->task_list);
}
select()和poll()基於一樣的內核機制poll_wqueues,不一樣點在於select()區分讀寫異常句柄等。
select()和poll()使用了相同的內核poll例程集合,
當select()和poll()用來檢查大量文件描述符時,可能會遇到一些問題。
如上種種特性形成select()或poll()在性能延展性上要低於信號驅動IO和epoll()。
epoll API有如下優勢:
性能表現上,epoll通訊號驅動IO類似。但epoll有一些賽過信號驅動IO:
epoll API由如下三個系統調用組成:
epoll_create()建立一個epoll實例,返回表明該實例的文件描述符。
epoll_ctl()操做同epoll實例相關聯的興趣列表。經過epoll_ctl()能夠增長到新的描述符到列表中,將已有的文件描述符從該列表中移除,以及修改表明文件描述符上時間類型的掩碼。
epoll_wait()返回與epoll實例相關聯的就緒列表中的成員。
#include <sys/epoll.h> int epoll_create(int size);
epoll_create()建立了一個新的epoll實例,其對應的興趣列表初始化爲空。
參數size指定了想要經過epoll實例老檢查的文件描述符個數。Linux 2.6.8以來,size參數被忽略不用。
返回值表明新建立的epoll實例的文件描述符,這個文件描述符在其餘幾個epoll系統調用中用來表示epoll實例。
當文件描述符再也不須要時,能夠經過close()關閉。
struct epoll_event { uint32_t events; /* epoll events (bit mask) */ epoll_data_t data; /* User data */ }; typedef union epoll_data { void *ptr; /* Pointer to user-defined data */ int fd; /* File descriptor */ uint32_t u32; /* 32-bit integer */ uint64_t u64; /* 64-bit integer */ } epoll_data_t; #include <sys/epoll.h> int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);
參數epfd是epoll_create()建立的文件描述符。
參數fd指明瞭要修改興趣列表中的哪個文件描述符的設定。
參數op用來指定須要執行的操做,能夠是:
參數ev爲文件描述符fd所作的設置以下:
每一個註冊到epoll實例上的文件描述符都佔用一小段不能被交換的內核交換空間。
內核提供了一個接口用來定義每一個用戶能夠註冊到epoll實例上的文件描述符總數,這個上限值能夠經過max_user_watches來修改,這個文件在/proc/sys/fs/epoll/max_user_watches。
#include <sys/epoll.h> int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout);
參數evlist指向的結構體數組中返回的是有關就緒態文件描述符的信息。數組evlist的空間由調用者負責申請,所包含的元素個數在參數maxevents中指定。
數組evlist中,每一個元素返回的都是單個就緒態文件描述符的信息。events字段返回了在該描述符上已經發生的事件掩碼。data字段返回的是使用epoll_ctl()註冊感興趣的事件時在ev.data中所指定的值。
參數timeout用來肯定epoll_wait()的阻塞行爲:
調用成功後epoll_wait()返回數組evlist中的元素個數,若是在timeout超時間隔內沒有任何文件描述符處於就緒態,返回0.
出錯時返回-1,並在errno總設定錯誤碼以表示錯誤緣由。
調用epoll_ctl()時能夠在ev.events中指定位掩碼以及由epoll_wait()返回的evlist[].events中給出。
位掩碼 | epoll_ctl()輸入 | epoll_wait()返回 | 描述 |
EPOLLIN | √ | √ | 可讀取非高優先級的數據 |
EPOLLPRI | √ | √ | 可讀取高優先級的數據 |
EPOLLRDHUP | √ | √ | |
EPOLLOUT | √ | √ | 普通數據可寫 |
EPOLLET | √ | 採用邊緣觸發事件通知 | |
EPOLLONESHOT | √ | 在完成事件通知以後禁用檢查 | |
EPOLLERR | √ | 有錯誤發生 | |
EPOLLHUP | √ | 出現掛斷 |
一旦經過epoll_ctl()的EPOLL_CTL_ADD將文件描述符添加到epoll實例興趣列表中,它會保持激活狀態,直到顯式地經過EPOLL_CTL_DEL操做將其從列表中移除。
若是咱們但願在某個特定的文件描述符上只獲得一次通知,那麼能夠在ev.events中指定EPOLLONESHOT標誌。
epoll API測試程序內核部分能夠和select/poll共用,用戶空間測試程序以下。
#include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdio.h> #include <memory.h> #include <unistd.h> #include <stdlib.h> #include <sys/epoll.h> #include <errno.h> #define POLLTEST_NAME "/dev/polltest0" #define LOOP_COUNT 3 #define MAX_EVENTS 5 void main(void) { int epfd, i; struct epoll_event ev; struct epoll_event evlist[MAX_EVENTS]; int fd, loop_count = 0; unsigned char polltest_count[30]; int ret; fd = open(POLLTEST_NAME, O_RDWR); if (fd < 0) { printf("%s() %s %s\n", __func__, strerror(errno), POLLTEST_NAME); return; } epfd = epoll_create(MAX_EVENTS);---------------------------------------------裏面的size其實已經不重要。 if(epfd == -1) { printf("%s() %s\n", __func__, strerror(errno)); return; } ev.data.fd = fd; ev.events = EPOLLIN; if(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == -1)----------------------------增長監控fd文件句柄上EPOLLIN事件。 { printf("%s() %s\n", __func__, strerror(errno)); return; } while(1) { ret = epoll_wait(epfd, evlist, MAX_EVENTS, -1);--------------------------在epfd上等待,直到fd上有事件發生。 if (ret == 0) { printf("time out\n"); } else if(ret == -1) { printf("%s() %s\n", __func__, strerror(errno)); } else { for(i = 0; i < ret; i++) { if(evlist[i].events & EPOLLIN)-----------------------------------檢查事件類型,並從fd中讀取信息。 { read(fd, &polltest_count, 0); loop_count = atoi(polltest_count); printf("epoll key_val = %d\n", loop_count); if(loop_count >= LOOP_COUNT) return; } } } } }
struct eventpoll { spinlock_t lock; struct mutex mtx; /* Wait queue used by sys_epoll_wait() */ wait_queue_head_t wq;-----------------------------後面epoll_wait()使用的等待隊列頭。 /* Wait queue used by file->poll() */ wait_queue_head_t poll_wait;----------------------file->poll()使用的等待隊列頭。 /* List of ready file descriptors */ struct list_head rdllist;-------------------------準備就緒的文件描述符鏈表,在epoll_wait()中返回給用戶空間。 /* RB tree root used to store monitored fd structs */ struct rb_root rbr; struct epitem *ovflist; struct wakeup_source *ws; struct user_struct *user; struct file *file; int visited; struct list_head visited_list_link; }; struct epitem { union { struct rb_node rbn; struct rcu_head rcu; }; /* List header used to link this structure to the eventpoll ready list */ struct list_head rdllink; struct epitem *next; struct epoll_filefd ffd; int nwait; struct list_head pwqlist; struct eventpoll *ep; struct list_head fllink; struct wakeup_source __rcu *ws; /* The structure that describe the interested events and the source fd */ struct epoll_event event; }; struct epoll_filefd { struct file *file; int fd; } __packed;
SYSCALL_DEFINE1(epoll_create1, int, flags) { int error, fd; struct eventpoll *ep = NULL; struct file *file; /* Check the EPOLL_* constant for consistency. */ BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC); if (flags & ~EPOLL_CLOEXEC) return -EINVAL; /* * Create the internal data structure ("struct eventpoll"). */ error = ep_alloc(&ep);----------------------------------------------分配一個struct eventpoll結構體,而且初始化。 if (error < 0) return error; /* * Creates all the items needed to setup an eventpoll file. That is, * a file structure and a free file descriptor. */ fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));-------------在當前進程中,尋找一個空閒的文件描述符並返回;同時對文件設置O_RDWR等標誌。 if (fd < 0) { error = fd; goto out_free_ep; } file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC));-------------------------建立一個匿名inode文件。 if (IS_ERR(file)) { error = PTR_ERR(file); goto out_free_fd; } ep->file = file;----------------------------------------------------將struct eventpoll和struct file關聯起來。 fd_install(fd, file);-----------------------------------------------將fd和file在current->files->fdt中關聯起來。 return fd; out_free_fd: put_unused_fd(fd); out_free_ep: ep_free(ep); return error; } SYSCALL_DEFINE1(epoll_create, int, size) { if (size <= 0) return -EINVAL; return sys_epoll_create1(0); } static int ep_alloc(struct eventpoll **pep) { int error; struct user_struct *user; struct eventpoll *ep; user = get_current_user(); error = -ENOMEM; ep = kzalloc(sizeof(*ep), GFP_KERNEL);-----------------------分配一個struct eventpoll結構體。 if (unlikely(!ep)) goto free_uid; spin_lock_init(&ep->lock);-----------------------------------初始化自旋鎖、等待隊列、紅黑樹等。 mutex_init(&ep->mtx); init_waitqueue_head(&ep->wq); init_waitqueue_head(&ep->poll_wait); INIT_LIST_HEAD(&ep->rdllist); ep->rbr = RB_ROOT; ep->ovflist = EP_UNACTIVE_PTR; ep->user = user; *pep = ep; return 0; free_uid: free_uid(user); return error; } int get_unused_fd_flags(unsigned flags) { return __alloc_fd(current->files, 0, rlimit(RLIMIT_NOFILE), flags);----------current->files表示當前進程打開文件相關信息,經過RLIMIT_NOFILE能夠獲取系統最大打開文件數目。 } int __alloc_fd(struct files_struct *files, unsigned start, unsigned end, unsigned flags)------------------------__alloc_fd()在start和end之間,尋找一個合適的fd返回。 { unsigned int fd; int error; struct fdtable *fdt; spin_lock(&files->file_lock); repeat: fdt = files_fdtable(files); fd = start; if (fd < files->next_fd) fd = files->next_fd; if (fd < fdt->max_fds) fd = find_next_fd(fdt, fd); error = -EMFILE; if (fd >= end) goto out; error = expand_files(files, fd); if (error < 0) goto out; if (error) goto repeat; if (start <= files->next_fd) files->next_fd = fd + 1; __set_open_fd(fd, fdt);-----------------------------------------------------將fd設置爲打開狀態。 if (flags & O_CLOEXEC) __set_close_on_exec(fd, fdt); else __clear_close_on_exec(fd, fdt); error = fd; ... out: spin_unlock(&files->file_lock); return error; }
系統調用epoll_create()就是進程在內核中建立了一個從epoll文件描述符到struct event結構的通道;首先調用ep_alloc()分配一個struct eventpoll數據結構,並初始化;而後尋找一個空閒的文件描述符,並建立一個匿名文件inode;最後將二者在進程task_struct結構體中關聯起來。
最終返回打開的文件描述符。
後面的epoll_ctl()和epoll_wait()均可以經過此文件描述符關聯到內核中的struct eventpoll結構體。
struct ep_pqueue { poll_table pt; struct epitem *epi; }; typedef struct poll_table_struct { poll_queue_proc _qproc; unsigned long _key; } poll_table; typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, struct epoll_event __user *, event) { int error; int full_check = 0; struct fd f, tf; struct eventpoll *ep; struct epitem *epi; struct epoll_event epds; struct eventpoll *tep = NULL; error = -EFAULT; if (ep_op_has_event(op) && copy_from_user(&epds, event, sizeof(struct epoll_event)))--------------------將用戶空間的event拷貝到內核的epds中。 goto error_return; error = -EBADF; f = fdget(epfd);-----------------------------------------------------------------根據epfd找到對應的struct file結構體。 if (!f.file) goto error_return; tf = fdget(fd);------------------------------------------------------------------獲取目標文件的struct file結構體。 if (!tf.file) goto error_fput; /* The target file descriptor must support poll */ error = -EPERM; if (!tf.file->f_op->poll) goto error_tgt_fput; /* Check if EPOLLWAKEUP is allowed */ if (ep_op_has_event(op)) ep_take_care_of_epollwakeup(&epds); error = -EINVAL; if (f.file == tf.file || !is_file_epoll(f.file)) goto error_tgt_fput; if (epds.events & EPOLLEXCLUSIVE) { if (op == EPOLL_CTL_MOD) goto error_tgt_fput; if (op == EPOLL_CTL_ADD && (is_file_epoll(tf.file) || (epds.events & ~EPOLLEXCLUSIVE_OK_BITS))) goto error_tgt_fput; } ep = f.file->private_data; mutex_lock_nested(&ep->mtx, 0); if (op == EPOLL_CTL_ADD) { if (!list_empty(&f.file->f_ep_links) || is_file_epoll(tf.file)) { full_check = 1; mutex_unlock(&ep->mtx); mutex_lock(&epmutex); if (is_file_epoll(tf.file)) { error = -ELOOP; if (ep_loop_check(ep, tf.file) != 0) { clear_tfile_check_list(); goto error_tgt_fput; } } else list_add(&tf.file->f_tfile_llink, &tfile_check_list); mutex_lock_nested(&ep->mtx, 0); if (is_file_epoll(tf.file)) { tep = tf.file->private_data; mutex_lock_nested(&tep->mtx, 1); } } } epi = ep_find(ep, tf.file, fd);-------------------------------------------------在ep->rbr紅黑樹中查找file對應的文件,若是沒有找到返回NULL。 error = -EINVAL; switch (op) { case EPOLL_CTL_ADD: if (!epi) {-----------------------------------------------------------------若是以前ep_find()沒有找到對應的epi,那麼此處插入到ep->rbr紅黑樹中。 epds.events |= POLLERR | POLLHUP; error = ep_insert(ep, &epds, tf.file, fd, full_check); } else error = -EEXIST; if (full_check) clear_tfile_check_list(); break; case EPOLL_CTL_DEL: if (epi) error = ep_remove(ep, epi);---------------------------------------------在ep_find()找到epi的狀況下,將其從ep->rbr中移除。 else error = -ENOENT; break; case EPOLL_CTL_MOD: if (epi) { if (!(epi->event.events & EPOLLEXCLUSIVE)) { epds.events |= POLLERR | POLLHUP; error = ep_modify(ep, epi, &epds);----------------------------------修改epi的事件類型。 } } else error = -ENOENT; break; } if (tep != NULL) mutex_unlock(&tep->mtx); mutex_unlock(&ep->mtx); ... return error; } static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd, int full_check) { int error, revents, pwake = 0; unsigned long flags; long user_watches; struct epitem *epi; struct ep_pqueue epq; user_watches = atomic_long_read(&ep->user->epoll_watches); if (unlikely(user_watches >= max_user_watches)) return -ENOSPC; if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))-------------------------從slab中分配緩存struct epitem結構體空間。 return -ENOMEM; /* Item initialization follow here ... */ INIT_LIST_HEAD(&epi->rdllink);------------------------------------------------初始化epi數據結構。 INIT_LIST_HEAD(&epi->fllink); INIT_LIST_HEAD(&epi->pwqlist); epi->ep = ep; ep_set_ffd(&epi->ffd, tfile, fd); epi->event = *event; epi->nwait = 0; epi->next = EP_UNACTIVE_PTR; if (epi->event.events & EPOLLWAKEUP) { error = ep_create_wakeup_source(epi); if (error) goto error_create_wakeup_source; } else { RCU_INIT_POINTER(epi->ws, NULL); } epq.epi = epi; init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);-----------------------------將epq.pt->_qproc指向ep_ptable_queue_proc()。設置後面poll()等待隊列被喚醒的時候,將要調用到的函數。 revents = ep_item_poll(epi, &epq.pt);-----------------------------------------調用epi對應文件的poll()函數,插入到poll()的等待隊列中。;返回的事件類型時用戶空間關心事件類型的交集。 error = -ENOMEM; if (epi->nwait < 0) goto error_unregister; spin_lock(&tfile->f_lock); list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links); spin_unlock(&tfile->f_lock); ep_rbtree_insert(ep, epi);----------------------------------------------------將新的epi插入到ep->rbr的紅黑樹中。 ... if (pwake) ep_poll_safewake(&ep->poll_wait); return 0; ... return error; } static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt) { struct epitem *epi = ep_item_from_epqueue(pt); struct eppoll_entry *pwq;---------------------------------------------------------struct epoll_entry主要完成struct epitem和和epitem事件發生時的callback函數之間的關聯。 if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {---------申請一個struct epoll_entry結構體緩存。 init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);----------------------初始化等待隊列函數的入口,也就是poll醒來時要調用的回調函數。 pwq->whead = whead; pwq->base = epi; if (epi->event.events & EPOLLEXCLUSIVE) add_wait_queue_exclusive(whead, &pwq->wait); else add_wait_queue(whead, &pwq->wait);---------------------------------------將pwq->wait加入到等待隊列中。 list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++; } else { /* We have to signal that an error occurred */ epi->nwait = -1; } } static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)---主要功能試在被監視的文件等待時間就緒時,將文件對應的epitem實例添加到就緒列表中,當用戶調用epoll_wait()時,內核會將就緒隊列中的時間報告給用戶。 { int pwake = 0; unsigned long flags; struct epitem *epi = ep_item_from_wait(wait); struct eventpoll *ep = epi->ep; int ewake = 0; if ((unsigned long)key & POLLFREE) { ep_pwq_from_wait(wait)->whead = NULL; list_del_init(&wait->task_list); } spin_lock_irqsave(&ep->lock, flags); if (!(epi->event.events & ~EP_PRIVATE_BITS)) goto out_unlock; if (key && !((unsigned long) key & epi->event.events)) goto out_unlock; if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) { if (epi->next == EP_UNACTIVE_PTR) { epi->next = ep->ovflist; ep->ovflist = epi; if (epi->ws) { __pm_stay_awake(ep->ws); } } goto out_unlock; } if (!ep_is_linked(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake_rcu(epi); } if (waitqueue_active(&ep->wq)) { if ((epi->event.events & EPOLLEXCLUSIVE) && !((unsigned long)key & POLLFREE)) { switch ((unsigned long)key & EPOLLINOUT_BITS) { case POLLIN: if (epi->event.events & POLLIN) ewake = 1; break; case POLLOUT: if (epi->event.events & POLLOUT) ewake = 1; break; case 0: ewake = 1; break; } } wake_up_locked(&ep->wq); } if (waitqueue_active(&ep->poll_wait)) pwake++; ... return 1; } static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_event *event) { int pwake = 0; unsigned int revents; poll_table pt; init_poll_funcptr(&pt, NULL); epi->event.events = event->events; /* need barrier below */ epi->event.data = event->data; /* protected by mtx */ if (epi->event.events & EPOLLWAKEUP) { if (!ep_has_wakeup_source(epi)) ep_create_wakeup_source(epi); } else if (ep_has_wakeup_source(epi)) { ep_destroy_wakeup_source(epi); } smp_mb(); revents = ep_item_poll(epi, &pt); if (revents & event->events) { spin_lock_irq(&ep->lock); if (!ep_is_linked(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake(epi); /* Notify waiting tasks that events are available */ if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; } spin_unlock_irq(&ep->lock); } /* We have to call this outside the lock */ if (pwake) ep_poll_safewake(&ep->poll_wait); return 0; } static int ep_remove(struct eventpoll *ep, struct epitem *epi)-------------------------------------將epi從當前ep中移除。 { unsigned long flags; struct file *file = epi->ffd.file; ep_unregister_pollwait(ep, epi); spin_lock(&file->f_lock); list_del_rcu(&epi->fllink); spin_unlock(&file->f_lock); rb_erase(&epi->rbn, &ep->rbr); spin_lock_irqsave(&ep->lock, flags); if (ep_is_linked(&epi->rdllink)) list_del_init(&epi->rdllink); spin_unlock_irqrestore(&ep->lock, flags); wakeup_source_unregister(ep_wakeup_source(epi)); call_rcu(&epi->rcu, epi_rcu_free); atomic_long_dec(&ep->user->epoll_watches); return 0; }
epoll_ctl() 函數首先就分配空間, 將結構從用戶空間複製到內核空間中, 在進行方法(op)判斷以前, 先採用ep_find函數進行查找, 以確保該數據已經設置好回調函數了, 而後使用fget函數獲取該epoll的匿名文件的文件描述符, 最後進行方法(op)判斷, 肯定是EPOLL_CTL_ADD, EPOLL_CTL_MOD仍是 EPOLL_CTL_DEL。
這裏主要講的是EPOLL_CTL_ADD, 因此當是選擇加入時, 就調用ep_insert函數, 將回調函數設置爲ep_ptable_queue_proc函數, 也就是將消息到達後, 須要自動啓動ep_ptable_proc函數, 進而調用ep_poll_callback函數, 該函數就是把來的消息所對應的結構和文件信息加入到就緒鏈表中, 以便以後調用 epoll_wait 能夠直接從就緒隊列鏈表中奪得就緒的文件. 也正是這樣, epoll的回調函數使epoll不用每次都輪詢遍歷數據, 而是自動喚醒回調, 更加的高效. 而且回調函數也只是在進程加入的時侯才設置, 並且只設置一次.
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events, int, maxevents, int, timeout) { int error; struct fd f; struct eventpoll *ep; if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)----------------------------------判斷maxevents合法性。 return -EINVAL; if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) return -EFAULT; f = fdget(epfd);------------------------------------------------------------------獲取epoll_create()函數建立的匿名文件描述符。 if (!f.file) return -EBADF; error = -EINVAL; if (!is_file_epoll(f.file))-------------------------------------------------------經過判斷f.file->f_op是否爲eventpoll_fops來肯定是否爲epoll文件。 goto error_fput; ep = f.file->private_data;--------------------------------------------------------私有數據指向struct eventpoll數據結構。 error = ep_poll(ep, events, maxevents, timeout);----------------------------------等待消息到來;沒有消息到來就阻塞本身。 error_fput: fdput(f); return error; } static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout) { int res = 0, eavail, timed_out = 0; unsigned long flags; u64 slack = 0; wait_queue_t wait; ktime_t expires, *to = NULL; if (timeout > 0) {----------------------------------------------------------------將用戶空間傳入的timeout事件轉換成內核超時時間。 struct timespec64 end_time = ep_set_mstimeout(timeout); slack = select_estimate_accuracy(&end_time); to = &expires; *to = timespec64_to_ktime(end_time); } else if (timeout == 0) { timed_out = 1; spin_lock_irqsave(&ep->lock, flags); goto check_events; } fetch_events: spin_lock_irqsave(&ep->lock, flags); if (!ep_events_available(ep)) { init_waitqueue_entry(&wait, current);-----------------------------------------初始化一個等待隊列入口,並將其添加到等待隊列上。 __add_wait_queue_exclusive(&ep->wq, &wait); for (;;) { set_current_state(TASK_INTERRUPTIBLE);------------------------------------進程設置爲可中斷的。 if (ep_events_available(ep) || timed_out)---------------------------------退出的條件是是否有ready事件、是否超時、是否有信號中斷,三者任一則退出循環。 break; if (signal_pending(current)) { res = -EINTR; break; } spin_unlock_irqrestore(&ep->lock, flags); if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))---------------時間達到以前,讓出cpu調用其餘進程;超時後,從新經過中斷回調該進程。 timed_out = 1; spin_lock_irqsave(&ep->lock, flags); } __remove_wait_queue(&ep->wq, &wait); __set_current_state(TASK_RUNNING); } check_events: eavail = ep_events_available(ep); spin_unlock_irqrestore(&ep->lock, flags); if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && !timed_out)-----------------若是中間沒有被信號中斷,而且ep->rdlist不爲空,則調用ep_send_events()給用戶空間發送消息。 goto fetch_events; return res; } static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events, int maxevents) { struct ep_send_events_data esed; esed.maxevents = maxevents; esed.events = events; return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false); } static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv) { struct ep_send_events_data *esed = priv; int eventcnt; unsigned int revents; struct epitem *epi; struct epoll_event __user *uevent; struct wakeup_source *ws; poll_table pt; init_poll_funcptr(&pt, NULL); for (eventcnt = 0, uevent = esed->events; !list_empty(head) && eventcnt < esed->maxevents;) {---------------------------從就緒隊列去除一個個epi,進行處理;並將相關事件返回給用戶空間。 epi = list_first_entry(head, struct epitem, rdllink);--------------------------取出第一個消息數據對應的struct epitem結構,而後從就緒列表上移除。 ws = ep_wakeup_source(epi); if (ws) { if (ws->active) __pm_stay_awake(ep->ws); __pm_relax(ws); } list_del_init(&epi->rdllink); revents = ep_item_poll(epi, &pt);----------------------------------------------調用epi對應文件描述符的poll()函數,返回值是。 if (revents) { if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) {--------------------------將revents和event.data發送到用戶空間。 list_add(&epi->rdllink, head); ep_pm_stay_awake(epi); return eventcnt ? eventcnt : -EFAULT; } eventcnt++;----------------------------------------------------------------返回給用戶空間的參數eventcnt遞增,uevent也遞增。 uevent++; if (epi->event.events & EPOLLONESHOT) epi->event.events &= EP_PRIVATE_BITS; else if (!(epi->event.events & EPOLLET)) { list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake(epi); } } } return eventcnt; }
得到匿名文件的文件指針, 在經過調用ep_poll函數, 進行時間片的設置, 在時間片結束後就緒隊列爲空, 就退出等待; 若是時間設置的是負數, ep_poll函數會調用schedule_timeout, 執行進程調度, 當設置的時間片結束後又繼續回到ep_poll進程 查看就緒隊列是否爲空, 爲空的話就繼續cinching調度, 此時wait又變成阻塞態; 當就緒隊列準備好後, 就退出進程調度, 執行ep_send_events函數, 主要是爲了將就緒隊列的鏈表從內核空間發送給用戶空間。
ep_send_events函數, 先將就緒鏈表rdllist複製到另外一個新的鏈表, 從新將就緒鏈表清零, 而後程序調用__put_user將新鏈表的數據發送給用戶空間, 同時, 發送的個數吉拉路下來, 以便函數的返回 , 可是, 若是在發送的時候又有就緒信號到來, 就未來的就緒信號保存在ovflist鏈表中, 最後又從新數據拷貝給rdllist中, 再重複執行ep_send_events函數。
從對epoll的分析也能夠看出,爲何性能要優於select()/poll()。
參考文檔:
《select(poll)系統調用實現解析(一)》《select(poll)系統調用實現解析(二)》《select(poll)系統調用實現解析(三)》《epoll源碼分析(一)》《epoll源碼分析(二)》《epoll源碼分析(三)》《Linux epoll模型詳解及源碼分析》《epoll源碼實現分析[整理]》《Linux下的I/O複用與epoll詳解》《epoll源碼分析(全)》《epoll內核源碼詳解+本身總結的流程》