epoll

epoll爲何這麼快?當數據包到達時,socket是怎麼通知epoll的?
(PS:既然要看內核,那就只關心想知道的內容,不然可能會把自己繞暈了!)

先看怎麼註冊監聽句柄的:

// Abridged excerpt of the epoll_ctl system call: copies the caller's event
// description into the kernel, resolves both file descriptors, and dispatches
// on op. Only the EPOLL_CTL_ADD path is shown in detail.
// NOTE(review): epds, error and the error_return label are not declared in
// this excerpt, and the fget() results are not checked here; presumably that
// code is part of what was elided.
long sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
{
    struct file *file, *tfile;
    struct eventpoll *ep;

            // copy the event from user space into kernel space
        if (ep_op_has_event(op) && copy_from_user(&epds, event, sizeof(struct epoll_event)))
            goto error_return;

    file = fget(epfd);
    tfile = fget(fd);
    ep = file->private_data;    // this file is the one associated with the epoll instance 

    switch (op) {
    case EPOLL_CTL_ADD:
        epds.events |= POLLERR | POLLHUP;   // POLLERR and POLLHUP are always added to the requested events
        error = ep_insert(ep, &epds, tfile, fd);    // the insert operation is the part to follow
        break;
    case EPOLL_CTL_DEL: ...
    case EPOLL_CTL_MOD: ...
    }
    ...
}

// Abridged excerpt of ep_insert: registers tfile/fd with the epoll instance ep.
// It installs ep_ptable_queue_proc as the poll-table callback, polls the target
// file once, inserts the item into the rbtree, and queues it on the ready list
// if the poll already reported one of the requested events.
// NOTE(review): epq, epi and pwake are not declared in this excerpt; presumably elided.
static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd)
{
    ...
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);   // equivalent to: epq.pt.qproc = ep_ptable_queue_proc
    int revents = tfile->f_op->poll(tfile, &epq.pt);    // collects the currently ready events; the poll in turn calls back ep_ptable_queue_proc to install the wait-queue hook
    ep_rbtree_insert(ep, epi);      // insert the item into the rbtree
      ...
      // check the events returned by the poll above
      if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
          /* Taken when the poll above already reported a requested event, i.e. the
           * file was ready at insert time: put the item on the ready list right
           * away and wake anything waiting on this epoll instance. */
          list_add_tail(&epi->rdllink, &ep->rdllist);
    
          if (waitqueue_active(&ep->wq))    // per the article, wq is initialized in ep_alloc(); waiters are added in ep_poll()
              __wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);
          if (waitqueue_active(&ep->poll_wait))
              pwake++;
      }
    ...
}

那個tfile->f_op->poll很關鍵,它是哪來的?這得看sys_socket了,它是在創建待監聽的socket時初始化的(這裏我們只關注socket,不關注普通的文件)。這裏略過socket創建時的來龍去脈,它其實指向sock_poll函數。

/*
 * Poll entry point for socket files: fetches the socket stored in the file's
 * private_data and forwards the request to that socket's ops->poll handler.
 */
unsigned int sock_poll(struct file *file, poll_table *wait)
{
    struct socket *so;

    so = file->private_data;

    /* The ops table decides which protocol-specific poll routine runs. */
    return so->ops->poll(file, so, wait);
}

// Socket structure (the article refers to it as the transport-layer socket).
// Abridged: only the fields relevant to the poll path are shown.
struct socket {
    const struct proto_ops  *ops;   // the ops table used by sock_poll (sock->ops->poll)
    struct file     *file;          // file this socket is attached to
    struct sock     *sk;            // associated struct sock
    ...
}

現在來關注ops是什麼,下面是常見的兩種socket的ops:

// Table of inet protocol switch entries (abridged). Each entry pairs a socket
// type/protocol with its protocol handler (.prot) and its ops table (.ops).
struct inet_protosw inetsw_array[] =
{
    {
        .type =       SOCK_STREAM,
        .protocol =   IPPROTO_TCP,
        .prot =       &tcp_prot,
        .ops =        &inet_stream_ops, // ops table to look at for TCP
        .capability = -1,
        .no_check =   0,
        .flags =      INET_PROTOSW_PERMANENT | INET_PROTOSW_ICSK,
    },
    {
        .type =       SOCK_DGRAM,
        .protocol =   IPPROTO_UDP,
        .prot =       &udp_prot,
        .ops =        &inet_dgram_ops,  // ops table to look at for UDP
        .capability = -1,
        .no_check =   UDP_CSUM_DEFAULT,
        .flags =      INET_PROTOSW_PERMANENT,
    },
    ...
};

拿UDP舉例:

// Ops table referenced by the SOCK_DGRAM/IPPROTO_UDP entry of inetsw_array.
struct proto_ops inet_dgram_ops = {
    .family        = PF_INET,
    .owner         = THIS_MODULE,
    .release       = inet_release,
    .bind          = inet_bind,
    .connect       = inet_dgram_connect,
    .socketpair    = sock_no_socketpair,
    .accept        = sock_no_accept,
    .getname       = inet_getname,
    .poll          = udp_poll,      // key entry: this is what sock->ops->poll resolves to for this table
    .ioctl         = inet_ioctl,
    .listen        = sock_no_listen,
    .shutdown      = inet_shutdown,
    .setsockopt    = sock_common_setsockopt,
    .getsockopt    = sock_common_getsockopt,
    .sendmsg       = inet_sendmsg,
    .recvmsg       = sock_common_recvmsg,
    .mmap          = sock_no_mmap,
    .sendpage      = inet_sendpage,
    .compat_setsockopt = compat_sock_common_setsockopt,
    .compat_getsockopt = compat_sock_common_getsockopt,
};

// Poll handler installed in inet_dgram_ops (abridged): obtains the event mask
// from datagram_poll; the remaining processing is elided in this excerpt.
unsigned int udp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
    unsigned int mask = datagram_poll(file, sock, wait);    // key call
    ...
}

// The routine that the sock->ops->poll call ends up in for datagram sockets
// (udp_poll calls it directly).
// file: the file of the monitored fd; sock: the monitored socket;
// wait: the poll table; when coming from ep_insert it carries ep_ptable_queue_proc.
// Returns the mask of poll events currently pending on the socket.
unsigned int datagram_poll(struct file *file, struct socket *sock, poll_table *wait)
{
    struct sock *sk = sock->sk;
    unsigned int mask;

    poll_wait(file, sk->sk_sleep, wait);    // key call: hands sk->sk_sleep to the poll table's callback
    mask = 0;

    /* exceptional events? */
    if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
        mask |= POLLERR;
    if (sk->sk_shutdown & RCV_SHUTDOWN)
        mask |= POLLRDHUP;
    if (sk->sk_shutdown == SHUTDOWN_MASK)
        mask |= POLLHUP;

    /* readable? */
    if (!skb_queue_empty(&sk->sk_receive_queue) ||
        (sk->sk_shutdown & RCV_SHUTDOWN))
        mask |= POLLIN | POLLRDNORM;

    /* Connection-based need to check for termination and startup */
    if (connection_based(sk)) {
        if (sk->sk_state == TCP_CLOSE)
            mask |= POLLHUP;
        /* connection hasn't started yet? */
        if (sk->sk_state == TCP_SYN_SENT)
            return mask;    /* early return: the writable check below is skipped */
    }

    /* writable? */
    if (sock_writeable(sk))
        mask |= POLLOUT | POLLWRNORM | POLLWRBAND;
    else
        set_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);

    return mask;
}

/*
 * Invokes the poll table's queueing callback for the given wait queue head.
 * Does nothing when either the poll table or the wait queue head is NULL.
 */
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
    if (!p || !wait_address)
        return;

    /* On the epoll path qproc is ep_ptable_queue_proc. */
    p->qproc(filp, wait_address, p);
}

// Poll-table callback used by epoll to hook itself onto a file's wait queue.
// file:  the file of the monitored fd
// whead: the wait queue head to hook onto (sk->sk_sleep on the socket path)
// pt:    the poll table whose qproc is this function
void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt)
{
    struct epitem *item = ep_item_from_epqueue(pt);    // the epitem (seen in ep_insert) for one monitored fd
    struct eppoll_entry *entry;                        // hook object that will hang off the epitem

    if (item->nwait < 0 || !(entry = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
        /* We have to signal that an error occurred */
        item->nwait = -1;
        return;
    }

    init_waitqueue_func_entry(&entry->wait, ep_poll_callback);  // ep_poll_callback is registered as the wait entry's function
    entry->whead = whead;
    entry->base = item;
    add_wait_queue(whead, &entry->wait);                // enqueue the entry on the whead wait queue
    list_add_tail(&entry->llink, &item->pwqlist);       // link the entry into the epitem's pwqlist
    item->nwait++;
}

等待事件發生

// Abridged excerpt of the epoll_wait system call: after the elided setup it
// delegates to ep_poll and returns its result.
// NOTE(review): ep and error are not declared in this excerpt; presumably elided.
long sys_epoll_wait(int epfd, struct epoll_event __user *events,
                   int maxevents, int timeout)
{
    ...
    error = ep_poll(ep, events, maxevents, timeout);
    return error;
}


// Waits until the epoll instance has ready items, the timeout expires, or a
// signal is pending, then passes ready events to ep_send_events.
// Returns -EINTR when interrupted by a signal, otherwise the result of
// ep_send_events (or 0 when nothing was available).
int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout)
{
    int res, eavail;
    unsigned long flags;
    long jtimeout;
    wait_queue_t wait;

    // A negative or too-large timeout means MAX_SCHEDULE_TIMEOUT; otherwise the
    // millisecond value is scaled by HZ/1000, rounding up.
    jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
        MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;

retry:
    spin_lock_irqsave(&ep->lock, flags);

    res = 0;
    if (list_empty(&ep->rdllist)) {
        init_waitqueue_entry(&wait, current);   // bind the current task to the wait entry
        wait.flags |= WQ_FLAG_EXCLUSIVE;
        __add_wait_queue(&ep->wq, &wait);   // join ep's wait queue so this task is woken when an event arrives

        for (;;) {  // loop until ready items exist, the timeout runs out, or a signal is pending
            set_current_state(TASK_INTERRUPTIBLE);  // mark the task interruptible before re-checking the conditions
            if (!list_empty(&ep->rdllist) || !jtimeout)
                break;
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }

            // the lock is released while sleeping and re-taken afterwards
            spin_unlock_irqrestore(&ep->lock, flags);
            jtimeout = schedule_timeout(jtimeout); // schedule away and sleep; the return value becomes the remaining timeout
            spin_lock_irqsave(&ep->lock, flags);
        }
        __remove_wait_queue(&ep->wq, &wait);  // leave the wait queue

        set_current_state(TASK_RUNNING);    // set this task's state back to running
    }


    eavail = !list_empty(&ep->rdllist);
    spin_unlock_irqrestore(&ep->lock, flags);
    // If items were available but ep_send_events returned 0 and time remains,
    // go back and wait again.
    if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
        goto retry;
    return res;
}

因爲設置了TASK_INTERRUPTIBLE狀態,schedule_timeout(jtimeout)可能還沒睡夠jtimeout就返回,比如接收到信號。等到返回時,狀態自動被切換到TASK_RUNNING。

創建socket流程

socket是怎麼創建的?

// System call entry point (abridged): creates the socket via sock_create and
// returns the fd that sock_map_fd produces for it.
// NOTE(review): retval is assigned but not checked in this excerpt, and
// retval/sock are not declared here; presumably that code was elided.
long sys_socket(int family, int type, int protocol)
{
    ...
    retval = sock_create(family, type, protocol, &sock);
    return sock_map_fd(sock);   // map the socket to an fd
}

/*
 * Thin wrapper around __sock_create: supplies the current task's network
 * namespace and passes 0 for the trailing kern argument.
 */
int sock_create(int family, int type, int protocol, struct socket **res)
{
    struct net *ns = current->nsproxy->net_ns;

    return __sock_create(ns, family, type, protocol, res, 0);
}

// How the sys_socket system call creates a socket (abridged): allocates the
// struct socket, looks up the registered protocol family, and calls its
// create hook.
// NOTE(review): pf and err are not declared in this excerpt; presumably elided.
int __sock_create(struct net *net, int family, int type, int protocol, struct socket **res, int kern)
{
    ...
    struct socket *sock = sock_alloc(); // the struct socket (the article calls this the transport-layer one)
    pf = rcu_dereference(net_families[family]); // net_families is a global table that other modules register into
    err = pf->create(net, sock, protocol);  // creates the matching struct sock (IP layer in the article's terms); analysed further below
    ...
}

對於AF_INET,pf就是下面這樣來的。

// Registration interface for protocol families.
// Stores ops in net_families[ops->family] unless that slot is already occupied.
// Returns 0 on success, or -EEXIST when the family is already registered.
// NOTE(review): this abridged version shows no range check on ops->family and
// no locking around the table update; confirm against the full kernel source.
int sock_register(const struct net_proto_family *ops)
{
    int err;    // result code; declared here because the function must return it

    if (net_families[ops->family])
        err = -EEXIST;
    else {
        net_families[ops->family] = ops;
        err = 0;
    }
    return err;
}

// Abridged excerpt of inet_init: among the elided steps, it registers
// inet_family_ops through sock_register.
int inet_init(void)
{
    ...
    sock_register(&inet_family_ops);    // register the inet protocol family
    ...
}

// Protocol family descriptor for PF_INET, registered by inet_init.
struct net_proto_family inet_family_ops = {
    .family = PF_INET,
    .create = inet_create,      // this is the pf->create hook called from __sock_create
    .owner  = THIS_MODULE,
};

IP層的socket才是關鍵,底層數據包到達終端時是先到達IP層的:

// Create an inet socket (the article calls this the IP-layer socket).
// Abridged: allocates the struct sock and links it with the struct socket.
// NOTE(review): answer_prot is not declared in this excerpt; presumably elided.
int inet_create(struct net *net, struct socket *sock, int protocol)
{
    ...
    struct sock *sk = sk_alloc(net, PF_INET, GFP_KERNEL, answer_prot);  // allocate sk
    sock_init_data(sock, sk);   // initialize the parts that tie sk and sock together
    ...
}

// Abridged excerpt of sock_init_data: wires sk to sock. When sock is non-NULL,
// sk takes sock's type, sk->sk_sleep points at sock's wait queue, and sock->sk
// points back at sk; otherwise sk->sk_sleep is set to NULL.
void sock_init_data(struct socket *sock, struct sock *sk)
{
    ...
    if (sock) {
        sk->sk_type = sock->type;
        sk->sk_sleep = &sock->wait;     // sk_sleep is the wait queue that datagram_poll passes to poll_wait
        sock->sk = sk;  // attach sk to sock
    } else
        sk->sk_sleep = NULL;
    ...
}

將socket映射成文件:有些重要的東西是放在文件裏的,需要關注一下。

// Abridged excerpt of sock_map_fd: allocates an fd together with a new file,
// attaches the socket to that file, and installs the file under the fd.
// NOTE(review): fd and err are not checked in this excerpt, and no return
// statement is visible; presumably both are in the elided part.
int sock_map_fd(struct socket *sock)
{
    struct file *newfile;
    int fd = sock_alloc_fd(&newfile);
    int err = sock_attach_fd(sock, newfile);    // key call
    fd_install(fd, newfile);
    ...
}

// Abridged excerpt of sock_attach_fd: links the socket and the file in both
// directions and initializes the file with socket_file_ops.
// NOTE(review): dentry is not declared in this excerpt; presumably elided.
static int sock_attach_fd(struct socket *sock, struct file *file)
{
    ...
    sock->file = file;
    init_file(file, sock_mnt, dentry, FMODE_READ | FMODE_WRITE, &socket_file_ops);  // key call: installs socket_file_ops as file->f_op
    file->private_data = sock;  // this is what sock_poll later reads back
    ...
}

/*
 * Fills in a file's path (dentry and mount), mapping, mode and operations
 * table from the given arguments. Always returns 0.
 */
int init_file(struct file *file, struct vfsmount *mnt, struct dentry *dentry,
   mode_t mode, const struct file_operations *fop)
{
    /* Operations table: this is where f_op->poll comes from. */
    file->f_op = fop;
    file->f_mode = mode;

    file->f_mapping = dentry->d_inode->i_mapping;
    file->f_path.mnt = mntget(mnt);
    file->f_path.dentry = dentry;

    return 0;
}

// File operations table that sock_attach_fd passes to init_file for socket files.
const struct file_operations socket_file_ops = {
    .owner =    THIS_MODULE,
    .llseek =   no_llseek,
    .aio_read = sock_aio_read,
    .aio_write =    sock_aio_write,
    .poll =     sock_poll,          // key entry: tfile->f_op->poll in ep_insert resolves to this
    .unlocked_ioctl = sock_ioctl,
    .compat_ioctl = compat_sock_ioctl,
    .mmap =     sock_mmap,
    .open =     sock_no_open,
    .release =  sock_close,
    .fasync =   sock_fasync,
    .sendpage = sock_sendpage,
    .splice_write = generic_splice_sendpage,
};

怎麼通知阻塞的進程的?

相關文章
相關標籤/搜索