帶您進入內核開發的大門 | 內核中的等待隊列

配套的代碼能夠從本號的github下載: github.com/shuningzhan…linux

等待隊列是一種基於資源狀態的線程管理的機制,它可使線程在資源不知足的狀況下處於休眠狀態,讓出CPU資源,而資源狀態知足時喚醒線程,使其繼續進行業務的處理。 等待隊列(wait queue)用於使線程等待某一特定的事件發生而無需頻繁的輪詢,進程在等待期間睡眠,在某件事發生時由內核自動喚醒。它是以雙循環鏈表爲基礎數據結構,與進程的休眠喚醒機制緊密相聯,是實現異步事件通知、跨進程通訊、同步資源訪問等技術的底層技術支撐。git

一

基本接口

wait_queue_head_t 使用等待隊列時,最基本的數據結構是struct wait_queue_head_t,也就是等待隊列頭,這個能夠理解爲等待隊列的實體。隊列頭中包含一個雙向鏈表,用於記錄在該等待隊列中處於等待狀態的線程等信息。該結構體的定義以下:github

struct __wait_queue_head {
    spinlock_t        lock;  //用於互斥訪問的自旋鎖
    struct list_head task_list;
};
typedef struct __wait_queue_head wait_queue_head_t;
複製代碼

能夠經過宏定義 DECLARE_WAIT_QUEUE_HEAD直接定義一個隊列頭變量,並完成初始化,該宏定義以下:bash

#define DECLARE_WAIT_QUEUE_HEAD(name) \ struct wait_queue_head name = __WAIT_QUEUE_HEAD_INITIALIZER(name)
    
#define __WAIT_QUEUE_HEAD_INITIALIZER(name) { \ .lock = __SPIN_LOCK_UNLOCKED(name.lock), \ .head = { &(name).head, &(name).head } }
複製代碼

或者是經過結構體wait_queue_head_t定義後,調用函數init_waitqueue_head進行初始化。雖然方式不一樣,但基本原理是同樣的,主要是對結構體內自旋鎖和鏈表的初始化。數據結構

wait_event 函數wait_event用於在某個線程中調用,當調用該函數時,若是參數中的條件不知足,則該線程會進入休眠狀態。下面代碼是該函數的定義:異步

#define wait_event(wq, condition) \ do { \ if (condition) \ break; \ __wait_event(wq, condition); \ } while (0)

#define __wait_event(wq, condition) \ (void)___wait_event(wq, condition, TASK_UNINTERRUPTIBLE, 0, 0, schedule())
複製代碼

wake_up 函數wake_up用於對處於阻塞狀態的線程進行喚醒,其參數就是隊列頭。以下是該函數的定義,咱們這裏暫時不展開介紹。socket

#define wake_up(x) __wake_up(x, TASK_NORMAL, 1, NULL)
複製代碼

瞭解了上面1個數據結構及相關函數後就可使用等待隊列了,固然只是基本的使用。函數

示例程序

咱們這裏給出一個示例程序,程序很簡單。示例程序中有2個線程,分別是服務線程和客戶線程。其中服務線程起來後會檢查條件是否知足,並視狀況進入休眠狀態。而客戶進程會每隔5秒將條件變成可用狀態,並喚醒服務線程。spa

/* 這個例程用於說明等待隊列的用法,在本例程中有2個線程,分別是 * 客戶端和服務端。邏輯很簡單,服務線程起來的時候會等待事件發生 * 並阻塞,客戶端每隔5秒中喚醒一次服務端。*/
#include <linux/init.h>
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/mm.h>

#include <linux/in.h>
#include <linux/inet.h>
#include <linux/socket.h>
#include <net/sock.h>
#include <linux/kthread.h>
#include <linux/sched.h>
#include <linux/wait.h>

#define BUF_SIZE 1024

struct task_struct *main_task;
struct task_struct *client_task;
wait_queue_head_t wqh;

/* 這個結構體用於在線程之間共享數據 */
struct thread_stat {
        int t_can_run;
};

static inline void sleep(unsigned sec) {
        __set_current_state(TASK_INTERRUPTIBLE);
        schedule_timeout(sec * HZ);
}

static int multhread_server(void *data) {
        int index = 0;
        struct thread_stat* ts = (struct thread_stat*) data;

        while (!kthread_should_stop()) {
                printk(KERN_NOTICE "server run %d\n", index);
                index ++; 
                /*在這裏等待事件, 線程被阻塞在這裏。 */
                wait_event(wqh, ts->t_can_run || kthread_should_stop());
                printk(KERN_NOTICE "server event over!\n");
                ts->t_can_run = 0;
        }

        printk(KERN_NOTICE "server thread end\n");
        return 0;
}
static int multhread_init(void) {
        ssize_t ret = 0;
        struct thread_stat thread_s;
        thread_s.t_can_run = 0;



        printk("Hello, multhread \n");
        /* 初始化等待隊列頭 */
        init_waitqueue_head(&wqh);

        /* 分別啓動2個線程 */
        main_task = kthread_run(multhread_server,
                                &thread_s,
                                "multhread_server");
        if (IS_ERR(main_task)) {
                ret = PTR_ERR(main_task);
                goto failed;
        }

        client_task = kthread_run(multhread_client,
                                  &thread_s,
                                  "multhread_client");
        if (IS_ERR(client_task)) {
                ret = PTR_ERR(client_task);
                goto client_failed;
        }

        return ret;
client_failed:
        kthread_stop(main_task);

failed:
        return ret;
}

static void multhread_exit(void) {
        printk("Bye!\n");
        kthread_stop(main_task);
        kthread_stop(client_task);
}

module_init(multhread_init);
module_exit(multhread_exit);

MODULE_LICENSE("GPL");
MODULE_AUTHOR("SunnyZhang<shuningzhang@126.com>");
複製代碼

等待隊列的原理

關於等待隊列的原理,有3點須要重點說明,理解了這幾點,也就可以比較清晰的理解等待隊列的原理。這3點分別是數據結構、等待函數和喚醒函數。 咱們這裏仍是從結構體提及。這裏主要有2個結構體,前面已經有所介紹。其中wait_queue_head是等待隊列頭,定義以下:線程

struct wait_queue_head {
        spinlock_t              lock;
        struct list_head head;
};
複製代碼

這裏主要是雙向鏈表,全部處於等待狀態的線程都被加入到該雙向鏈表中。等後續喚醒時根據該鏈表中的數據進行喚醒。另一個數據結構是wait_queue_entry,該結構體是一個等待項,這個結構體對於普通用戶一般沒必要關係,由於內核的API對其進行了封裝。

struct wait_queue_entry {
        unsigned int            flags;
        void                    *private;
        wait_queue_func_t       func;
        struct list_head entry;
};
複製代碼

其中前一個結構體的head成員和後一個結構體的entry成員配合,造成所謂的雙向鏈表。咱們先看一下其大概的結構,具體以下圖所示。

1.png

關於等待函數 關於等待函數,前面給出了一部分定義,下面咱們繼續深刻介紹。在介紹以前,咱們先介紹一下其大概流程,本質上就是將當前線程狀態設置爲TASK_UNINTERRUPTIBLE狀態,而後調用schedule函數將本線程調度出去。理解了這個原理,代碼就很容易理解,下面是函數的實現:

#define __wait_event(wq_head, condition) \ (void)___wait_event(wq_head, condition, TASK_UNINTERRUPTIBLE, 0, 0, \ schedule())
複製代碼

直接調用的___wait_event函數,注意觀察一下這個函數的幾個參數,其中TASK_UNINTERRUPTIBLE是目標狀態,而schedule則是在內部要調用的函數。

#define ___wait_event(wq_head, condition, state, exclusive, ret, cmd) \ ({ \ __label__ __out; \ struct wait_queue_entry __wq_entry; \ long __ret = ret; /* explicit shadow */ \ /* 這裏初始化了前文所說的第二個結構體,也就是等待隊列項 */ \ init_wait_entry(&__wq_entry, exclusive ? WQ_FLAG_EXCLUSIVE : 0); \ for (;;) { \ /* 這個函數設置線程狀態,並將等待隊列項添加到等待隊列中 */
                long __int = prepare_to_wait_event(&wq_head, &__wq_entry, state);\
                /* 知足條件的狀況下退出等待 */                        \
                if (condition)                                                  \
                        break;                                                  \
                                                                                \
                if (___wait_is_interruptible(state) && __int) {                 \
                        __ret = __int;                                          \
                        goto __out;                                             \
                }                                                               \
                /* 將線程調度出去 */                                   \
                cmd;                                                            \
        }                                         \
        /*將狀態從新設置爲TASK_RUNNING,並將隊列項移出 */                      \
        finish_wait(&wq_head, &__wq_entry);                                     \
__out:  __ret;                                                                  \
})
複製代碼

這個函數裏面所調用的函數的具體實現就再也不解釋了,代碼貼過來太冗餘了,自己也比較簡單。

關於喚醒函數 喚醒函數前面也作過簡單介紹,咱們這裏直接進入主體,介紹其實現函數。

static void __wake_up_common_lock(struct wait_queue_head *wq_head, 
          unsigned int mode, 
          int nr_exclusive, int wake_flags, void *key)
{
        unsigned long flags;
        ... ...
        spin_lock_irqsave(&wq_head->lock, flags);
        nr_exclusive = __wake_up_common(wq_head, mode, 
                                            nr_exclusive, 
                                            wake_flags, key, &bookmark);
        spin_unlock_irqrestore(&wq_head->lock, flags);

        ...  ...
}
複製代碼

具體實如今函數__wake_up_common中。代碼比較長,咱們這裏刪除沒必要要的代碼,只保留必要的代碼邏輯。

static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
                        int nr_exclusive, int wake_flags, void *key,
                        wait_queue_entry_t *bookmark)
{
        wait_queue_entry_t *curr, *next;
        ... ...
        /* 主要是這個循環,完成全部等待線程的喚醒, 這裏關鍵是調用func */
        list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
                unsigned flags = curr->flags;
                int ret;
                /* 這個函數是在init_wait_entry中初始化的,函數的名字是 * autoremove_wake_function,主要完成線程喚醒的動做。 */
                ret = curr->func(curr, mode, wake_flags, key);
                if (ret < 0)
                        break;
                if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
                        break;
                ... ...
        }
        return nr_exclusive;
}

複製代碼

相信介紹到這裏,你們應該對等待隊列有了比較清晰的認識。總結起來就是要等待的線程加入隊列並休眠,當條件知足時有其它線程將處於休眠狀態的線程喚醒

其它接口

本文只介紹了基本的接口,其實系統還提供了不少擴展功能接口,以wake_up爲例,還包括以下接口:

#define wake_up(x) __wake_up(x, TASK_NORMAL, 1, NULL)
#define wake_up_nr(x, nr) __wake_up(x, TASK_NORMAL, nr, NULL)
#define wake_up_all(x) __wake_up(x, TASK_NORMAL, 0, NULL)
#define wake_up_locked(x) __wake_up_locked((x), TASK_NORMAL, 1)
#define wake_up_all_locked(x) __wake_up_locked((x), TASK_NORMAL, 0)

#define wake_up_interruptible(x) __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL)
#define wake_up_interruptible_nr(x, nr) __wake_up(x, TASK_INTERRUPTIBLE, nr, NULL)
#define wake_up_interruptible_all(x) __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL)
#define wake_up_interruptible_sync(x) __wake_up_sync((x), TASK_INTERRUPTIBLE, 1)
複製代碼

接口比較多,這裏就不一一介紹了,但使用方法是相似的。

相關文章
相關標籤/搜索