Why is epoll so fast? How does a socket notify epoll when a packet arrives?
(PS: since we are reading kernel source, stick to exactly what we want to know; otherwise it is easy to get lost!)
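Before reading the kernel side, it helps to pin down the user-space API being implemented. The program below is my own minimal sketch, not from the original walkthrough (epoll_create1 is the modern spelling of epoll_create; error handling is mostly omitted): one UDP socket, registered with EPOLL_CTL_ADD, then a blocking epoll_wait loop. Each call maps onto a kernel path examined below.

#include <stdio.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

int main(void)
{
	int sock = socket(AF_INET, SOCK_DGRAM, 0);	/* fd whose f_op->poll is sock_poll */
	struct sockaddr_in addr = { .sin_family = AF_INET,
				    .sin_port = htons(9000),
				    .sin_addr.s_addr = htonl(INADDR_ANY) };
	bind(sock, (struct sockaddr *)&addr, sizeof(addr));

	int epfd = epoll_create1(0);			/* creates the eventpoll instance (ep) */
	struct epoll_event ev = { .events = EPOLLIN, .data.fd = sock };
	epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);	/* enters sys_epoll_ctl -> ep_insert */

	struct epoll_event out[8];
	for (;;) {
		int n = epoll_wait(epfd, out, 8, -1);	/* enters sys_epoll_wait -> ep_poll */
		for (int i = 0; i < n; i++) {
			char buf[1500];
			ssize_t len = recv(out[i].data.fd, buf, sizeof(buf), 0);
			printf("got %zd bytes\n", len);
		}
	}
}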
First, how a watched file descriptor gets registered:
long sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
{
	struct file *file, *tfile;
	struct eventpoll *ep;
	struct epoll_event epds;

	// Copy the event from user space into kernel space
	if (ep_op_has_event(op) &&
	    copy_from_user(&epds, event, sizeof(struct epoll_event)))
		goto error_return;
	file = fget(epfd);
	tfile = fget(fd);
	ep = file->private_data;	// this file is tied to the epoll instance
	switch (op) {
	case EPOLL_CTL_ADD:
		epds.events |= POLLERR | POLLHUP;
		error = ep_insert(ep, &epds, tfile, fd);	// the insert is the interesting part
		break;
	case EPOLL_CTL_DEL:
		...
	case EPOLL_CTL_MOD:
		...
	}
	...
}

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
		     struct file *tfile, int fd)
{
	...
	init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);	// equivalent to: epq.pt.qproc = ep_ptable_queue_proc
	int revents = tfile->f_op->poll(tfile, &epq.pt);	// collect already-ready events, and call back ep_ptable_queue_proc to hook the wait-queue callback
	ep_rbtree_insert(ep, epi);	// insert into the rbtree
	...
	// Check for events that are already pending
	if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
		/* This branch is reachable: if the fd is already ready at
		 * EPOLL_CTL_ADD time, it goes straight onto rdllist, and any
		 * process already blocked in epoll_wait is woken up here. */
		list_add_tail(&epi->rdllink, &ep->rdllist);
		if (waitqueue_active(&ep->wq))	// wq is initialized in ep_alloc(); waiters are added in ep_poll()
			__wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);
		if (waitqueue_active(&ep->poll_wait))
			pwake++;
	}
	...
}
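A side note on that "already ready" branch: it really is reachable. If the fd already has pending events when EPOLL_CTL_ADD runs, ep_insert queues it immediately. My own user-space sketch below demonstrates this by making a pipe readable before registering it; epoll_wait with a zero timeout then returns 1 without ever sleeping.

#include <stdio.h>
#include <unistd.h>
#include <sys/epoll.h>

int main(void)
{
	int p[2];
	if (pipe(p) < 0)
		return 1;
	if (write(p[1], "x", 1) != 1)	/* make the read end ready BEFORE registering it */
		return 1;

	int epfd = epoll_create1(0);
	struct epoll_event ev = { .events = EPOLLIN, .data.fd = p[0] };
	epoll_ctl(epfd, EPOLL_CTL_ADD, p[0], &ev);	/* ep_insert sees revents & EPOLLIN */

	struct epoll_event out;
	int n = epoll_wait(epfd, &out, 1, 0);		/* timeout 0: returns 1 at once */
	printf("epoll_wait -> %d\n", n);
	return 0;
}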
That tfile->f_op->poll is the key piece. Where does it come from? For that we need sys_socket: the pointer is set up when the to-be-watched socket is created (here we only care about sockets, not regular files). Skipping the full story of socket creation for the moment, it in fact points to sock_poll.
unsigned int sock_poll(struct file *file, poll_table *wait)
{
	struct socket *sock = file->private_data;
	return sock->ops->poll(file, sock, wait);	// ops is the key
}

// The transport-layer socket structure
struct socket {
	const struct proto_ops *ops;	// this one
	struct file *file;
	struct sock *sk;
	...
};
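Worth noting: sock_poll is the same f_op->poll entry point that services select(2) and poll(2); epoll differs only in what the poll_table's qproc callback does when invoked. As a quick illustration (my own addition, not from the article), a plain poll(2) call drives exactly the same dispatch chain:

#include <poll.h>

/* poll(2) on a socket fd also ends up in sock_poll, just carrying a
 * different poll_table callback than epoll's ep_ptable_queue_proc. */
int wait_readable(int sock_fd, int timeout_ms)
{
	struct pollfd pfd = { .fd = sock_fd, .events = POLLIN };
	return poll(&pfd, 1, timeout_ms);	/* > 0 means pfd.revents has POLLIN set */
}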
Now let's see what ops actually is. Below are the entries for the two common socket types:
struct inet_protosw inetsw_array[] = {
	{
		.type       = SOCK_STREAM,
		.protocol   = IPPROTO_TCP,
		.prot       = &tcp_prot,
		.ops        = &inet_stream_ops,	// TCP looks here
		.capability = -1,
		.no_check   = 0,
		.flags      = INET_PROTOSW_PERMANENT |
			      INET_PROTOSW_ICSK,
	},
	{
		.type       = SOCK_DGRAM,
		.protocol   = IPPROTO_UDP,
		.prot       = &udp_prot,
		.ops        = &inet_dgram_ops,	// UDP looks here
		.capability = -1,
		.no_check   = UDP_CSUM_DEFAULT,
		.flags      = INET_PROTOSW_PERMANENT,
	},
	...
};
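How does inet_create pick the right entry out of this array? Roughly as in the sketch below, simplified from 2.6-era net/ipv4/af_inet.c (inetsw is a per-type list built from inetsw_array at boot; the second wildcard case is trimmed):

/* Sketch of the selection logic inside inet_create(). */
struct inet_protosw *answer;

list_for_each_entry_rcu(answer, &inetsw[sock->type], list) {
	if (protocol == answer->protocol)
		break;				/* exact match, e.g. IPPROTO_UDP */
	if (protocol == IPPROTO_IP) {		/* wildcard: socket(AF_INET, type, 0) */
		protocol = answer->protocol;
		break;
	}
}
sock->ops = answer->ops;	/* SOCK_DGRAM -> &inet_dgram_ops, so poll == udp_poll */
answer_prot = answer->prot;	/* -> &udp_prot, passed to sk_alloc() */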
Take UDP as the example:
struct proto_ops inet_dgram_ops = {
	.family		   = PF_INET,
	.owner		   = THIS_MODULE,
	.release	   = inet_release,
	.bind		   = inet_bind,
	.connect	   = inet_dgram_connect,
	.socketpair	   = sock_no_socketpair,
	.accept		   = sock_no_accept,
	.getname	   = inet_getname,
	.poll		   = udp_poll,		// the key entry
	.ioctl		   = inet_ioctl,
	.listen		   = sock_no_listen,
	.shutdown	   = inet_shutdown,
	.setsockopt	   = sock_common_setsockopt,
	.getsockopt	   = sock_common_getsockopt,
	.sendmsg	   = inet_sendmsg,
	.recvmsg	   = sock_common_recvmsg,
	.mmap		   = sock_no_mmap,
	.sendpage	   = inet_sendpage,
	.compat_setsockopt = compat_sock_common_setsockopt,
	.compat_getsockopt = compat_sock_common_getsockopt,
};

unsigned int udp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
	unsigned int mask = datagram_poll(file, sock, wait);	// the key call
	...
}

// This is the sock->ops->poll mentioned above.
// file is tied to the watched fd, sock is the watched socket, and wait is the
// freshly built poll_table pointing at ep_ptable_queue_proc.
unsigned int datagram_poll(struct file *file, struct socket *sock, poll_table *wait)
{
	struct sock *sk = sock->sk;
	unsigned int mask;

	poll_wait(file, sk->sk_sleep, wait);	// the key call
	mask = 0;

	/* exceptional events? */
	if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
		mask |= POLLERR;
	if (sk->sk_shutdown & RCV_SHUTDOWN)
		mask |= POLLRDHUP;
	if (sk->sk_shutdown == SHUTDOWN_MASK)
		mask |= POLLHUP;

	/* readable? */
	if (!skb_queue_empty(&sk->sk_receive_queue) ||
	    (sk->sk_shutdown & RCV_SHUTDOWN))
		mask |= POLLIN | POLLRDNORM;

	/* Connection-based need to check for termination and startup */
	if (connection_based(sk)) {
		if (sk->sk_state == TCP_CLOSE)
			mask |= POLLHUP;
		/* connection hasn't started yet? */
		if (sk->sk_state == TCP_SYN_SENT)
			return mask;
	}

	/* writable? */
	if (sock_writeable(sk))
		mask |= POLLOUT | POLLWRNORM | POLLWRBAND;
	else
		set_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);

	return mask;
}

static inline void poll_wait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
	if (p && wait_address)
		p->qproc(filp, wait_address, p);	// qproc is ep_ptable_queue_proc here
}

// file is tied to the watched fd
// whead is the watched sock's sk->sk_sleep wait-queue head
// pt is the poll_table carrying ep_ptable_queue_proc
void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt)
{
	struct epitem *epi = ep_item_from_epqueue(pt);	// epi appeared in ep_insert; it represents one watched fd
	struct eppoll_entry *pwq;	// this is about to be hung off epi

	if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
		init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);	// ep_poll_callback is the wake-up callback
		pwq->whead = whead;
		pwq->base = epi;
		add_wait_queue(whead, &pwq->wait);	// hook pwq->wait onto the whead wait queue
		list_add_tail(&pwq->llink, &epi->pwqlist);	// hook pwq->llink onto the epi->pwqlist list
		epi->nwait++;
	} else {
		/* We have to signal that an error occurred */
		epi->nwait = -1;
	}
}
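The registered callback, ep_poll_callback, is the piece that actually answers the title question, and it never appears above. Here is a lightly simplified version of the 2.6-era function (the ep->ovflist handling for events arriving mid-copy is elided): when the socket's wait queue is woken, it links the epitem onto the ready list and wakes any process sleeping in ep_poll.

/* Simplified from 2.6-era fs/eventpoll.c. Runs in whatever context wakes
 * sk->sk_sleep -- typically softirq, right after a packet has been queued
 * on sk->sk_receive_queue. */
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
	unsigned long flags;
	struct epitem *epi = ep_item_from_wait(wait);	// container_of back to the epitem
	struct eventpoll *ep = epi->ep;

	spin_lock_irqsave(&ep->lock, flags);

	if (!ep_is_linked(&epi->rdllink))
		list_add_tail(&epi->rdllink, &ep->rdllist);	// O(1): just link the ready fd

	if (waitqueue_active(&ep->wq))	// someone is blocked in ep_poll()?
		__wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);

	spin_unlock_irqrestore(&ep->lock, flags);
	return 1;
}

This is the heart of epoll's speed: the arrival of data does O(1) work on exactly the fd that became ready, instead of rescanning every watched fd the way select/poll must.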
long sys_epoll_wait(int epfd, struct epoll_event __user *events,
		    int maxevents, int timeout)
{
	...
	error = ep_poll(ep, events, maxevents, timeout);
	return error;
}

int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
	    int maxevents, long timeout)
{
	int res, eavail;
	unsigned long flags;
	long jtimeout;
	wait_queue_t wait;

	jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
		MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;

retry:
	spin_lock_irqsave(&ep->lock, flags);

	res = 0;
	if (list_empty(&ep->rdllist)) {
		init_waitqueue_entry(&wait, current);	// tie the current process to wait
		wait.flags |= WQ_FLAG_EXCLUSIVE;
		__add_wait_queue(&ep->wq, &wait);	// join ep's wait queue; this process is notified when events arrive

		for (;;) {	// loop until an event, a signal, or the timeout
			set_current_state(TASK_INTERRUPTIBLE);	// allow the current process to be interrupted at any time
			if (!list_empty(&ep->rdllist) || !jtimeout)
				break;
			if (signal_pending(current)) {
				res = -EINTR;
				break;
			}
			spin_unlock_irqrestore(&ep->lock, flags);
			jtimeout = schedule_timeout(jtimeout);	// yield the CPU and sleep
			spin_lock_irqsave(&ep->lock, flags);
		}
		__remove_wait_queue(&ep->wq, &wait);	// leave the wait queue
		set_current_state(TASK_RUNNING);	// mark this process runnable again
	}

	eavail = !list_empty(&ep->rdllist);
	spin_unlock_irqrestore(&ep->lock, flags);

	if (!res && eavail &&
	    !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
		goto retry;

	return res;
}
Because the task state was set to TASK_INTERRUPTIBLE, schedule_timeout(jtimeout) may return before the full jtimeout has elapsed, for example when a signal arrives. By the time it returns, the task state has been switched back to TASK_RUNNING.
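ep_send_events, called in the retry loop above, is roughly the following. This is my heavily simplified sketch after the 2.6-era logic; the real function takes the eventpoll locks and parks events arriving mid-copy on ep->ovflist:

static int ep_send_events(struct eventpoll *ep,
			  struct epoll_event __user *events, int maxevents)
{
	int eventcnt = 0;
	unsigned int revents;
	struct epitem *epi;
	LIST_HEAD(txlist);

	list_splice_init(&ep->rdllist, &txlist);	/* take a private snapshot */

	while (!list_empty(&txlist) && eventcnt < maxevents) {
		epi = list_first_entry(&txlist, struct epitem, rdllink);
		list_del_init(&epi->rdllink);

		/* Re-poll with a NULL poll_table: fetch the current mask only,
		 * without hooking any wait queue this time. */
		revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
			  epi->event.events;
		if (!revents)
			continue;	/* stale: event was consumed meanwhile */

		if (__put_user(revents, &events[eventcnt].events) ||
		    __put_user(epi->event.data, &events[eventcnt].data))
			return -EFAULT;
		eventcnt++;

		if (!(epi->event.events & EPOLLET))
			/* Level-triggered: leave it on rdllist so the next
			 * epoll_wait() reports it again while still ready. */
			list_add_tail(&epi->rdllink, &ep->rdllist);
	}

	list_splice(&txlist, &ep->rdllist);	/* return unreported leftovers */
	return eventcnt;
}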
How is the socket created?
// System call entry point
long sys_socket(int family, int type, int protocol)
{
	...
	retval = sock_create(family, type, protocol, &sock);
	return sock_map_fd(sock);	// map the socket to an fd
}

int sock_create(int family, int type, int protocol, struct socket **res)
{
	return __sock_create(current->nsproxy->net_ns, family, type, protocol, res, 0);
}

// This is how sys_socket creates the socket
int __sock_create(struct net *net, int family, int type, int protocol,
		  struct socket **res, int kern)
{
	...
	struct socket *sock = sock_alloc();	// this is the transport-layer struct socket!
	pf = rcu_dereference(net_families[family]);	// net_families is a global table other modules register into
	err = pf->create(net, sock, protocol);	// create the corresponding network-layer (IP) sock; analyzed below
	...
}
For AF_INET, pf is obtained as follows.
// Registration interface for protocol families
int sock_register(const struct net_proto_family *ops)
{
	if (net_families[ops->family])
		err = -EEXIST;
	else {
		net_families[ops->family] = ops;
		err = 0;
	}
}

int inet_init(void)
{
	...
	sock_register(&inet_family_ops);	// register
	...
}

struct net_proto_family inet_family_ops = {
	.family = PF_INET,
	.create = inet_create,	// this is that pf->create
	.owner	= THIS_MODULE,
};
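The registry is completely generic; every address family plugs in the same way. For comparison, this is how af_unix (net/unix/af_unix.c, 2.6-era) registers PF_UNIX:

static struct net_proto_family unix_family_ops = {
	.family = PF_UNIX,
	.create = unix_create,	// af_unix's pf->create
	.owner	= THIS_MODULE,
};

// called from af_unix's module init:
// sock_register(&unix_family_ops);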
The network-layer socket (struct sock) is the crucial one: a packet arriving from the wire reaches the IP layer first:
// Create an inet socket. (this is the network-layer socket)
int inet_create(struct net *net, struct socket *sock, int protocol)
{
	...
	struct sock *sk = sk_alloc(net, PF_INET, GFP_KERNEL, answer_prot);	// allocate the sk
	sock_init_data(sock, sk);	// wire up the links between sk and sock
	...
}

void sock_init_data(struct socket *sock, struct sock *sk)
{
	...
	if (sock) {
		sk->sk_type = sock->type;
		sk->sk_sleep = &sock->wait;	// the wait-queue head datagram_poll later hands to poll_wait
		sock->sk = sk;	// hang sk off sock
	} else
		sk->sk_sleep = NULL;
	...
}
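One more default set by sock_init_data, hidden behind the "..." above, completes the chain: sk->sk_data_ready = sock_def_readable. When the UDP receive path queues an skb onto sk->sk_receive_queue, it invokes sk->sk_data_ready, which wakes sk->sk_sleep, which is exactly the wait queue ep_ptable_queue_proc hooked, so ep_poll_callback fires. The 2.6-era default (net/core/sock.c, slightly trimmed):

/* Default sk->sk_data_ready callback, invoked by e.g. sock_queue_rcv_skb()
 * right after a datagram lands on sk->sk_receive_queue. */
static void sock_def_readable(struct sock *sk, int len)
{
	read_lock(&sk->sk_callback_lock);
	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
		wake_up_interruptible(sk->sk_sleep);	// fires ep_poll_callback
	sk_wake_async(sk, 1, POLL_IN);
	read_unlock(&sk->sk_callback_lock);
}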
Mapping the socket to a file: some important pieces live in the file structure and deserve a look.
int sock_map_fd(struct socket *sock)
{
	struct file *newfile;
	int fd = sock_alloc_fd(&newfile);
	int err = sock_attach_fd(sock, newfile);	// the key step
	fd_install(fd, newfile);
	...
}

static int sock_attach_fd(struct socket *sock, struct file *file)
{
	...
	sock->file = file;
	init_file(file, sock_mnt, dentry, FMODE_READ | FMODE_WRITE,
		  &socket_file_ops);	// the key step
	file->private_data = sock;	// this is what sock_poll reads back out
	...
}

int init_file(struct file *file, struct vfsmount *mnt, struct dentry *dentry,
	      mode_t mode, const struct file_operations *fop)
{
	int error = 0;
	file->f_path.dentry = dentry;
	file->f_path.mnt = mntget(mnt);
	file->f_mapping = dentry->d_inode->i_mapping;
	file->f_mode = mode;
	file->f_op = fop;	// the key assignment
	return error;
}

const struct file_operations socket_file_ops = {
	.owner		= THIS_MODULE,
	.llseek		= no_llseek,
	.aio_read	= sock_aio_read,
	.aio_write	= sock_aio_write,
	.poll		= sock_poll,	// the key entry: this is tfile->f_op->poll
	.unlocked_ioctl	= sock_ioctl,
	.compat_ioctl	= compat_sock_ioctl,
	.mmap		= sock_mmap,
	.open		= sock_no_open,
	.release	= sock_close,
	.fasync		= sock_fasync,
	.sendpage	= sock_sendpage,
	.splice_write	= generic_splice_sendpage,
};
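This mapping is visible from user space: a socket fd is an ordinary fd whose file lives on sockfs, with the socket hanging off file->private_data. A small check of my own:

#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>

int main(void)
{
	int fd = socket(AF_INET, SOCK_DGRAM, 0);
	char path[64], target[64];

	snprintf(path, sizeof(path), "/proc/self/fd/%d", fd);
	ssize_t n = readlink(path, target, sizeof(target) - 1);
	if (n > 0) {
		target[n] = '\0';
		printf("%s -> %s\n", path, target);	/* prints e.g. socket:[123456] */
	}
	return 0;
}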