內核工做隊列workqueue

    workqueue做爲內核的重要基礎組件,在內核中被普遍的使用,經過工做隊列,能夠很方便的把咱們要執行的某個任務(即函數+上下文參數)交代給內核,由內核替咱們執行。本文主要是介紹工做隊列的使用,及其內部實現的邏輯。
linux

    由於內核在系統初始化的時候,已爲咱們建立好了默認的工做隊列,因此咱們在使用的時候,能夠不須要再建立工做隊列,只須要建立工做,並將工做添加到工做隊列上便可。固然,咱們也能夠建立本身的工做隊列,而不使用內核建立好的工做隊列。
tcp

    簡單的理解,工做隊列是由內核線程+鏈表+等待隊列來實現的,即由一個內核線程不斷的從鏈表中讀取工做,而後執行工做的工做函數!
ide

    1、工做的表示: struct work_struct
函數

typedef void (*work_func_t)(struct work_struct *work);
struct work_struct {
    atomic_long_t data; /* 內核內部使用 */
    struct list_head entry; /* 用於連接到工做隊列中*/
    work_func_t func; /* 工做函數*/
#ifdef CONFIG_LOCKDEP
    struct lockdep_map lockdep_map;
#endif
};

        從這個結構體能夠看出,一個工做,就是一個待執行的工做函數,那若是咱們要給工做函數傳遞參數,怎麼解決呢?
源碼分析

    仔細觀察工做函數的格式:參數是work_struct,因此在實際使用的時候,常常會在建立咱們本身工做的時候,將此結構體嵌套在內部。而後在work_func函數內部經過container_of來獲得咱們自定義的工做,這樣子就完成參數的傳遞了。ui

2、工做隊列的經常使用接口: 在linux/kernel/workqueue.hthis

  1. 初始化工做:atom

#define __DELAYED_WORK_INITIALIZER(n, f) {          \
    .work = __WORK_INITIALIZER((n).work, (f)),      \
    .timer = TIMER_INITIALIZER(NULL, 0, 0),         \
    }

#define DECLARE_WORK(n, f)                  \
    struct work_struct n = __WORK_INITIALIZER(n, f)
    
// 也可使用INIT_WORK宏:
#define INIT_WORK(_work, _func)                     \
    do {                                \
        (_work)->data = (atomic_long_t) WORK_DATA_INIT();   \
        INIT_LIST_HEAD(&(_work)->entry);            \
        PREPARE_WORK((_work), (_func));             \
    } while (0)

    主要是完成func成員的賦值。
線程

    2. 工做入隊: 添加到內核工做隊列debug

int schedule_work(struct work_struct *work);

    3. 工做撤銷: 從內核工做隊列中刪除

int cancel_work_sync(struct work_struct *work);

    4. 建立工做隊列:

#ifdef CONFIG_LOCKDEP
#define __create_workqueue(name, singlethread, freezeable, rt)  \
({                              \
    static struct lock_class_key __key;         \
    const char *__lock_name;                \
                                \
    if (__builtin_constant_p(name))             \
        __lock_name = (name);               \
    else                            \
        __lock_name = #name;                \
                                \
    __create_workqueue_key((name), (singlethread),      \
                   (freezeable), (rt), &__key,  \
                   __lock_name);            \
})
#else
#define __create_workqueue(name, singlethread, freezeable, rt)  \
    __create_workqueue_key((name), (singlethread), (freezeable), (rt), \
                   NULL, NULL)
#endif

#define create_workqueue(name) __create_workqueue((name), 0, 0, 0)
#define create_rt_workqueue(name) __create_workqueue((name), 0, 0, 1)
#define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1, 0)
#define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0, 0)

    建立工做隊列,最終調用的都是__create_workqueue_key()函數來完成,此函數返回的是struct workqueue_struct *,用於表示一個工做隊列。

    5. 銷燬工做隊列:

void destroy_workqueue(struct workqueue_struct *wq);


    在介紹了上面的接口後,看一個簡單的使用例子,這個例子使用的是內核已建立好的工做隊列,要使用本身建立的工做隊列,也是很簡單的,看了後面的實現源碼分析就清楚了。

#include <linux/module.h>
#include <linux/delay.h>
#include <linux/workqueue.h>

#define ENTER() printk(KERN_DEBUG "%s() Enter", __func__)
#define EXIT() printk(KERN_DEBUG "%s() Exit", __func__)
#define ERR(fmt, args...) printk(KERN_ERR "%s()-%d: " fmt "\n", __func__, __LINE__, ##args)
#define DBG(fmt, args...) printk(KERN_DEBUG "%s()-%d: " fmt "\n", __func__, __LINE__, ##args)

struct test_work {
	struct work_struct w;
	unsigned long data;
};

static struct test_work my_work;

static void my_work_func(struct work_struct *work)
{
	struct test_work *p_work;
	ENTER();
	p_work = container_of(work, struct test_work, w);
	while (p_work->data) {
		DBG("data: %lu", p_work->data--);
		msleep_interruptible(1000);
	}

	EXIT();
}

static int __init wq_demo_init(void)
{
	INIT_WORK(&my_work.w, my_work_func);
	my_work.data = 30;

	msleep_interruptible(1000);
	DBG("schedule work begin:");
	if (schedule_work(&my_work.w) == 0) {
		ERR("schedule work fail");
		return -1;
	}

	DBG("success");
	return 0;
}

static void __exit wq_demo_exit(void)
{
	ENTER();
	while (my_work.data) {
		DBG("waiting exit");
		msleep_interruptible(2000);
	}
	EXIT();
}

MODULE_LICENSE("GPL");
module_init(wq_demo_init);
module_exit(wq_demo_exit);

    

    下面就分析workqueue組件的源碼實現,先從work_queue模塊的初始化開始,而後再分析工做的註冊過程,最後是工做如何被執行的。

3、workqueu的初始化:在kernel/workqueue.c

static DEFINE_SPINLOCK(workqueue_lock);  //全局的自旋鎖,用於保證對全局鏈表workqueues的原子操做
static LIST_HEAD(workqueues); // 全局鏈表,用於連接全部的工做隊列
static struct workqueue_struct *keventd_wq;

void __init init_workqueues(void)
{
    alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);

    cpumask_copy(cpu_populated_map, cpu_online_mask);
    singlethread_cpu = cpumask_first(cpu_possible_mask);
    cpu_singlethread_map = cpumask_of(singlethread_cpu);
    hotcpu_notifier(workqueue_cpu_callback, 0);
    keventd_wq = create_workqueue("events");                                                                                              
    BUG_ON(!keventd_wq);
}

    調用create_workqueue()函數建立了一個工做隊列,名字爲events。那就再看看工做隊列是如何建立的:

    上面在介紹建立工做隊列的接口時,有看到最終調用的都是__create_workqueue_key()函數的,在介紹這個函數以前,先看看struct workqueue_struct結構體的定義:在kernel/workqueue.c

struct workqueue_struct {                                                                                                                 
    struct cpu_workqueue_struct *cpu_wq;
    struct list_head list; // 用於連接到全局鏈表workqueues
    const char *name; //工做隊列的名字,即內核線程的名字
    int singlethread;
    int freezeable;     /* Freeze threads during suspend */
    int rt;
#ifdef CONFIG_LOCKDEP
    struct lockdep_map lockdep_map;
#endif
};

struct cpu_workqueue_struct {

    spinlock_t lock; // 用於保證worklist鏈表的原子操做

    struct list_head worklist;  //用於保存添加的工做
    wait_queue_head_t more_work; // 等待隊列,當worklist爲空時,則會將內核線程掛起,存放與此鏈表中
    struct work_struct *current_work;  //保存內核線程當前正在執行的工做

    struct workqueue_struct *wq;
    struct task_struct *thread; // 內核線程
} ____cacheline_aligned;

    下面看一下建立函數的內部實現,這裏要注意咱們傳遞的參數:singlethread = 0, freezeable =0, rt = 0

struct workqueue_struct *__create_workqueue_key(const char *name,
                        int singlethread,
                        int freezeable,
                        int rt,
                        struct lock_class_key *key,
                        const char *lock_name)
{
    struct workqueue_struct *wq;
    struct cpu_workqueue_struct *cwq;
    int err = 0, cpu;

    wq = kzalloc(sizeof(*wq), GFP_KERNEL);
    if (!wq)
        return NULL;

    wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
    if (!wq->cpu_wq) {
        kfree(wq);
        return NULL;
    }

    wq->name = name;
    lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
    wq->singlethread = singlethread;
    wq->freezeable = freezeable;
    wq->rt = rt;
    INIT_LIST_HEAD(&wq->list);

    if (singlethread) {
        cwq = init_cpu_workqueue(wq, singlethread_cpu);
        err = create_workqueue_thread(cwq, singlethread_cpu); 
        start_workqueue_thread(cwq, -1);
    } else {
        cpu_maps_update_begin();
        /*
         * We must place this wq on list even if the code below fails.
         * cpu_down(cpu) can remove cpu from cpu_populated_map before
         * destroy_workqueue() takes the lock, in that case we leak
         * cwq[cpu]->thread.
         */
        spin_lock(&workqueue_lock);
        list_add(&wq->list, &workqueues);
        spin_unlock(&workqueue_lock);
        /*
         * We must initialize cwqs for each possible cpu even if we
         * are going to call destroy_workqueue() finally. Otherwise
         * cpu_up() can hit the uninitialized cwq once we drop the
         * lock.
         */
        for_each_possible_cpu(cpu) {
            cwq = init_cpu_workqueue(wq, cpu);
            if (err || !cpu_online(cpu))
                continue;
            err = create_workqueue_thread(cwq, cpu); /*建立內核線程*/
            start_workqueue_thread(cwq, cpu); /*啓動內核線程*/
        }
        cpu_maps_update_done();
    }

    if (err) {
        destroy_workqueue(wq);
        wq = NULL;
    }
    return wq;
}

        主要的代碼邏輯是建立一個struct workqueue_struct類型的對象,而後將此工做隊列加入到workqueues鏈表中,最後是調用create_workqueue_thread()建立一個內核線程,並啓動此線程。

    咱們知道內核線程最主要的是它的線程函數,那麼工做隊列的線程函數時什麼呢?

static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
    struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
    struct workqueue_struct *wq = cwq->wq;
    const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
    struct task_struct *p;
    
    p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
    /*
     * Nobody can add the work_struct to this cwq,
     *  if (caller is __create_workqueue)
     *      nobody should see this wq
     *  else // caller is CPU_UP_PREPARE
     *      cpu is not on cpu_online_map
     * so we can abort safely.
     */
    if (IS_ERR(p))
        return PTR_ERR(p);
    if (cwq->wq->rt)
        sched_setscheduler_nocheck(p, SCHED_FIFO, &param);
    cwq->thread = p;
    
    trace_workqueue_creation(cwq->thread, cpu);
    
    return 0;
}

static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
    struct task_struct *p = cwq->thread;

    if (p != NULL) {
        if (cpu >= 0)
            kthread_bind(p, cpu);
        wake_up_process(p);
    }
}

        主要就是調用kthread_create()建立內核線程,線程函數爲worker_thread,參數爲cwq。start_workqueue_thread()函數就是調用wake_up_process()把內核線程加入到run queue中。

    下面就分析下線程函數worker_thread究竟是怎麼咱們添加的工做的?

static int worker_thread(void *__cwq)
{
    struct cpu_workqueue_struct *cwq = __cwq;
    DEFINE_WAIT(wait);

    if (cwq->wq->freezeable)
        set_freezable();

    for (;;) {
        prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
        if (!freezing(current) &&
            !kthread_should_stop() &&
            list_empty(&cwq->worklist))
            schedule(); //若worklist鏈表爲空,則進行調度
        finish_wait(&cwq->more_work, &wait);

        try_to_freeze();

        if (kthread_should_stop())
            break;

        run_workqueue(cwq);//執行隊列中的工做
    }

    return 0;
}

        前面已經有文章分析了內核線程和等待隊列waitqueue,瞭解這個的話,就很容易看懂這段代碼,就是判斷worklist隊列是否爲空,若是爲空,則將當前內核線程掛起,不然就調用run_workqueue()去執行已添加註冊的工做:

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
    spin_lock_irq(&cwq->lock);
    while (!list_empty(&cwq->worklist)) {
        struct work_struct *work = list_entry(cwq->worklist.next,
                        struct work_struct, entry);
        work_func_t f = work->func;
#ifdef CONFIG_LOCKDEP
        /*
         * It is permissible to free the struct work_struct
         * from inside the function that is called from it,
         * this we need to take into account for lockdep too.
         * To avoid bogus "held lock freed" warnings as well
         * as problems when looking into work->lockdep_map,
         * make a copy and use that here.
         */
        struct lockdep_map lockdep_map = work->lockdep_map;
#endif
        trace_workqueue_execution(cwq->thread, work);
        cwq->current_work = work; //保存當前工做到current_work
        list_del_init(cwq->worklist.next); // 將此工做從鏈表中移除
        spin_unlock_irq(&cwq->lock);

        BUG_ON(get_wq_data(work) != cwq);
        work_clear_pending(work);
        lock_map_acquire(&cwq->wq->lockdep_map);
        lock_map_acquire(&lockdep_map);
        f(work); //執行工做函數
        lock_map_release(&lockdep_map);
        lock_map_release(&cwq->wq->lockdep_map);

        if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
            printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
                    "%s/0x%08x/%d\n",
                    current->comm, preempt_count(),
                        task_pid_nr(current));
            printk(KERN_ERR "    last function: ");
            print_symbol("%s\n", (unsigned long)f);
            debug_show_held_locks(current);
            dump_stack();
        }

        spin_lock_irq(&cwq->lock);
        cwq->current_work = NULL;
    }
    spin_unlock_irq(&cwq->lock);
}

    上面的註釋已經說明清楚代碼的邏輯了,這裏就不在解釋了。

4、添加工做到內核工做隊列中:

    上面提到了,當工做隊列中的worklist鏈表爲空,及沒有須要執行的工做,怎會將工做隊列所在的內核線程掛起,那麼何時會將其喚醒呢?確定就是當有工做添加到鏈表的時候,即調用schedule_work()的時候:

int schedule_work(struct work_struct *work)
{
    return queue_work(keventd_wq, work); // 將工做添加到內核提咱們建立好的工做隊列中
}

    前面在初始化的時候,就將內核建立的工做隊列保存在keventd_wq變量中。

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but if the CPU dies
 * it can be processed by another CPU.
 */
int queue_work(struct workqueue_struct *wq, struct work_struct *work)                                                                     
{
    int ret;

    ret = queue_work_on(get_cpu(), wq, work);
    put_cpu();

    return ret;
}

int queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
    int ret = 0;

    if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
        BUG_ON(!list_empty(&work->entry));
        __queue_work(wq_per_cpu(wq, cpu), work);
        ret = 1;
    }
    return ret;
}

static void __queue_work(struct cpu_workqueue_struct *cwq, struct work_struct *work)
{
    unsigned long flags;

    spin_lock_irqsave(&cwq->lock, flags); //加鎖,保證insert_work原子操做
    insert_work(cwq, work, &cwq->worklist);
    spin_unlock_irqrestore(&cwq->lock, flags); //解鎖
}

static void insert_work(struct cpu_workqueue_struct *cwq, struct work_struct *work, struct list_head *head)
{
    trace_workqueue_insertion(cwq->thread, work);
    
    set_wq_data(work, cwq);
    /*
     * Ensure that we get the right work->data if we see the
     * result of list_add() below, see try_to_grab_pending().
     */
    smp_wmb();
    list_add_tail(&work->entry, head); //加入到worklist鏈表
    wake_up(&cwq->more_work); //喚醒在more_work等待鏈表上的任務,即工做隊列線程
}

5、使用自定義工做隊列:

    經過上面的分析,建立 工做隊列最基本的接口時create_workqueue()。當咱們要把工做放入到自定義的工做隊列時,使用以下接口:

int queue_work(struct workqueue_struct *wq, struct work_struct *work);

    在上面的分析中,其實已經使用了此接口,只不過咱們調用schedule_work()的時候,wq參數爲內核已建立好的工做隊列keventd_wq。

相關文章
相關標籤/搜索