Linux-workqueue講解

=============  參考  =============node

代碼:linux-3.10.65/kernel/workqueue.clinux

===============================多線程

1. workqueue 是什麼?

  workqueue是對內核線程封裝的用於處理各類工做項的一種處理方法, 因爲處理對象是用鏈表拼接一個個工做項, 依次取出來處理, 而後從鏈表刪除,就像一個隊列排好隊依次處理同樣, 因此也稱工做隊列,架構

所謂封裝能夠簡單理解一箇中轉站, 一邊指向「合適」的內核線程, 一邊接受你丟過來的工做項, 用結構體 workqueue_srtuct表示, 而所謂工做項也是個結構體 --  work_struct, 裏面有個成員指針, 指向你最終要實現的函數,app

struct workqueue_struct {
    struct list_head    pwqs;        /* WR: all pwqs of this wq */
    struct list_head    list;        /* PL: list of all workqueues */

    struct workqueue_attrs    *unbound_attrs;    /* WQ: only for unbound wqs */
    struct pool_workqueue    *dfl_pwq;    /* WQ: only for unbound wqs */

    char            name[WQ_NAME_LEN]; /* I: workqueue name */

    unsigned int        flags ____cacheline_aligned; /* WQ: WQ_* flags */
    struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
    struct pool_workqueue __rcu *numa_pwq_tbl[]; /* FR: unbound pwqs indexed by node */
};


struct work_struct {
    atomic_long_t data;    //函數的參數
    struct list_head entry;    //掛到鏈表
    work_func_t func; //函數指針,指向你實現的函數功能
};

 

 固然使用者在實現本身函數功能後能夠直接調用,或者經過kthread_create()把函數當作新線程的主代碼, 或者add_timer添加到一個定時器延時處理, tcp

那爲什麼要弄個work_struct工做項先封裝函數, 而後再丟到workqueue_srtuct處理呢? 這就看使用場景了, 若是是一個大函數, 處理事項比較多, 且須要重複處理, 能夠單獨開闢一個內核線程處理; 對延時敏感的能夠用定時器; ide

若是隻是簡單的一個函數功能,  且函數裏面有延時動做的, 就適合放到工做隊列來處理了, 畢竟定時器處理的函數是在中斷上下文,不能delay或者引起進程切換的API,  並且開闢一個內核線程是耗時且耗費資源的, 通常用於函數須要while(1) 不斷循環處理的,函數

否則處理一次函數後退出,線程又被銷燬, 簡直就是浪費!ui

 

2. 怎麼用?

  一個簡單示例:this

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/delay.h>
#include <linux/workqueue.h>

struct workqueue_struct *workqueue_test;

struct work_struct work_test;

void work_test_func(struct work_struct *work)
{
    printk("%s()\n", __func__);

    //mdelay(1000);
    //queue_work(workqueue_test, &work_test);
}


static int test_init(void)
{
    printk("Hello,world!\n");

    /* 1. 本身建立一個workqueue, 中間參數爲0,默認配置 */
    workqueue_test = alloc_workqueue("workqueue_test", 0, 0);

    /* 2. 初始化一個工做項,並添加本身實現的函數 */
    INIT_WORK(&work_test, work_test_func);

    /* 3. 將本身的工做項添加到指定的工做隊列去, 同時喚醒相應線程處理 */
    queue_work(workqueue_test, &work_test);
    
    return 0;
}

static void test_exit(void)
{
    printk("Goodbye,cruel world!\n");
    destroy_workqueue(workqueue_test);
}

module_init(test_init);
module_exit(test_exit);

MODULE_AUTHOR("Vedic <FZKmxcz@163.com>");
MODULE_LICENSE("Dual BSD/GPL");
obj-m +=test.o

KDIR:=/home/fuzk/project/linux-3.10.65

COMPILER=/opt/toolchain/arm-2012.03/bin/arm-none-linux-gnueabi-
ARCH_TYPE=arm

all:
    make CROSS_COMPILE=$(COMPILER) ARCH=$(ARCH_TYPE) -C $(KDIR)  M=$(PWD)  modules

clean:
    make CROSS_COMPILE=$(COMPILER) ARCH=$(ARCH_TYPE) -C $(KDIR)  M=$(PWD)  clean
Makefile

  只需三步就能夠了, 固然內核已經爲咱們建立了幾個工做隊列, 咱們能夠直接將本身的工做項掛到相應的隊列便可:

  因此代碼能夠改成:

static int test_init(void)
{
    printk("Hello,world!\n");

    /* 2. 初始化一個工做項,並添加本身實現的函數 */
    INIT_WORK(&work_test, work_test_func);

    /* 3. 將本身的工做項添加到指定的工做隊列去, 同時喚醒相應線程處理 */
    queue_work(system_wq, &work_test);
    
    return 0;
}

若是workqueue對象是
system_wq, 可使用另外一個封裝函數schedule_work(&work_test)

static inline bool schedule_work(struct work_struct *work)
{
  return queue_work(system_wq, work);
}

   將本身的工做項掛到已有的工做隊列須要注意的是因爲這些隊列是共享的, 各個驅動都有可能將本身的工做項放到同個隊列, 會致使隊列的項擁擠, 當有些項寫的代碼耗時久或者調用delay()延時特別久, 你的項將會遲遲得不到執行! 

因此早期不少驅動開發人員都是本身建立workqueue, 添加本身的work。 在Linux-2.XXX時代, 建立workqueue時會建立屬於workqueue本身的內核線程, 這些線程是「私有的」, 雖然是方便了驅動開發人員, 但每一個驅動都「一言不合」就

建立workqueue致使太多線程, 嚴重佔用系統資源和效率, 因此在Linux-3.XXX時代, 社區開發人員將workqueue和內核線程剝離! 內核會本身事先建立相應數量的線程(後面詳解), 被全部驅動共享使用。  用戶調用alloc_workqueue()

只是建立workqueue這個空殼, 其主要做用:

  a. 兼容Linux-2.XXX時代代碼

  b. 新增flag字段代表這個workqueue的屬性(普通優先級仍是高優先級等), 方便在queue_work()時尋找「合適的」線程, 由於事先建立的線程分普通優先級、高優先級、綁定CPU線程, 非綁定CPU線程等

固然這對驅動開發人員是透明的, 驅動人員只需關注調用queue_work()讓線程執行本身的工做項, 至因而這個workqueue的私有線程仍是如今的共享線程, 不重要! 這樣就限制了系統工做線程的暴漲, 惟一的缺點就是前面提到的, 跟別人共享會增長

本身的工做項被執行的不肯定性。 只能說各個驅動開發人員自我約束, 儘可能使得工做項函數簡短快速, 若是咱們須要等本身的工做項被執行完才能處理其餘事情, 能夠調用flush_work() 等待work被執行完:

/**
 * flush_work - wait for a work to finish executing the last queueing instance
 * @work: the work to flush
 *
 * Wait until @work has finished execution.  @work is guaranteed to be idle
 * on return if it hasn't been requeued since flush started.
 *
 * RETURNS:
 * %true if flush_work() waited for the work to finish execution,
 * %false if it was already idle.
 */
bool flush_work(struct work_struct *work)
{
    struct wq_barrier barr;

    lock_map_acquire(&work->lockdep_map);
    lock_map_release(&work->lockdep_map);

    if (start_flush_work(work, &barr)) {
        wait_for_completion(&barr.done);
        destroy_work_on_stack(&barr.work);
        return true;
    } else {
        return false;
    }
}
EXPORT_SYMBOL_GPL(flush_work);

 

 3. 部分源碼解析

  直接看最核心部分:

 
 
NR_STD_WORKER_POOLS = 2

static
int __init init_workqueues(void) { int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL }; //線程兩種優先級: nice=0普通級; nice=-20高優先級 int i, cpu; /* make sure we have enough bits for OFFQ pool ID */ BUILD_BUG_ON((1LU << (BITS_PER_LONG - WORK_OFFQ_POOL_SHIFT)) < WORK_CPU_END * NR_STD_WORKER_POOLS); WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long)); pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC); cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP); hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN); wq_numa_init(); /* initialize CPU pools */ for_each_possible_cpu(cpu) { struct worker_pool *pool; i = 0; for_each_cpu_worker_pool(pool, cpu) {   ------------------- a BUG_ON(init_worker_pool(pool)); pool->cpu = cpu; cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu)); pool->attrs->nice = std_nice[i++]; pool->node = cpu_to_node(cpu); /* alloc pool ID */ mutex_lock(&wq_pool_mutex); BUG_ON(worker_pool_assign_id(pool)); mutex_unlock(&wq_pool_mutex); } } /* create the initial worker */ for_each_online_cpu(cpu) {             struct worker_pool *pool; for_each_cpu_worker_pool(pool, cpu) { pool->flags &= ~POOL_DISASSOCIATED; BUG_ON(create_and_start_worker(pool) < 0); ------------- b } } /* create default unbound and ordered wq attrs */ for (i = 0; i < NR_STD_WORKER_POOLS; i++) { struct workqueue_attrs *attrs; BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL))); attrs->nice = std_nice[i]; unbound_std_wq_attrs[i] = attrs;      ------------------ c /* * An ordered wq should have only one pwq as ordering is * guaranteed by max_active which is enforced by pwqs. * Turn off NUMA so that dfl_pwq is used for all nodes. */ BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL))); attrs->nice = std_nice[i]; attrs->no_numa = true; ordered_wq_attrs[i] = attrs;         ----------------- d } system_wq = alloc_workqueue("events", 0, 0);  ----------------- e system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0); system_long_wq = alloc_workqueue("events_long", 0, 0); system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND, WQ_UNBOUND_MAX_ACTIVE); system_freezable_wq = alloc_workqueue("events_freezable", WQ_FREEZABLE, 0); system_power_efficient_wq = alloc_workqueue("events_power_efficient", WQ_POWER_EFFICIENT, 0); system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient", WQ_FREEZABLE | WQ_POWER_EFFICIENT, 0); BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq || !system_unbound_wq || !system_freezable_wq || !system_power_efficient_wq || !system_freezable_power_efficient_wq); return 0; } early_initcall(init_workqueues);

 

a.  for_each_cpu_worker_pool

  其相關代碼在:

#define for_each_cpu_worker_pool(pool, cpu)                \
    for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0];        \
         (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
         (pool)++)



static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);

  從這裏能夠看出, 每一個CPU都有兩個私有結構體 struct worker_pool , 用變量cpu_worker_pools 表示, 而這兩個worker_pool最大區別就是nice賦值, 以及對worker_pool 編號

 

b. create_and_start_worker(pool)

  對online CPU 每一個worker_pool建立worker, 也即前面講到的工做線程:

create_and_start_worker()
    -> create_worker()
        -> worker->task = kthread_create_on_node(worker_thread, worker, pool->node, "kworker/%s", id_buf);
    -> start_worker()
        -> wake_up_process(worker->task);
        
worker_thread:
    do {
        struct work_struct *work = list_first_entry(&pool->worklist, struct work_struct, entry);

        if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
            /* optimization path, not strictly necessary */ process_one_work(worker, work);
            if (unlikely(!list_empty(&worker->scheduled)))
                process_scheduled_works(worker);
        } else {
            move_linked_works(work, &worker->scheduled, NULL);
            process_scheduled_works(worker);
        }
    } while (keep_working(pool));
    
        -> process_one_work(worker, work)
            -> 
                worker->current_work = work;
                worker->current_func = work->func;
                worker->current_pwq = pwq;

                list_del_init(&work->entry);

                worker->current_func(work); //調用函數 如上面的work_test_func()

  每一個CPU都有兩個worker_pool(普通優先級和高優先級), 而後每一個worker_pool又建立一個worker(名稱格式爲worker cpiid / 線程號 【H】), 並掛載到worker_pool   --> idr_replace(&pool->worker_idr, worker, worker->id);  同時worker->pool也指向worker_pool

所以, 通過a、b後架構以下:

  worker.task裏經過 list_first_entry(&pool->worklist, struct work_struct, entry); 獲取每一個工做項 work_struct, 並調用用戶指定的函數current_func

c. unbound_std_wq_attrs

   這個變量是爲了後面建立新的線程所作的一部分初始化工做

d.  ordered_wq_attrs

   這個變量也是爲了後面建立新的線程所作的一部分初始化工做

  前面說過新版內核對workqueue和線程進行了剝離, 由內核控制線程的數量和屬性, 咱們只介紹了普通優先級和高優先級, 其實還有bound cpu和unbound cpu屬性, 即這個線程是跑在指定的CPU上仍是任意CPU, 前面介紹的因爲調用

DEFINE_PER_CPU_SHARED_ALIGNED, 天然都是跟CPU走了, 而unbound_std_wq_attrs和ordered_wq_attrs天然就是爲了後面建立任意CPU均可運行的線程而作的準備, 最終線程有四種類型, 指定CPU的普通線程、指定CPU的高優先級線程、任意CPU的普通線程、任意CPU的高優先級線程。
且任意CPU的線程一開始是沒有建立了(只是初始化unbound_std_wq_attrs和ordered_wq_attrs), 根據驅動建立workqueue和系統負載自行決定, 因此線程的數量不會像指定CPU那樣只有一個!, 最終相似以下:

 

  能夠看出unbound和ordered的worker_poll不會指定CPU, 同時worker_dir鏈表會掛載多個worker, 另外線程的名稱也有區別, 指定CPU就用所在CPU id表示, 不然用worker_pool的id表示:

      

   我奇怪爲什麼不使用一個unbound worker_pool, 其worker_idr掛載全部的worker就能夠了, 爲什麼每生成一個worker就要配套一個worker_pool, 若是你知道請留言告知 謝謝~

 e. alloc_workqueue

  前面說過, 新版的alloc_workqueue()只是建立workqueue這個空殼, 不會再建立本身「私有」的線程了, 有的是如何指向「合適」的線程, 何爲合適?  這取決用戶在調用alloc_workqueue()傳的參數, 用於告知要什麼屬性的線程

#define alloc_ordered_workqueue(fmt, flags, args...)            \
    alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args)

#define create_workqueue(name)                        \
    alloc_workqueue((name), WQ_MEM_RECLAIM, 1)
#define create_freezable_workqueue(name)                \
    alloc_workqueue((name), WQ_FREEZABLE | WQ_UNBOUND | WQ_MEM_RECLAIM, 1)
#define create_singlethread_workqueue(name)                \
    alloc_ordered_workqueue("%s", WQ_MEM_RECLAIM, name)

===========================================================================
#define alloc_workqueue(fmt, flags, max_active, args...)        \
    __alloc_workqueue_key((fmt), (flags), (max_active),        \
                  NULL, NULL, ##args)

  很顯然第一個參數表示workqueue的名稱, 在Linux-2.XXX也會做爲本身私有線程的線程名, 命令ps還能查看獲得。 第二個參數就是告知這個workqueue到時候(調用queue_work()時)要指定哪一個線程的依據, 後面參數就不解釋了

咱們進一步跟蹤函數__alloc_workqueue_key() 的實現(詳解在註釋):

struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
                           unsigned int flags,
                           int max_active,
                           struct lock_class_key *key,
                           const char *lock_name, ...)
{

    struct workqueue_struct *wq;
    struct pool_workqueue *pwq;

    /* 1. 建立workqueue_struct */
    wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);

    /* 2.1 建立pool_workqueue
     * 2.2 尋找worker_pool, 若是是bound那worker_pool已存在,直接找優先級
           若是是unbound,就建立unbound worker_pool
       2.3 若是是建立unbound worker_pool, 就順道建立worker
       2.4 將步驟1建立的workqueue_struct 和 步驟2.1建立的pool_workqueue 和 步驟2.2的 worker_pool 串起來
       2.5 同理ordered
     */
    if (alloc_and_link_pwqs(wq) < 0)
        goto err_free_wq;

    /* 3. 將 workqueue_struc 掛載到workqueues上 */
    list_add(&wq->list, &workqueues);
}


/* ========================== 最重要的是步驟2!  繼續跟蹤......============================= */
        static int alloc_and_link_pwqs(struct workqueue_struct *wq)
        {
            bool highpri = wq->flags & WQ_HIGHPRI;
            int cpu, ret;

            if (!(wq->flags & WQ_UNBOUND)) {
                /* 上面2.1 建立pool_workqueue */    
                wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
                if (!wq->cpu_pwqs)
                    return -ENOMEM;

                for_each_possible_cpu(cpu) {
                    struct pool_workqueue *pwq =
                        per_cpu_ptr(wq->cpu_pwqs, cpu);
                    /* 上面2.2 尋找worker_pool, 已存在的 */
                    struct worker_pool *cpu_pools =
                        per_cpu(cpu_worker_pools, cpu);

                    /* 上面2.4 串起來 */
                    init_pwq(pwq, wq, &cpu_pools[highpri]);

                    mutex_lock(&wq->mutex);
                    link_pwq(pwq);
                    mutex_unlock(&wq->mutex);
                }
                return 0;
            } else if (wq->flags & __WQ_ORDERED) {
                ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
            } else {
                /* 上面2.1234 由於是unbound, 因此要建立, 用到以前實現初始化變量unbound_std_wq_attrs */
                return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
            }
        }



/* ===================================== 繼續跟蹤 apply_workqueue_attrs ========================================= */ apply_workqueue_attrs()
                    -> alloc_unbound_pwq()        -------------2.1--------- 得到 pool_workqueue
                        -> get_unbound_pool()     -------------2.2--------- 得到 worker_pool
                            -> create_and_start_worker(pool) --2.3--------- 得到 worker

  總而言之, 調用alloc_workqueue()返回workqueue_struct後, 會依次串連起 workqueue_struct -> pool_workqueue -> worker_pool -> worker, 如圖:

  調個線程要透過這四個結構體大山確實蠻尷尬的, 但也是爲了兼容之前, 因此這個就是目前的現狀......

 

 4. 其餘

 除了上面介紹的四種線程屬性, 其實還有其餘的, 讀者能夠自行查看:

enum {
    WQ_NON_REENTRANT    = 1 << 0, /* guarantee non-reentrance */
    WQ_UNBOUND        = 1 << 1, /* not bound to any cpu */
    WQ_FREEZABLE        = 1 << 2, /* freeze during suspend */
    WQ_MEM_RECLAIM        = 1 << 3, /* may be used for memory reclaim */
    WQ_HIGHPRI        = 1 << 4, /* high priority */
    WQ_CPU_INTENSIVE    = 1 << 5, /* cpu instensive workqueue */
    WQ_SYSFS        = 1 << 6, /* visible in sysfs, see wq_sysfs_register() */

    /*
     * Per-cpu workqueues are generally preferred because they tend to
     * show better performance thanks to cache locality.  Per-cpu
     * workqueues exclude the scheduler from choosing the CPU to
     * execute the worker threads, which has an unfortunate side effect
     * of increasing power consumption.
     *
     * The scheduler considers a CPU idle if it doesn't have any task
     * to execute and tries to keep idle cores idle to conserve power;
     * however, for example, a per-cpu work item scheduled from an
     * interrupt handler on an idle CPU will force the scheduler to
     * excute the work item on that CPU breaking the idleness, which in
     * turn may lead to more scheduling choices which are sub-optimal
     * in terms of power consumption.
     *
     * Workqueues marked with WQ_POWER_EFFICIENT are per-cpu by default
     * but become unbound if workqueue.power_efficient kernel param is
     * specified.  Per-cpu workqueues which are identified to
     * contribute significantly to power-consumption are identified and
     * marked with this flag and enabling the power_efficient mode
     * leads to noticeable power saving at the cost of small
     * performance disadvantage.
     *
     * http://thread.gmane.org/gmane.linux.kernel/1480396
     */
    WQ_POWER_EFFICIENT    = 1 << 7,

    __WQ_DRAINING        = 1 << 16, /* internal: workqueue is draining */
    __WQ_ORDERED        = 1 << 17, /* internal: workqueue is ordered */

    WQ_MAX_ACTIVE        = 512,      /* I like 512, better ideas? */
    WQ_MAX_UNBOUND_PER_CPU    = 4,      /* 4 * #cpus for unbound wq */
    WQ_DFL_ACTIVE        = WQ_MAX_ACTIVE / 2,
};

 

  兼容之前API接口

#define alloc_ordered_workqueue(fmt, flags, args...)            \
    alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args)

#define create_workqueue(name)                        \
    alloc_workqueue((name), WQ_MEM_RECLAIM, 1)
#define create_freezable_workqueue(name)                \
    alloc_workqueue((name), WQ_FREEZABLE | WQ_UNBOUND | WQ_MEM_RECLAIM, 1)
#define create_singlethread_workqueue(name)                \
    alloc_ordered_workqueue("%s", WQ_MEM_RECLAIM, name)
相關文章
相關標籤/搜索