1、前言node
本文主要講述下面兩部分的內容:函數
一、將work掛入workqueue的處理過程線程
二、如何處理掛入workqueue的workorm
2、用戶將一個work掛入workqueue接口
一、queue_work_on函數隊列
使用workqueue機制的模塊能夠調用queue_work_on(有其餘變種的接口,這裏略過,其實思路是一致的)將一個定義好的work掛入workqueue,具體代碼以下:進程
bool queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
…… 內存if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
__queue_work(cpu, wq, work);---掛入work list並通知worker thread pool來處理
ret = true;
} get……
} it
work_struct的data member中的WORK_STRUCT_PENDING_BIT這個bit標識了該work是處於pending狀態仍是正在處理中,pending狀態的work只會掛入一次。大部分的邏輯都是在__queue_work函數中,下面的小節都是描述該函數的執行過程。
二、__WQ_DRAINING的解釋
__queue_work函數一開始會校驗__WQ_DRAINING這個flag,以下:
if (unlikely(wq->flags & __WQ_DRAINING) && WARN_ON_ONCE(!is_chained_work(wq)))
return;
__WQ_DRAINING這個flag表示該workqueue正在進行draining的操做,這多半是發送在銷燬workqueue的時候,既然要銷燬,那麼掛入該workqueue的全部的work都要處理完畢,才容許它消亡。當想要將一個workqueue中全部的work都清空的時候,若是還有work掛入怎麼辦?通常而言,這時候固然是不容許新的work掛入了,畢竟如今的目標是清空workqueue中的work。可是有一種特例(經過is_chained_work斷定),也就是正在清空的work(隸屬於該workqueue)又觸發了一個queue work的操做(也就是所謂chained work),這時候該work容許掛入。
三、選擇pool workqueue
if (req_cpu == WORK_CPU_UNBOUND)
cpu = raw_smp_processor_id();if (!(wq->flags & WQ_UNBOUND))
pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
else
pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
WORK_CPU_UNBOUND表示並不指定cpu,這時候,選擇當前代碼運行的那個cpu了。一旦肯定了cpu了,對於非unbound的workqueue,固然使用per cpu的pool workqueue。若是是unbound的workqueue,那麼要根據numa node id來選擇。cpu_to_node能夠從cpu id獲取node id。須要注意的是:這裏選擇的pool wq只是備選的,可能用也可能不用,它有可能會被替換掉,具體參考下一節描述。
四、選擇worker thread pool
與其說掛入workqueue,不如說掛入worker thread pool,由於畢竟是線程池來處理具體的work。pool_workqueue有一個相關聯的worker thread pool(struct pool_workqueue的pool成員),所以看起來選擇了pool wq也就選定了worker pool了,可是,不是當前選定的那個pool wq對應的worker pool就適合該work,由於有時候該work可能正在其餘的worker thread上執行中,在這種狀況下,爲了確保work的callback function不會重入,該work最好仍是掛在那個worker thread pool上,具體代碼以下:
last_pool = get_work_pool(work);
if (last_pool && last_pool != pwq->pool) {
struct worker *worker;spin_lock(&last_pool->lock);
worker = find_worker_executing_work(last_pool, work);
if (worker && worker->current_pwq->wq == wq) {
pwq = worker->current_pwq;
} else {
/* meh... not running there, queue here */
spin_unlock(&last_pool->lock);
spin_lock(&pwq->pool->lock);
}
} else {
spin_lock(&pwq->pool->lock);
}
last_pool記錄了上一次該work是被哪個worker pool處理的,若是last_pool就是pool wq對應的worker pool,那麼皆大歡喜,不然只能使用last pool了。使用last pool的例子比較複雜一些,由於這時候須要根據last worker pool找到對應的pool workqueue。find_worker_executing_work函數能夠找到具體哪個worker線程正在處理該work,若是沒有找到,那麼仍是使用第3節中選定的pool wq吧,不然,選擇該worker線程當前的那個pool workqueue(其實也就是選定了線程池)。
五、選擇work掛入的隊列
隊列有兩個,一個是被推遲執行的隊列(pwq->delayed_works),一個是線程池要處理的隊列(pwq->pool->worklist),若是掛入線程池要處理的隊列,也就意味着該work進入active狀態,線程池會馬上啓動處理流程,若是掛入推遲執行的隊列,那麼該work仍是pending狀態:
pwq->nr_in_flight[pwq->work_color]++;
work_flags = work_color_to_flags(pwq->work_color);if (likely(pwq->nr_active < pwq->max_active)) {
pwq->nr_active++;
worklist = &pwq->pool->worklist;
} else {
work_flags |= WORK_STRUCT_DELAYED;
worklist = &pwq->delayed_works;
}insert_work(pwq, work, worklist, work_flags);
具體的掛入隊列的動做是在insert_work函數中完成的。
六、喚醒idle的worker來處理該work
在insert_work函數中有下面的代碼:
if (__need_more_worker(pool))
wake_up_worker(pool);
當線程池中正在運行狀態的worker線程數目等於0的時候,說明須要wakeup線程池中處於idle狀態的的worker線程來處理work。
3、線程池如何建立worker線程?
一、per cpu worker pool何時建立worker線程?
對於per-CPU workqueue,每一個cpu有兩個線程池,一個是normal,一個是high priority的。在初始化函數init_workqueues中有對這兩個線程池的初始化:
for_each_online_cpu(cpu) {
struct worker_pool *pool;for_each_cpu_worker_pool(pool, cpu) {
pool->flags &= ~POOL_DISASSOCIATED;
BUG_ON(!create_worker(pool));
}
}
所以,在系統初始化的時候,per cpu workqueue共享的那些線程池(2 x cpu nr)就會經過create_worker建立一個initial worker。
一旦initial worker啓動,該線程會執行worker_thread函數來處理work,在處理過程當中,若是有須要, worker會建立新的線程。
二、unbound thread pool何時建立worker線程?
咱們先看看unbound thread pool的創建,和per-CPU不一樣的是unbound thread pool是全局共享的,所以,每當建立不一樣屬性的unbound workqueue的時候,都須要建立pool_workqueue及其對應的worker pool,這時候就會調用get_unbound_pool函數在當前系統中現存的線程池中找是否有匹配的worker pool,若是沒有就須要建立新的線程池。在建立新的線程池以後,會馬上調用create_worker建立一個initial worker。和per cpu worker pool同樣,一旦initial worker啓動,隨着work不斷的掛入以及worker處理work的具體狀況,線程池會動態建立worker。
三、如何建立worker。代碼以下:
static struct worker *create_worker(struct worker_pool *pool)
{
struct worker *worker = NULL;
int id = -1;
char id_buf[16];id = ida_simple_get(&pool->worker_ida, 0, 0, GFP_KERNEL);----分配ID
worker = alloc_worker(pool->node);-----分配worker struct的內存
worker->pool = pool;
worker->id = id;if (pool->cpu >= 0)---------worker的名字
snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id, pool->attrs->nice < 0 ? "H" : "");
else
snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);worker->task = kthread_create_on_node(worker_thread, worker, pool->node, "kworker/%s", id_buf);
set_user_nice(worker->task, pool->attrs->nice); ---建立task並設定nice value
worker->task->flags |= PF_NO_SETAFFINITY;
worker_attach_to_pool(worker, pool); -----創建worker和線程池的關係spin_lock_irq(&pool->lock);
worker->pool->nr_workers++;
worker_enter_idle(worker);
wake_up_process(worker->task);------讓worker運行起來
spin_unlock_irq(&pool->lock);return worker;
}
代碼不復雜,經過線程池(struct worker_pool)綁定的cpu信息(struct worker_pool的cpu成員)能夠知道該pool是per-CPU仍是unbound,對於per-CPU線程池,pool->cpu是大於等於0的。對於對於per-CPU線程池,其worker線程的名字是kworker/cpu:worker id,若是是high priority的,後面還跟着一個H字符。對於unbound線程池,其worker線程的名字是kworker/u pool id:worker id。
4、work的處理
本章主要描述worker_thread函數的執行流程,部分代碼有刪節,保留主幹部分。
一、PF_WQ_WORKER標記
worker線程函數一開始就會經過PF_WQ_WORKER來標註本身:
worker->task->flags |= PF_WQ_WORKER;
有了這樣一個flag,調度器在調度當前進程sleep的時候能夠檢查這個準備sleep的進程是不是一個worker線程,若是是的話,那麼調度器不能魯莽的調度到其餘的進程,這時候,還須要找到該worker對應的線程池,喚醒一個idle的worker線程。經過workqueue模塊和調度器模塊的交互,當work A被阻塞後(處理該work的worker線程進入sleep),調度器會喚醒其餘的worker線程來處理其餘的work B,work C……
二、管理線程池中的線程
recheck:
if (!need_more_worker(pool))
goto sleep;if (unlikely(!may_start_working(pool)) && manage_workers(worker))
goto recheck;
如何判斷是否須要建立更多的worker線程呢?原則以下:
(1)有事情作:掛在worker pool中的work list不能是空的,若是是空的,那麼固然sleep就行了
(2)比較忙:worker pool的nr_running成員表示線程池中當前正在幹活(running狀態)的worker線程有多少個,當nr_running等於0表示全部的worker線程在處理work的時候阻塞了,這時候,必需要啓動新的worker線程來處理worker pool上處於active狀態的work鏈表上的work們。
三、worker線程開始處理work
worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
do {
struct work_struct *work = list_first_entry(&pool->worklist, struct work_struct, entry);if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
process_one_work(worker, work);
if (unlikely(!list_empty(&worker->scheduled)))
process_scheduled_works(worker);
} else {
move_linked_works(work, &worker->scheduled, NULL);
process_scheduled_works(worker);
}
} while (keep_working(pool));worker_set_flags(worker, WORKER_PREP);
按理說worker線程處理work應該比較簡單,從線程池的worklist中取一個work,而後調用process_one_work處理之就OK了,不過現實稍微複雜一些,work和work之間並非獨立的,也就是說,work A和work B多是linked work,這些linked work應該被一個worker來處理。WORK_STRUCT_LINKED標記了work是屬於linked work,若是是linked work,worker並不直接處理,而是將其掛入scheduled work list,而後調用process_scheduled_works來處理。毫無疑問,process_scheduled_works也是調用process_one_work來處理一個一個scheduled work list上的work。
scheduled work list並不是僅僅應用在linked work,在worker處理work的時候,有一個原則要保證:同一個work不能被同一個cpu上的多個worker同時執行。這時候,若是worker發現本身要處理的work正在被另一個worker線程處理,那麼本worker線程將不處理該work,只須要掛入正在執行該work的worker線程的scheduled work list便可。