從linux源碼看epoll


前言

在linux的高性能網絡編程中,繞不開的就是epoll。和select、poll等系統調用相比,epoll在須要監視大量文件描述符而且其中只有少數活躍的時候,表現出無可比擬的優點。epoll能讓內核記住所關注的描述符,並在對應的描述符事件就緒的時候,在epoll的就緒鏈表中添加這些就緒元素,並喚醒對應的epoll等待進程。
本文就是筆者在探究epoll源碼過程當中,對kernel將就緒描述符添加到epoll並喚醒對應進程的一次源碼分析(基於linux-2.6.32內核版本)。因爲篇幅所限,筆者聚焦於tcp協議下socket可讀事件的源碼分析。node

簡單的epoll例子

下面的例子,是從筆者本人用c語言寫的dbproxy中的一段代碼。因爲細節過多,因此作了一些刪減。react

int init_reactor(int listen_fd,int worker_count){
	......	// 建立多個epoll fd,以充分利用多核
	for(i=0;i<worker_count;i++){
		reactor->worker_fd = epoll_create(EPOLL_MAX_EVENTS);
	}	/* epoll add listen_fd and accept */
	// 將accept後的事件加入到對應的epoll fd中
	int client_fd = accept(listen_fd,(struct sockaddr *)&client_addr,&client_len)));	// 將鏈接描述符註冊到對應的worker裏面
	epoll_ctl(reactor->client_fd,EPOLL_CTL_ADD,epifd,&event);
}// reactor的worker線程static void* rw_thread_func(void* arg){
	......	for(;;){		  // epoll_wait等待事件觸發
        int retval = epoll_wait(epfd,events,EPOLL_MAX_EVENTS,500);        if(retval > 0){        	for(j=0; j < retval; j++){        		// 處理讀事件
        	   if(event & EPOLLIN){
                 handle_ready_read_connection(conn);                 continue;
             }             /* 處理其它事件 */
        	}
        }
	}
	......
}

上述代碼事實上就是實現了一個reactor模式中的accept與read/write處理線程,以下圖所示:linux

epoll_create

Unix的萬物皆文件的思想在epoll裏面也有體現,epoll_create調用返回一個文件描述符,此描述符掛載在anon_inode_fs(匿名inode文件系統)的根目錄下面。讓咱們看下具體的epoll_create系統調用源碼:編程

SYSCALL_DEFINE1(epoll_create, int, size){	if (size <= 0)		return -EINVAL;	return sys_epoll_create1(0);
}

由上述源碼可見,epoll_create的參數是基本沒有意義的,kernel簡單的判斷是否爲0,而後就直接就調用了sys_epoll_create1。因爲linux的系統調用是經過(SYSCALL_DEFINE1,SYSCALL_DEFINE2......SYSCALL_DEFINE6)定義的,那麼sys_epoll_create1對應的源碼便是SYSCALL_DEFINE(epoll_create1)。
(注:受限於寄存器數量的限制,(80x86下的)kernel限制系統調用最多有6個參數。據ulk3所述,這是因爲32位80x86寄存器的限制)
接下來,咱們就看下epoll_create1的源碼:數組

SYSCALL_DEFINE1(epoll_create1, int, flags){	// kzalloc(sizeof(*ep), GFP_KERNEL),用的是內核空間
	error = ep_alloc(&ep);	// 獲取還沒有被使用的文件描述符,即描述符數組的槽位
	fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));	// 在匿名inode文件系統中分配一個inode,並獲得其file結構體
	// 且file->f_op = &eventpoll_fops
	// 且file->private_data = ep;
	file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
				 O_RDWR | (flags & O_CLOEXEC));	// 將file填入到對應的文件描述符數組的槽裏面
	fd_install(fd,file);			 
	ep->file = file;	return fd;
}

最後epoll_create生成的文件描述符以下圖所示:
網絡

struct eventpoll

全部的epoll系統調用都是圍繞eventpoll結構體作操做,現簡要描述下其中的成員:數據結構

/*
 * 此結構體存儲在file->private_data中
 */struct eventpoll {
	// 自旋鎖,在kernel內部用自旋鎖加鎖,就能夠同時多線(進)程對此結構體進行操做
	// 主要是保護ready_list
	spinlock_t lock;	// 這個互斥鎖是爲了保證在eventloop使用對應的文件描述符的時候,文件描述符不會被移除掉
	struct mutex mtx;
	// epoll_wait使用的等待隊列,和進程喚醒有關
	wait_queue_head_t wq;	// file->poll使用的等待隊列,和進程喚醒有關
	wait_queue_head_t poll_wait;	// 就緒的描述符隊列
	struct list_head rdllist;
	// 經過紅黑樹來組織當前epoll關注的文件描述符
	struct rb_root rbr;
	// 在向用戶空間傳輸就緒事件的時候,將同時發生事件的文件描述符鏈入到這個鏈表裏面
	struct epitem *ovflist;
	// 對應的user
	struct user_struct *user;
	// 對應的文件描述符
	struct file *file;
	// 下面兩個是用於環路檢測的優化
	int visited;	struct list_head visited_list_link;};

本文講述的是kernel是如何將就緒事件傳遞給epoll並喚醒對應進程上,所以在這裏主要聚焦於(wait_queue_head_t wq)等成員。架構

epoll_ctl(add)

咱們看下epoll_ctl(EPOLL_CTL_ADD)是如何將對應的文件描述符插入到eventpoll中的。
藉助於spin_lock(自旋鎖)和mutex(互斥鎖),epoll_ctl調用能夠在多個KSE(內核調度實體,即進程/線程)中併發執行。併發

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,		struct epoll_event __user *, event)
{	/* 校驗epfd是不是epoll的描述符 */
	// 此處的互斥鎖是爲了防止併發調用epoll_ctl,即保護內部數據結構
	// 不會被併發的添加修改刪除破壞
	mutex_lock_nested(&ep->mtx, 0);	switch (op) {		case EPOLL_CTL_ADD:
			...			// 插入到紅黑樹中
			error = ep_insert(ep, &epds, tfile, fd);
			...			break;
		......
	}
	mutex_unlock(&ep->mtx);	
}		

上述過程以下圖所示:
socket

ep_insert

在ep_insert中初始化了epitem,而後初始化了本文關注的焦點,即事件就緒時候的回調函數,代碼以下所示:

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,		     struct file *tfile, int fd)
{	/* 初始化epitem */
	// &epq.pt->qproc = ep_ptable_queue_proc
	init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);	// 在這裏將回調函數注入
	revents = tfile->f_op->poll(tfile, &epq.pt);	// 若是當前有事件已經就緒,那麼一開始就會被加入到ready list
	// 例如可寫事件
	// 另外,在tcp內部ack以後調用tcp_check_space,最終調用sock_def_write_space來喚醒對應的epoll_wait下的進程
	if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
		list_add_tail(&epi->rdllink, &ep->rdllist);		// wake_up ep對應在epoll_wait下的進程
		if (waitqueue_active(&ep->wq)){
			wake_up_locked(&ep->wq);
		}
		......
	}	
	// 將epitem插入紅黑樹
	ep_rbtree_insert(ep, epi);
	......
}

tfile->f_op->poll的實現

向kernel更底層註冊回調函數的是tfile->f_op->poll(tfile, &epq.pt)這一句,咱們來看一下對於對應的socket文件描述符,其fd=>file->f_op->poll的初始化過程:

    // 將accept後的事件加入到對應的epoll fd中
    int client_fd = accept(listen_fd,(struct sockaddr *)&client_addr,&client_len)));    // 將鏈接描述符註冊到對應的worker裏面
    epoll_ctl(reactor->client_fd,EPOLL_CTL_ADD,epifd,&event);

回顧一下上述user space代碼,fd即client_fd是由tcp的listen_fd經過accept調用而來,那麼咱們看下accept調用鏈的關鍵路徑:

accept
      |->accept4
            |->sock_attach_fd(newsock, newfile, flags & O_NONBLOCK);
                  |->init_file(file,...,&socket_file_ops);
                        |->file->f_op = fop;                              /* file->f_op = &socket_file_ops */
            |->fd_install(newfd, newfile); // 安裝fd

那麼,由accept得到的client_fd的結構以下圖所示:

(注:因爲是tcp socket,因此這邊sock->ops=inet_stream_ops,這個初始化的過程在個人另外一篇博客<<從linux源碼看socket的阻塞和非阻塞>>中,博客地址以下:
https://my.oschina.net/alchemystar/blog/1791017)
既然知道了tfile->f_op->poll的實現,咱們就能夠看下此poll是如何將安裝回調函數的。

回調函數的安裝

kernel的調用路徑以下:

sock_poll /*tfile->f_op->poll(tfile, &epq.pt)*/;
	|->sock->ops->poll
		|->tcp_poll			/* 這邊重要的是拿到了sk_sleep用於KSE(進程/線程)的喚醒 */
			|->sock_poll_wait(file, sk->sk_sleep, wait);
				|->poll_wait
					|->p->qproc(filp, wait_address, p);					/* p爲&epq.pt,並且&epq.pt->qproc= ep_ptable_queue_proc*/
						|-> ep_ptable_queue_proc(filp,wait_address,p);

繞了一大圈以後,咱們的回調函數的安裝其實就是調用了eventpoll.c中的ep_ptable_queue_proc,並且向其中傳遞了sk->sk_sleep做爲其waitqueue的head,其源碼以下所示:

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
				 poll_table *pt)
{
	// 取出當前client_fd對應的epitem
	struct epitem *epi = ep_item_from_epqueue(pt);
	// &pwq->wait->func=ep_poll_callback,用於回調喚醒
	// 注意,這邊不是init_waitqueue_entry,即沒有將當前KSE(current,當前進程/線程)寫入到
	// wait_queue當中,由於不必定是從當前安裝的KSE喚醒,而應該是喚醒epoll\_wait的KSE
	init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
	// 這邊的whead是sk->sk_sleep,將當前的waitqueue鏈入到socket對應的sleep列表
	add_wait_queue(whead, &pwq->wait);	}	

這樣client_fd的結構進一步完善,以下圖所示:

ep_poll_callback函數是喚醒對應epoll_wait的地方,咱們將在後面一塊兒講述。

epoll_wait

epoll_wait主要是調用了ep_poll:

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,		int, maxevents, int, timeout)
{	/* 檢查epfd是不是epoll\_create建立的fd */
	// 調用ep_poll
	error = ep_poll(ep, events, maxevents, timeout);
	...
}

緊接着,咱們看下ep_poll函數:

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
		   int maxevents, long timeout)
{
	......
retry:	// 獲取spinlock
	spin_lock_irqsave(&ep->lock, flags);	// 將當前task_struct寫入到waitqueue中以便喚醒
	// wq_entry->func = default_wake_function;
	init_waitqueue_entry(&wait, current);	// WQ_FLAG_EXCLUSIVE,排他性喚醒,配合SO_REUSEPORT從而解決accept驚羣問題
	wait.flags |= WQ_FLAG_EXCLUSIVE;	// 鏈入到ep的waitqueue中
	__add_wait_queue(&ep->wq, &wait);	for (;;) {		// 設置當前進程狀態爲可打斷
		set_current_state(TASK_INTERRUPTIBLE);		// 檢查當前線程是否有信號要處理,有則返回-EINTR
		if (signal_pending(current)) {
			res = -EINTR;			break;
		}
		spin_unlock_irqrestore(&ep->lock, flags);		// schedule調度,讓出CPU
		jtimeout = schedule_timeout(jtimeout);
		spin_lock_irqsave(&ep->lock, flags);
	}	// 到這裏,代表超時或者有事件觸發等動做致使進程從新調度
	__remove_wait_queue(&ep->wq, &wait);	// 設置進程狀態爲running
	set_current_state(TASK_RUNNING);
	......	// 檢查是否有可用事件
	eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
	......	// 向用戶空間拷貝就緒事件
	ep_send_events(ep, events, maxevents)
}		

上述邏輯以下圖所示:

ep_send_events

ep_send_events函數主要就是調用了ep_scan_ready_list,顧名思義ep_scan_ready_list就是掃描就緒列表:

static int ep_scan_ready_list(struct eventpoll *ep,			      int (*sproc)(struct eventpoll *,					   struct list_head *, void *),			      void *priv,			      int depth){
	...	// 將epfd的rdllist鏈入到txlist
	list_splice_init(&ep->rdllist, &txlist);
	...	/* sproc = ep_send_events_proc */
	error = (*sproc)(ep, &txlist, priv);
	...	// 處理ovflist,即在上面sproc過程當中又到來的事件
	...
}

其主要調用了ep_send_events_proc:

static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
			       void *priv)
{	for (eventcnt = 0, uevent = esed->events;
	     !list_empty(head) && eventcnt < esed->maxevents;) {	   // 遍歷ready list 
		epi = list_first_entry(head, struct epitem, rdllink);
		list_del_init(&epi->rdllink);		// readylist只是代表當前epi有事件,具體的事件信息仍是得調用對應file的poll
		// 這邊的poll便是tcp_poll,根據tcp自己的信息設置掩碼(mask)等信息 & 上興趣事件掩碼,則能夠得知當前事件是不是epoll_wait感興趣的事件
		revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
			epi->event.events;		if(revents){			/* 將event放入到用戶空間 */
			/* 處理ONESHOT邏輯 */
			// 若是不是邊緣觸發,則將當前的epi從新加回到可用列表中,這樣就能夠下一次繼續觸發poll,若是下一次poll的revents不爲0,那麼用戶空間依舊能感知 */
			else if (!(epi->event.events & EPOLLET)){
				list_add_tail(&epi->rdllink, &ep->rdllist);
			}			/* 若是是邊緣觸發,那麼就不加回可用列表,所以只能等到下一個可用事件觸發的時候纔會將對應的epi放到可用列表裏面*/
			eventcnt++
		}		/* 如poll出來的revents事件epoll_wait不感興趣(或者原本就沒有事件),那麼也不會加回到可用列表 */
		......
	}	return eventcnt;
}			

上述代碼邏輯以下所示:

事件到來添加到epoll就緒隊列(rdllist)的過程

通過上述章節的詳述以後,咱們終於能夠闡述,tcp在數據到來時是怎麼加入到epoll的就緒隊列的了。

可讀事件到來

首先咱們看下tcp數據包從網卡驅動到kernel內部tcp協議處理調用鏈:

step1:

網絡分組到來的內核路徑,網卡發起中斷後調用netif_rx將事件掛入CPU的等待隊列,並喚起軟中斷(soft_irq),再經過linux的軟中斷機制調用net_rx_action,以下圖所示:

注:上圖來自PLKA(<<深刻Linux內核架構>>)

step2:

緊接着跟蹤next_rx_action

next_rx_action
	|-process_backlog
		......
			|->packet_type->func 在這裏咱們考慮ip_rcv
					|->ipprot->handler 在這裏ipprot重載爲tcp_protocol
						(handler 即爲tcp_v4_rcv)					

咱們再看下對應的tcp_v4_rcv

tcp_v4_rcv
      |->tcp_v4_do_rcv
            |->tcp_rcv_state_process
                  |->tcp_data_queue
                        |-> sk->sk_data_ready(sock_def_readable)
                              |->wake_up_interruptible_sync_poll(sk->sleep,...)
                                    |->__wake_up
                                          |->__wake_up_common
                                                |->curr->func                                                /* 這裏已經被ep_insert添加爲ep_poll_callback,並且設定了排它標識WQ_FLAG_EXCLUSIVE*/
                                                      |->ep_poll_callback

這樣,咱們就看下最終喚醒epoll_wait的ep_poll_callback函數:

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{	// 獲取wait對應的epitem	
	struct epitem *epi = ep_item_from_wait(wait);	// epitem對應的eventpoll結構體
	struct eventpoll *ep = epi->ep;	// 獲取自旋鎖,保護ready_list等結構
	spin_lock_irqsave(&ep->lock, flags);	// 若是當前epi沒有被鏈入ep的ready list,則鏈入
	// 這樣,就把當前的可用事件加入到epoll的可用列表了
	if (!ep_is_linked(&epi->rdllink))
		list_add_tail(&epi->rdllink, &ep->rdllist);	// 若是有epoll_wait在等待的話,則喚醒這個epoll_wait進程
	// 對應的&ep->wq是在epoll_wait調用的時候經過init_waitqueue_entry(&wait, current)而生成的
	// 其中的current便是對應調用epoll_wait的進程信息task_struct
	if (waitqueue_active(&ep->wq))
		wake_up_locked(&ep->wq);
}

上述過程以下圖所示:

最後wake_up_locked調用__wake_up_common,而後調用了在init_waitqueue_entry註冊的default_wake_function,調用路徑爲:

wake_up_locked
	|->__wake_up_common
		|->default_wake_function
			|->try_wake_up (wake up a thread)
				|->activate_task
					|->enqueue_task    running

將epoll_wait進程推入可運行隊列,等待內核從新調度進程,而後epoll_wait對應的這個進程從新運行後,就從schedule恢復,繼續下面的ep_send_events(向用戶空間拷貝事件並返回)。
wake_up過程以下圖所示:

可寫事件到來

可寫事件的運行過程和可讀事件大同小異:
首先,在epoll_ctl_add的時候預先會調用一次對應文件描述符的poll,若是返回事件裏有可寫掩碼的時候直接調用wake_up_locked以喚醒對應的epoll_wait進程。
而後,在tcp在底層驅動有數據到來的時候可能攜帶了ack從而能夠釋放部分已經被對端接收的數據,因而觸發可寫事件,這一部分的調用鏈爲:

tcp_input.c
tcp_v4_rcv
	|-tcp_v4_do_rcv
		|-tcp_rcv_state_process
			|-tcp_data_snd_check
				|->tcp_check_space
					|->tcp_new_space
						|->sk->sk_write_space						/* tcp下便是sk_stream_write_space*/

最後在此函數裏面sk_stream_write_space喚醒對應的epoll_wait進程

void sk_stream_write_space(struct sock *sk)
{	// 即有1/3可寫空間的時候才觸發可寫事件
	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk) && sock) {
		clear_bit(SOCK_NOSPACE, &sock->flags);		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
			wake_up_interruptible_poll(sk->sk_sleep, POLLOUT |
						POLLWRNORM | POLLWRBAND)
		......
	}
}

關閉描述符(close fd)

值得注意的是,咱們在close對應的文件描述符的時候,會自動調用eventpoll_release將對應的file從其關聯的epoll_fd中刪除,kernel關鍵路徑以下:

close fd
      |->filp_close
            |->fput
                  |->__fput
                        |->eventpoll_release
                              |->ep_remove

因此咱們在關閉對應的文件描述符後,並不須要經過epoll_ctl_del來刪掉對應epoll中相應的描述符。

總結

epoll做爲linux下很是優秀的事件觸發機制獲得了普遍的運用。其源碼仍是比較複雜的,本文只是闡述了epoll讀寫事件的觸發機制,探究linux kernel源碼的過程很是快樂_

相關文章
相關標籤/搜索