下半部和下半部執行的工做--工做隊列

工做隊列(work queue)是另一種將工做推後執行的形式.他和其餘形式都不相同.工做隊列能夠把工做推後,交由一個內核線程去執行,這個下半部分老是會在進程上下文中去執行.這樣,經過工做隊列執行的代碼能佔盡進程上下文的全部優點.最重要的是工做隊列能容許從新調度甚至是休眠.css

一般,在工做隊列和軟中斷/tasklet中作出選擇很是容易.若是推後執行的任務須要睡眠,那麼就選擇工做隊列,若是不須要睡眠,就選擇軟中斷或者是tasklet.實際上,工做隊列可使用內核線程特換,可是使用內核線程可能會出現一些問題,因此儘可能使用工做隊列.linux

若是須要一個能夠從新調度的實體來執行你的下半部處理,你應該使用工做隊列,他是惟一能在進程上下文中運行的下半部實現機制,也只有他才能夠睡眠.若是不須要用一個內核線程來推後執行工做,那麼就考慮使用tasklet.數組

(一):工做隊列的實現安全

工做隊列子系統是一個用於建立內核線程的接口.經過他建立的進程負責執行由內核其餘部分排到隊列裏的任務.他建立的這些內核線程稱爲工做者線程.工做隊隊列可讓你的驅動程序建立一個專門的工做者線程來處理須要推後的工做.不過,工做隊列子系統提供了一個缺省的工做者線程來處理這些工做.所以,工做隊列最基本的表現形式,就轉變成了一個把須要推後執行的任務交給特定的通用線程的這樣一種接口.markdown

缺省的工做者線程叫作events/n,這裏n是處理器編號,每個處理器對應一個線程.例如,單處理器的系統只有events/0這樣一個線程,而雙處理器系統就會多一個events/1線程.缺省的工做者線程會從多個地方獲得被推後的工做.許多內核驅動程序都把下半部交給缺省的工做者線程去作.除非一個驅動程序或者是子系統必須創建一個屬於他本身的內核線程,不然最好使用缺省線程.數據結構

1:表示線程的數據結構異步

工做者線程使用workqueue_struct結構表示:ide

/* * The externally visible workqueue abstraction is an array of * per-CPU workqueues: * * 外部可見的工做隊列抽象是每一個CPU工做隊列組成的數組 */
struct workqueue_struct {
    struct cpu_workqueue_struct *cpu_wq;
    struct list_head list;
    const char *name;
    int singlethread;
    int freezeable;     /* Freeze threads during suspend */
    int rt;
#ifdef CONFIG_LOCKDEP
    struct lockdep_map lockdep_map;
#endif
};

該結構內部是一個由cpu_workqueue_struct結構組成的數組,它定義在kernel/workqueue.c中,數組中的每一項對應系統中的每個處理器.因爲系統中每一個處理器對應一個工做者線程,因此對於給定的某臺計算機來講,就是每一個處理器,每一個工做者線程對應一個這樣的cpu_workqueue_struct結構體.cpu_qorkqueue_struct是kernel/qorkqueue.c中的核心數據結構.函數

/* * The per-CPU workqueue (if single thread, we always use the first * possible cpu). */
struct cpu_workqueue_struct {
    spinlock_t lock; //鎖保護這種結構
    struct list_head worklist;  //工做列表
    wait_queue_head_t more_work;
    struct work_struct *current_work;  
    struct workqueue_struct *wq; //關聯工做隊列結構
    struct task_struct *thread;   //關聯線程
} ____cacheline_aligned;

注意,每一個工做者線程類型關聯一個本身的workqueue_struct.在該結構體裏面,給每一個線程分配一個cpu+qorkqueue_struct,於是也就是給每一個處理器分配一個.由於每一個處理器都有一個該類型的工做者線程.ui

2:表示工做的數據結構

全部的工做者線程都是用普通的內核線程實現的,他們都要執行worker_thread()函數.在他執行完之後,這個函數執行一個死循環並開始休眠.當有工做被插入到隊列裏面的時候,線程就會被喚醒,以便執行這些操做.當沒有剩餘的操做的時候,他又會繼續休眠.

工做用linux/workqueue.h中定義的work_struct結構體表示:

struct work_struct {
    atomic_long_t data;
#define WORK_STRUCT_PENDING 0 /* T if work item pending execution */
#define WORK_STRUCT_STATIC 1 /* static initializer (debugobjects) */
#define WORK_STRUCT_FLAG_MASK (3UL)
#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
    struct list_head entry;
    work_func_t func;
#ifdef CONFIG_LOCKDEP
    struct lockdep_map lockdep_map;
#endif
};

這些結構體被鏈接成鏈表,在每一個處理器上的每種類型的隊列都對應這樣一個鏈表.好比,每一個處理器上用於執行被推後的工做的那個通用線程就有這樣的一個鏈表.當一個工做者線程被喚醒的時候,他會執行他的鏈表上的全部工做,工做被執行完畢,他就將相應的work_struct對象從鏈表中移走.當鏈表上不在有對象的時候,他就會繼續休眠.

咱們看一下worker_thread的執行流程,以下:

static int worker_thread(void *__cwq)
{
    struct cpu_workqueue_struct *cwq = __cwq;
    DEFINE_WAIT(wait);
    if (cwq->wq->freezeable)
        set_freezable();
    for (;;) {
        prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
        if (!freezing(current) &&
            !kthread_should_stop() &&
            list_empty(&cwq->worklist))
            schedule();
        finish_wait(&cwq->more_work, &wait);
        try_to_freeze();
        if (kthread_should_stop())
            break;
        run_workqueue(cwq);
    }
    return 0;
}

該函數在死循環中完成了如下功能:

1:線程將本身設置爲休眠狀態(state被設成TASK_INTERRUPTIBLE),並把本身加入到等待隊列中

2:若是工做鏈是空的,則線程調用schedule()函數進入休眠狀態

3:若是鏈表中有對象,線程不會睡眠.相反,他將本身設置成
TAKS_RUNNING,脫離等待隊列.

4:若是鏈表非空,調用run_workqueue()函數執行被推後的工做.

下一步,由run_workqueu()函數來完成推後執行的工做:

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
    spin_lock_irq(&cwq->lock);
    while (!list_empty(&cwq->worklist)) {
        struct work_struct *work = list_entry(cwq->worklist.next,
                        struct work_struct, entry);
        work_func_t f = work->func;
#ifdef CONFIG_LOCKDEP
        /*
         * It is permissible to free the struct work_struct
         * from inside the function that is called from it,
         * this we need to take into account for lockdep too.
         * To avoid bogus "held lock freed" warnings as well
         * as problems when looking into work->lockdep_map,
         * make a copy and use that here.
         */
        struct lockdep_map lockdep_map = work->lockdep_map;
#endif
        trace_workqueue_execution(cwq->thread, work);
        debug_work_deactivate(work);
        cwq->current_work = work;
        list_del_init(cwq->worklist.next);
        spin_unlock_irq(&cwq->lock);
        BUG_ON(get_wq_data(work) != cwq);
        work_clear_pending(work);
        lock_map_acquire(&cwq->wq->lockdep_map);
        lock_map_acquire(&lockdep_map);
        f(work);
        lock_map_release(&lockdep_map);
        lock_map_release(&cwq->wq->lockdep_map);
        if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
            printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
                    "%s/0x%08x/%d\n",
                    current->comm, preempt_count(),
                        task_pid_nr(current));
            printk(KERN_ERR " last function: ");
            print_symbol("%s\n", (unsigned long)f);
            debug_show_held_locks(current);
            dump_stack();
        }
        spin_lock_irq(&cwq->lock);
        cwq->current_work = NULL;
    }
    spin_unlock_irq(&cwq->lock);
}

該函數循環遍歷連表上每一個待處理的工做,執行鏈表每一個節點上的workqueue_struct中的func成員函數:
1:當鏈表不爲空的時候,選取下一個鏈表對象
2:獲取咱們但願執行的函數func及其參數data
3:把該節點從鏈表上解下來,將待處理標誌位pending清零
4:調用函數
5:重複執行

3:工做隊列實現工做的總結

下面一個圖展現了這些數據結構之間的關係:

這裏寫圖片描述

位於最高一層的是工做者線程.系統中容許有多種類型的工做者線程存在.對於指定的一個類型,系統上每一個CPU上都有一個該類的工做者線程.內核中有些部分能夠根據須要來建立工做者線程,而在默認狀況下,內核只有event這一種類型的工做者線程.每個工做者線程都由一個cpu_workqueue_struct結構體表示.而workqueue_struct結構體則表示給定類型的全部工做者線程.

例如,除系統默認的通用events工做者類型以外,咱們本身加了一種falcon工做者類型,而且使用的是一個擁有四個處理器的計算機.那麼,系統中如今有四個events類型的線程(於是也有四個cpu_workqueue_struct結構體)和另外四個falcon類型的線程(另外還有四個cpu_qorkqueue_struct結構體).同時,有一個對應的event類型的workqueue_struct和一個對應的falcon類型的workqueue_struct.

你的驅動程序建立這些須要推後執行的工做.他們用work_struct來表示.這個結構體中最重要的是一個指針,他指向一個函數,而正是該函數負責處理須要推後執行的具體任務.工做會被提交給某個具體的工做者線程.

默認狀況下,如今大部分驅動程序都是使用的默認工做者線程.

(二):使用工做隊列

首先咱們先看一下如何使用缺省的工做隊列.

1:建立推後的工做

首先須要作的就是實際建立一些須要推後完成的工做.能夠經過DECLARE_WORK在編譯的時候靜態的建立該結構體:

DECLARE_WORK(name,void(*func)(void *),void *data);

這樣就會靜態的建立一個名爲name,處理函數爲func,參數爲data的work_struct結構體.一樣,也能夠在運行的時候經過指針建立一個工做.

INIT_WORK(struct work_struct *work,void(*func)(void *),void *data);

這會動態的初始化一個由work指向的工做,處理函數爲func,參數爲data.

2:工做隊列處理函數

工做隊列處理函數的原型爲:

void work_handler(void *data);

這個函數會由一個工做者線程執行,所以,函數會運行在進程上下文中.默認狀況下,容許相應中斷,而且不持有任何鎖.若是須要,函數能夠睡眠.須要注意的是,儘管操做處理函數運行在進程上下文中,但他不能訪問用戶空間,由於內核線程在用戶空間沒有相關的內存映射.一般在發生系統調用的時候,內核會表明用戶空間的進程運行,此時才能訪問用戶空間.也只有此時他纔會映射用戶空間的內存.

在工做隊列和內核其餘部分之間使用鎖機制就像在其餘的進程上下文中使用鎖機制同樣方便.

3:對工做進行調度

如今工做已經被建立,咱們能夠調度他了.想要把給定工做的處理函數提交給缺省的events工做線程,只需調用:

schedule_work(&work);

work立刻就會被調度,一旦其所在的工做者線程被喚醒,他就會被執行.

若是想要他等待必定的時間以後再運行,那麼就能夠調度他在指定時間執行:

schedule_delayed_work(&work,delay);

此時,&work指向的work_struct直到delayz指定的時鐘節拍用完後纔會執行.

4:刷新操做

排入隊列的工做會在工做者線程下一次被喚醒的時候執行.有時,在繼續下一步工做以前,你必須保證一些操做已經執行完畢了.這一點對於模塊來講就很重要,在卸載以前,他就有可能須要調用下面的函數.而在內核的其餘部分,爲了防止競爭條件的出現,也可能須要確保再也不有待處理的工做.

出於以上目的,內核準備了一個用於刷新指定工做隊列的函數:

void flush_scheduled_work(void);

這個函數能夠取消任何與work_struct相關的掛起工做.

5: 建立新的工做隊列

若是缺省的隊列不能知足你的要求,你應該建立一個新的工做隊列和與之相應的工做者線程.建立一個新的任務隊列和與之相關的工做者線程,你只須要調用一個簡單的函數:

struct workqueue_struct *create_workqueue(const char *name);

name參數用於該內核線程的命名.好比缺省的events隊列的建立就調用的是:

truct workqueue_struct *keventd_wq;
kevent_wq = create_workqueue("events");

這樣就會建立全部的工做者線程(系統的每一個處理器都會有一個),而且作好全部開始處理工做以前的準備工做.

建立一個新的工做的時候,無需考慮工做隊列的類型.在建立以後,能夠調用下面列舉的函數.這些函數於schedule_work()和schedule_delayed_work()相近,惟一的區別就在於他們針對給定的工做隊列而不是缺省的events隊列進行操做.

int queue_work(struct workqueue_struct *wq,struct work_struct *work)
int queue_delayed_work(struct workqueue_struct *wq,struct work_struct *work,unsigned long delay)

最後,調用下面的函數刷新指定的工做隊列.

flush_workqueue(struct workqueue_struct *wq);

該函數與flush_scheduled_work()做用相同,只是他在返回前等待清空的是給定的隊列.

5:下半部機制的選擇

下面是三種下半部接口的比較:

這裏寫圖片描述

對於通常的驅動開發來講,能夠考慮推後執行的工做,需不須要休眠,若是須要休眠,則使用工做隊列;若是不須要,最好使用tasklet.

6:在下半部之間加鎖

使用tasklet的一個好處在於,他本身負責執行的序列化保障:兩個相同類型的tasklet不容許同時執行,即便在不一樣的處理器上也不行.tasklet之間的同步(也就是兩個不一樣類型的tasklet
共享同一數據的時候)須要正確使用鎖機制.

若是一個進程上下文和一個下半部共享數據,在訪問這些數據以前,你須要禁止下半部的處理並獲得鎖的使用權.作這些是爲了本地和SMP的保護,而且防止死鎖的出現.

若是一箇中斷上下文和一個下半部共享數據,在訪問數據以前,你須要禁止中斷並獲得鎖的使用權.所作的這些也是爲了本地和SMP的保護而且防止死鎖的出現.

全部在工做隊列中被共享的數據也須要使用鎖機制.

7:禁止下半部

通常單純禁止下半部的處理是不夠的.爲了保證共享數據的安全,更常見的作法是,先獲得一個鎖而後再禁止下半部的處理.驅動程序中一般使用的都是這種方法.

若是須要禁止全部的下半部處理(就是全部的軟中斷和全部的tasklet),能夠調用local_bh_disable()函數.容許下半部進行處理,能夠調用local_bh_enable()函數.
函數經過preempt_count爲每一個進程維護一個計數器,當計數爲0的時候,下半部才能被處理.由於下半部的處理已經被禁止,因此local_bh_enable()還須要檢查全部
現存的待處理的下半部並執行他們.

這些函數並不能禁止工做隊列的執行,由於工做隊列是在進程上下文中運行的.不會涉及異步執行的問題,因此也就沒有必要禁止他們執行.

相關文章
相關標籤/搜索