linux 內核的等待隊列和進程調度息息相關,進程在某些狀況下必須等待某些事件的發生,例如:等待一個磁盤操做的終止,等待釋放系統資源,或等待指定的時間間隔。linux
等待隊列實現了在事件上的條件等待:但願等待特定事件的進程把本身放進合適的等待隊列,並放棄控制權。程序員
所以,等待隊列表示一組睡眠的進程,當某一條件知足時,由內核喚醒它們。express
基於上述對等待隊列的基本描述,很直觀地會產生如下疑問,咱們帶着問題來分析:數據結構
注:本文基於 linux-4.9 的版本進行分析。函數
顧名思義,等待隊列是一個特殊的隊列,代碼中使用了兩個數據結構來描述一個等待隊列:wait_queue_head_t 和 wait_queue_t。字體
這兩個數據結構定義在 include/linux/wait.h 頭文件中。ui
struct __wait_queue_head { spinlock_t lock; struct list_head task_list; }; typedef struct __wait_queue_head wait_queue_head_t; struct __wait_queue { unsigned int flags; void *private; wait_queue_func_t func; struct list_head task_list; }; typedef struct __wait_queue wait_queue_t;
等待隊列是一個雙向隊列,wait_queue_head_t 表明該隊列的頭部,wait_queue_t 表明隊列中有效的成員,其 private 指針指向了關聯進程的 task_struct 結構體。this
一個等待隊列只有一個 wait_queue_head_t,由於等待隊列多是空的,不包含 wait_queue_t 成員,因此使用一個單獨的頭部來保持該隊列。atom
wait_queue_head_t 的結構很簡單,只有一個 spinlock 和 一個 list_head 成員來構成隊列,其做用只是維持等待隊列的頭部。lua
wait_queue_t 是等待隊列的有效成員,除去 list_head 外,它包含 3 個屬性:
至此,咱們明確了等待隊列的基本數據結構,看起來很是簡單明瞭。
接下來的疑問是等待隊列如何與進程關聯起來,或者說進程如何使用等待隊列?
首先須要分配一個 wait_queue_head_t 結構,並將其初始化,完成這個操做有兩種方法:靜態建立和動態建立
#define __WAIT_QUEUE_HEAD_INITIALIZER(name) { \ .lock = __SPIN_LOCK_UNLOCKED(name.lock), \ .task_list = { &(name).task_list, &(name).task_list } } #define DECLARE_WAIT_QUEUE_HEAD(name) \ wait_queue_head_t name = __WAIT_QUEUE_HEAD_INITIALIZER(name)
經過引用 DECLARE_WAIT_QUEUE_HEAD(name) 建立一個名爲 name 的 wait_queue_head_t,其存儲空間分配在數據段
另一種建立方式是使用 wait_queue_head_t 初始化函數 init_waitqueue_head,該函數定義在 include/linux/wait.h 頭文件中。
#define init_waitqueue_head(q) \ do { \ static struct lock_class_key __key; \ \ __init_waitqueue_head((q), #q, &__key); \ } while (0) void __init_waitqueue_head(wait_queue_head_t *q, const char *name, struct lock_class_key *key) { spin_lock_init(&q->lock); lockdep_set_class_and_name(&q->lock, key, name); INIT_LIST_HEAD(&q->task_list); }
init_waitqueue_head 函數只是初始化 wait_queue_head_t 的數據成員,其存儲空間事先已分配,可由程序員靈活處理:
能夠靜態分配在 data 段,也能夠動態地在堆上分配空間。
到這裏只是建立了一個空隊列,這個隊列尚未實際的做用。
進程使用等待隊列,須要關聯一個 wait_queue_t 數據結構
#define __WAITQUEUE_INITIALIZER(name, tsk) { \
.private = tsk, \
.func = default_wake_function, \
.task_list = { NULL, NULL } }
#define DECLARE_WAITQUEUE(name, tsk) \
wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk)
可使用 DECLARE_WAITQUEUE(name, tsk) 宏來建立一個等待隊列成員,這個宏展開後的結果爲:
即聲明一個名字爲 name 的 wait_queue_t 結構,注意該 wait_queue_t 的生命週期和該宏引用的位置有關,若是在函數內使用,那麼 wait_queue_t 的生命週期限定在該函數內。
添加等待隊列成員:
static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new) {
list_add (&new->task_list, &head->task_list); } void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait) { unsigned long flags; wait->flags &= ~WQ_FLAG_EXCLUSIVE; spin_lock_irqsave(&q->lock, flags); __add_wait_queue(q, wait); spin_unlock_irqrestore(&q->lock, flags); } EXPORT_SYMBOL(add_wait_queue);
static inline void __add_wait_queue_tail(wait_queue_head_t *head, wait_queue_t *new) { list_add_tail(&new->task_list, &head->task_list); } void add_wait_queue_exclusive(wait_queue_head_t *q, wait_queue_t *wait) { unsigned long flags; wait->flags |= WQ_FLAG_EXCLUSIVE; spin_lock_irqsave(&q->lock, flags); __add_wait_queue_tail(q, wait); spin_unlock_irqrestore(&q->lock, flags); }
刪除等待隊列成員:
static inline void __remove_wait_queue(wait_queue_head_t *head, wait_queue_t *old) {
list_del(&old->task_list); } void remove_wait_queue(wait_queue_head_t *q, wait_queue_t *wait) { unsigned long flags; spin_lock_irqsave(&q->lock, flags); __remove_wait_queue(q, wait); spin_unlock_irqrestore(&q->lock, flags); } EXPORT_SYMBOL(remove_wait_queue);
添加/刪除等待隊列成員的操做只是簡單的鏈表操做,將表明進程的 wait_queue_t 結構插入隊列或從隊列中刪除。
注意:互斥的(exclusive)等待進程是插入到等待隊列的尾部。
進程是什麼時候進入休眠狀態?又是如何從等待隊列被喚醒的呢?
接下來咱們看一下等待隊列的 wakeup 函數是如何實現的。
從等待隊列的建立宏 DECLARE_WAITQUEUE 中能夠看到,wait_queue_t 中有一個指向 task_struct 的 private 指針能夠將 wait_queue_t 和一個進程 tast_struct 關聯起來。
同時還將 wait_queue_func_t 函數成員綁定到 default_wake_function 函數。
include/linux/wait.h 和 kernel/sched/wait.c 中提供了 wake_up 函數,該函數能夠喚醒等待隊列中的進程。
經過代碼來看一下,這個wake_up 函數具體作了什麼工做,應該如何調用 wake_up 函數。
wait.h 提供了一系列 __wake_up 函數的封裝形式,其具體實現都基於 wait.c 中的 __wake_up() 函數:
#define wake_up(x) __wake_up(x, TASK_NORMAL, 1, NULL) #define wake_up_nr(x, nr) __wake_up(x, TASK_NORMAL, nr, NULL) #define wake_up_all(x) __wake_up(x, TASK_NORMAL, 0, NULL) #define wake_up_locked(x) __wake_up_locked((x), TASK_NORMAL, 1) #define wake_up_all_locked(x) __wake_up_locked((x), TASK_NORMAL, 0) #define wake_up_interruptible(x) __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL) #define wake_up_interruptible_nr(x, nr) __wake_up(x, TASK_INTERRUPTIBLE, nr, NULL) #define wake_up_interruptible_all(x) __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL) #define wake_up_interruptible_sync(x) __wake_up_sync((x), TASK_INTERRUPTIBLE, 1)
從這一系列接口形式能夠看出,其核心都是 __wake_up 函數,這些封裝應用於不一樣場景,針對不一樣類型的進程。
/* * The core wakeup function. Non-exclusive wakeups (nr_exclusive == 0) just * wake everything up. If it's an exclusive wakeup (nr_exclusive == small +ve * number) then we wake all the non-exclusive tasks and one exclusive task. * * There are circumstances in which we can try to wake a task which has already * started to run but is not in state TASK_RUNNING. try_to_wake_up() returns * zero in this (rare) case, and we handle it by continuing to scan the queue. */ static void__wake_up_common
(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, int wake_flags, void *key) { wait_queue_t *curr, *next;list_for_each_entry_safe
(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;
/* 注意這裏的三個判斷條件,其直接決定了 wakeup 函數的操做結果 */
if (curr->func(curr, mode, wake_flags, key) && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive) break; } } /** * __wake_up - wake up threads blocked on a waitqueue. * @q: the waitqueue * @mode: which threads * @nr_exclusive: how many wake-one or wake-many threads to wake up * @key: is directly passed to the wakeup function * * It may be assumed that this function implies a write memory barrier before * changing the task state if and only if any tasks are woken up. */ void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, void *key) { unsigned long flags; spin_lock_irqsave(&q->lock, flags); __wake_up_common(q, mode, nr_exclusive, 0, key); spin_unlock_irqrestore(&q->lock, flags); } EXPORT_SYMBOL(__wake_up);
從 __wake_up 的代碼能夠看出,其核心操做就是在 __wake_up_common 中遍歷等待隊列,而後調用其成員的 func 函數。
咱們再回頭看一下 func 函數,在使用DECLARE_WAITQUEUE(name, tsk) 宏來建立等待隊列成員的時候,func 函數綁定爲 default_wake_function。
注意:若是不使用 DECLARE_WAITQUEUE(name, tsk) 宏建立等待隊列成員,那麼能夠自定義 wait_queue_t 的 func 函數。
int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags, void *key) { return try_to_wake_up(curr->private, mode, wake_flags); }
EXPORT_SYMBOL(default_wake_function);
default_wake_function 和其調用的 try_to_wake_up 函數都定義在 kernel/sched/core.c,核心函數是 try_to_wake_up,本文不深究函數細節,只該函數的原型和註釋
/** * try_to_wake_up - wake up a thread * @p: the thread to be awakened * @state: the mask of task states that can be woken * @wake_flags: wake modifier flags (WF_*) * * Put it on the run-queue if it's not already there. The "current" * thread is always on the run-queue (except when the actual * re-schedule is in progress), and as such you're allowed to do * the simpler "current->state = TASK_RUNNING" to mark yourself * runnable without the overhead of this. * * Return: %true if @p was woken up, %false if it was already running. * or @state didn't match @p's state. */ static int try_to_wake_up(struct task_struct *p, unsigned int state, int wake_flags);
該函數的功能就是把調用參數傳入的進程描述符所表明的進程狀態設置爲 TASK_RUNNING 並放到 run-queue 中,後續由調度程序來調度運行。
這裏須要重點關注 __wake_up_common 中遍歷等待隊列的三個 break 條件:
if (curr->func(curr, mode, wake_flags, key) && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive) break;
注意 C 語言多個判斷條件的執行過程,此例中當前一個條件爲 false 時會直接 break,不會繼續執行後續條件表達式;
等待隊列中,EXCLUSIVE 類型的進程插入在隊列的尾部,所以 __wake_up_common 函數的語義有如下幾個要點:
__wake_up 函數有 4 個參數:
1. wait_queue_head_t *q:這個參數很直觀,即等待隊列的頭部,經過它能夠遍歷到隊列中的全部節點
2. unsigned int mode:該參數的註釋是 「which threads」,是一個 unsigned int 類型,他表明什麼意思呢?
咱們看一下引用 __wake_up 時傳入的參數和 __wake_up 對該參數的使用方式
wait.h 中的 wake_up 系列函數傳入的 mode 參數爲 TASK_NORMAL 和 TASK_INTERRUPTIBLE,TASK_NORMAL 的定義以下:
#define TASK_NORMAL (TASK_INTERRUPTIBLE | TASK_UNINTERRUPTIBLE)
這是表明進程狀態的 flag 定義,它的傳遞路徑:
__wake_up --> __wake_up_common –> default_wake_function –> try_to_wake_up
最終起做用在 try_to_wake_up 的第二個參數:
@state: the mask of task states that can be woken
總結一下,__wake_up 的第二個參數,表示本次調用將喚醒處於 TASK_NORMAL 狀態的進程仍是隻喚醒 TASK_INTERRUPTIBLE 的進程。
3. int nr_exclusive:該參數註釋「how many wake-one or wake-many threads to wake up」,是一個 int 類型
該參數表示這次 __wake_up 調用將喚醒多少個互斥的等待進程,它的傳遞路徑:
__wake_up --> __wake_up_common
4. void *key:該參數將傳遞給 func 的第 4 個參數,default_wake_function 並無使用該參數,暫不深刻分析。若是使用用戶自定義的 func 函數的話,key 參數將有其餘做用。
從上述分析過程當中,能夠得出一個基本的思路:
等待隊列是一個維護了一系列進程的雙向隊列,等待隊列中的進程分爲互斥(帶 WQ_FLAG_EXCLUSIVE 標識)和非互斥(不帶 WQ_FLAG_EXCLUSIVE 標識)的,
kernel 中提供了一系列函數將進程插入等待隊列或從等待隊列中刪除,同時提供了 wakeup 函數來喚醒等待隊列中的進程。
那麼所謂「等待隊列」的「等待」二字體如今哪裏?應當如何使用等待隊列呢?
以 kernel mmc driver 中的 mmc_claim_host 和 mmc_release_host 爲例來看一下等待隊列的具體使用。
kernel mmc driver 中對 host 的某些操做必須是互斥的,由於 host 硬件的某些操做過程必須保持必定的完整性,不能被多個進程並行訪問。
所以在執行這類操做前,driver 調用 mmc_claim_host 聲明佔用 host,操做完成後使用 mmc_release_host 釋放 host 資源。
咱們直接在下面的代碼中添加註釋來講明等待隊列在其中發揮的做用。
/** * __mmc_claim_host - exclusively claim a host * @host: mmc host to claim * @abort: whether or not the operation should be aborted * * Claim a host for a set of operations. If @abort is non null and * dereference a non-zero value then this will return prematurely with * that non-zero value without acquiring the lock. Returns zero * with the lock held otherwise. */ int __mmc_claim_host(struct mmc_host *host, atomic_t *abort) { /* * 聲明一個名爲 wait 的 wait_queue_t 結構體,綁定到 current 進程 * 注意 wait 的生命週期位於該函數內,其存儲空間分配在該函數棧上 */ DECLARE_WAITQUEUE(wait, current); unsigned long flags; int stop; bool pm = false; might_sleep(); /* * 將 wait 加入到 host->wq 這個等待隊列中 * host->wq 是 host 的一個成員變量,driver 加載時已經初始化 */ add_wait_queue(&host->wq, &wait); spin_lock_irqsave(&host->lock, flags); while (1) { /* 設置當前進程的狀態,再也不處於 RUNNING 狀態,不會被再次調度執行 */ set_current_state(TASK_UNINTERRUPTIBLE); stop = abort ? atomic_read(abort) : 0; /* 這裏體現了等待條件,當如下任一條件知足時,跳出 while(1) 循環*/ if (stop || !host->claimed || host->claimer == current) break; spin_unlock_irqrestore(&host->lock, flags); /* 若是上述等待條件不知足,讓出 CPU 資源,進入等待狀態 */ schedule(); /* * 當 host->wq 被 wakeup 函數喚醒時,該進程可能被再次被調度執行 * 將再次從 while(1) 進入檢查上述等待條件,看是否可以得到 host 使用權 */ spin_lock_irqsave(&host->lock, flags); } /* 運行到此處,說明 while(1) 的 break 條件知足,將進程狀態設置爲 TASK_RUNNING */ set_current_state(TASK_RUNNING); if (!stop) { host->claimed = 1; host->claimer = current; host->claim_cnt += 1; if (host->claim_cnt == 1) pm = true; } else wake_up(&host->wq); spin_unlock_irqrestore(&host->lock, flags); /* 將 wait 從 host->wq 中移除 */ remove_wait_queue(&host->wq, &wait); if (pm) pm_runtime_get_sync(mmc_dev(host)); return stop; } /* 對 __mmc_claim_host 的簡單封裝,無需特別關注 */ static inline void mmc_claim_host(struct mmc_host *host) { __mmc_claim_host(host, NULL); } /** * mmc_release_host - release a host * @host: mmc host to release * * Release a MMC host, allowing others to claim the host * for their operations. */ void mmc_release_host(struct mmc_host *host) { /* 當 driver 完成 host 的互斥操做後,調用該函數釋放 host 資源 */ unsigned long flags; WARN_ON(!host->claimed); spin_lock_irqsave(&host->lock, flags); if (--host->claim_cnt) { /* Release for nested claim */ spin_unlock_irqrestore(&host->lock, flags); } else { host->claimed = 0; host->claimer = NULL; spin_unlock_irqrestore(&host->lock, flags); /* 調用 wakeup 喚醒 host->wq 等待隊列中的其餘等待進程運行 */ wake_up(&host->wq); pm_runtime_mark_last_busy(mmc_dev(host)); pm_runtime_put_autosuspend(mmc_dev(host)); } }
include/linux/wait.h 中提供了一系列使用等待隊列的便捷方法,例如:
這些方法都是宏定義,其功能相似可是有不一樣的語義,適用不一樣的使用場景。
咱們以 wait_event 爲例來看一下其具體實現,其代碼以下(注意註釋中高亮部分對其語義的描述):
/** * wait_event - sleep until a condition gets true * @wq: the waitqueue to wait on * @condition: a C expression for the event to wait for * * The process is put to sleep (TASK_UNINTERRUPTIBLE) until the * @condition evaluates to true. The @condition is checked each time * the waitqueue @wq is woken up. * * wake_up() has to be called after changing any variable that could * change the result of the wait condition. */ #define wait_event(wq, condition) \ do { \ might_sleep(); \ if (condition) \ break; \ __wait_event(wq, condition); \ } while (0) /* * The below macro ___wait_event() has an explicit shadow of the __ret * variable when used from the wait_event_*() macros. * * This is so that both can use the ___wait_cond_timeout() construct * to wrap the condition. * * The type inconsistency of the wait_event_*() __ret variable is also * on purpose; we use long where we can return timeout values and int * otherwise. */ #define ___wait_event(wq, condition, state, exclusive, ret, cmd) \ ({ \ __label__ __out; \ wait_queue_t __wait; \ long __ret = ret; /* explicit shadow */ \ \ init_wait_entry(&__wait, exclusive ? WQ_FLAG_EXCLUSIVE : 0); \ for (;;) { \ long __int = prepare_to_wait_event(&wq, &__wait, state);\ \ if (condition) \ break; \ \ if (___wait_is_interruptible(state) && __int) { \ __ret = __int; \ goto __out; \ } \ \ cmd; \ } \ finish_wait(&wq, &__wait); \ __out: __ret; \ }) #define __wait_event(wq, condition) \ (void)___wait_event(wq, condition, TASK_UNINTERRUPTIBLE, 0, 0, \ schedule())
wait_event(wq, condition) 的上述實現就是一系列的宏定義。
將 wait_event(wq, condition) 宏展開就獲得下面一個代碼段,這個代碼段沒有返回值,所以 wait_event 不能做爲右值使用。
咱們在該代碼段中加入註釋來講明其工做原理:
do { might_sleep();
/* 若是 condition 條件爲 true,不會進入等待狀態 */ if (condition) break; (void)({ __label__ __out;
/* 建立等待隊列成員 */ wait_queue_t __wait; long __ret = 0; /* explicit shadow */
/* 初始化 __wait, 注意 init_wait_entry 初始化 __wait 時綁定的 func */ init_wait_entry(&__wait, 0); for (;;) {
/*
* 將 __wait 加入到等待隊列中,返回 0 表示 __wait 加入到等待隊列,非 0 表示未加入
* 因爲 wait_event 展開時傳入的 state 參數爲 TASK_UNINTERRUPTIBLE,
* 因此此處 __int 得到的返回值必定爲 0
*/ long __int = prepare_to_wait_event(&wq, &__wait, TASK_UNINTERRUPTIBLE); if (condition) break;
/* 這個 if 判斷條件的結果必定爲 false */ if (___wait_is_interruptible(TASK_UNINTERRUPTIBLE) && __int) { __ret = __int; goto __out; }
/* 讓出 CPU 資源,進入等待狀態 */ schedule(); }
/* 將 current 進程設置爲 TASK_RUNNING 狀態,並將 __wait 從等待隊列 wq 中移除 */ finish_wait(&wq, &__wait); __out:
__ret; }) } while (0)
上述宏展開的代碼段中涉及的幾個關鍵函數代碼以下:
void init_wait_entry(wait_queue_t *wait, int flags) { wait->flags = flags; wait->private = current; wait->func = autoremove_wake_function; INIT_LIST_HEAD(&wait->task_list); } int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key) { int ret = default_wake_function(wait, mode, sync, key); if (ret) list_del_init(&wait->task_list); return ret; } long prepare_to_wait_event(wait_queue_head_t *q, wait_queue_t *wait, int state) { unsigned long flags; long ret = 0; spin_lock_irqsave(&q->lock, flags); if (unlikely(signal_pending_state(state, current))) { /* * Exclusive waiter must not fail if it was selected by wakeup, * it should "consume" the condition we were waiting for. * * The caller will recheck the condition and return success if * we were already woken up, we can not miss the event because * wakeup locks/unlocks the same q->lock. * * But we need to ensure that set-condition + wakeup after that * can't see us, it should wake up another exclusive waiter if * we fail. */ list_del_init(&wait->task_list); ret = -ERESTARTSYS; } else { if (list_empty(&wait->task_list)) { if (wait->flags & WQ_FLAG_EXCLUSIVE) __add_wait_queue_tail(q, wait); else __add_wait_queue(q, wait); } set_current_state(state); } spin_unlock_irqrestore(&q->lock, flags); return ret; } EXPORT_SYMBOL(prepare_to_wait_event);
wait_event(wq, condition) 實際的操做流程和 4.1 章節中描述的 __mmc_claim_host 是相似的,wait_event 將這個過程封裝起來提供了更便捷的使用方法
一個進程要使用 wait_event 等待一個特定事件,須要如下三個基本步驟:
使用 wait_event 系列宏操做等待隊列,比 __mmc_claim_host 中的方式要簡單直觀,也更不容易出錯。
要正確使用 wait_event 系列宏,關鍵是要理解每個宏的語義以及適用場景,能夠經過閱讀源代碼來深刻理解。
等待隊列是 linux kernel 中與進程調度相關的重要機制,爲進程間的同步提供了一種便捷的方式。
正確使用等待隊列的前提是明白它的基本實現原理,掌握 wait_event 系列宏的語義和適用場景,在閱讀源代碼的基礎上深刻理解。