Read the fucking source code!
--By 魯迅A picture is worth a thousand words.
--By 高爾基說明:html
Workqueue
工做隊列是利用內核線程來異步執行工做任務的通用機制;Workqueue
工做隊列能夠用做中斷處理的Bottom-half
機制,利用進程上下文來執行中斷處理中耗時的任務,所以它容許睡眠,而Softirq
和Tasklet
在處理任務時不能睡眠;來一張概述圖:node
workqueue
的調度或入隊接口後,經過創建好的連接關係圖逐級找到合適的worker
,最終完成工做任務的執行;此處應有圖:linux
work_struct
:工做隊列調度的最小單位,work item
;workqueue_struct
:工做隊列,work item
都掛入到工做隊列中;worker
:work item
的處理者,每一個worker
對應一個內核線程;worker_pool
:worker
池(內核線程池),是一個共享資源池,提供不一樣的worker
來對work item
進行處理;pool_workqueue
:充當橋樑紐帶的做用,用於鏈接workqueue
和worker_pool
,創建連接關係;下邊看看細節吧:api
struct work_struct
用來描述work
,初始化一個work
並添加到工做隊列後,將會將其傳遞到合適的內核線程來進行處理,它是用於調度的最小單位。緩存
關鍵字段描述以下:數據結構
struct work_struct { atomic_long_t data; //低比特存放狀態位,高比特存放worker_pool的ID或者pool_workqueue的指針 struct list_head entry; //用於添加到其餘隊列上 work_func_t func; //工做任務的處理函數,在內核線程中回調 #ifdef CONFIG_LOCKDEP struct lockdep_map lockdep_map; #endif };
圖片說明下data
字段:併發
內核中工做隊列分爲兩種:異步
worker
建立的內核線程綁定到特定的CPU上運行;WQ_UNBOUND
標誌,內核線程能夠在處理器間遷移;內核默認建立了一些工做隊列(用戶也能夠建立):函數
system_mq
:若是work item
執行時間較短,使用本隊列,調用schedule[_delayed]_work[_on]()
接口就是添加到本隊列中;system_highpri_mq
:高優先級工做隊列,以nice值-20來運行;system_long_wq
:若是work item
執行時間較長,使用本隊列;system_unbound_wq
:該工做隊列的內核線程不綁定到特定的處理器上;system_freezable_wq
:該工做隊列用於在Suspend時可凍結的work item
;system_power_efficient_wq
:該工做隊列用於節能目的而選擇犧牲性能的work item
;system_freezable_power_efficient_wq
:該工做隊列用於節能或Suspend時可凍結目的的work item
;struct workqueue_struct
關鍵字段介紹以下:工具
struct workqueue_struct { struct list_head pwqs; /* WR: all pwqs of this wq */ //全部的pool_workqueue都添加到本鏈表中 struct list_head list; /* PR: list of all workqueues */ //用於將工做隊列添加到全局鏈表workqueues中 struct list_head maydays; /* MD: pwqs requesting rescue */ //rescue狀態下的pool_workqueue添加到本鏈表中 struct worker *rescuer; /* I: rescue worker */ //rescuer內核線程,用於處理內存緊張時建立工做線程失敗的狀況 struct pool_workqueue *dfl_pwq; /* PW: only for unbound wqs */ char name[WQ_NAME_LEN]; /* I: workqueue name */ /* hot fields used during command issue, aligned to cacheline */ unsigned int flags ____cacheline_aligned; /* WQ: WQ_* flags */ struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */ //Per-CPU都建立pool_workqueue struct pool_workqueue __rcu *numa_pwq_tbl[]; /* PWR: unbound pwqs indexed by node */ //Per-Node建立pool_workqueue ... };
worker
對應一個內核線程,用於對work item
的處理;worker
根據工做狀態,能夠添加到worker_pool
的空閒鏈表或忙碌列表中;worker
處於空閒狀態時並接收到工做處理請求,將喚醒內核線程來處理;worker_pool
中由一個初始的空閒工做線程建立的,並根據須要動態建立和銷燬;關鍵字段描述以下:
struct worker { /* on idle list while idle, on busy hash table while busy */ union { struct list_head entry; /* L: while idle */ //用於添加到worker_pool的空閒鏈表中 struct hlist_node hentry; /* L: while busy */ //用於添加到worker_pool的忙碌列表中 }; struct work_struct *current_work; /* L: work being processed */ //當前正在處理的work work_func_t current_func; /* L: current_work's fn */ //當前正在執行的work回調函數 struct pool_workqueue *current_pwq; /* L: current_work's pwq */ //指向當前work所屬的pool_workqueue struct list_head scheduled; /* L: scheduled works */ //全部被調度執行的work都將添加到該鏈表中 /* 64 bytes boundary on 64bit, 32 on 32bit */ struct task_struct *task; /* I: worker task */ //指向內核線程 struct worker_pool *pool; /* I: the associated pool */ //該worker所屬的worker_pool /* L: for rescuers */ struct list_head node; /* A: anchored at pool->workers */ //添加到worker_pool->workers鏈表中 /* A: runs through worker->node */ ... };
worker_pool
是一個資源池,管理多個worker
,也就是管理多個內核線程;worker_pool
是Per-CPU建立,每一個CPU都有兩個worker_pool
,對應不一樣的優先級,nice值分別爲0和-20;worker_pool
建立後會添加到unbound_pool_hash
哈希表中;worker_pool
管理一個空閒鏈表和一個忙碌列表,其中忙碌列表由哈希管理;關鍵字段描述以下:
struct worker_pool { spinlock_t lock; /* the pool lock */ int cpu; /* I: the associated cpu */ //綁定到CPU的workqueue,表明CPU ID int node; /* I: the associated node ID */ //非綁定類型的workqueue,表明內存Node ID int id; /* I: pool ID */ unsigned int flags; /* X: flags */ unsigned long watchdog_ts; /* L: watchdog timestamp */ struct list_head worklist; /* L: list of pending works */ //pending狀態的work添加到本鏈表 int nr_workers; /* L: total number of workers */ //worker的數量 /* nr_idle includes the ones off idle_list for rebinding */ int nr_idle; /* L: currently idle ones */ struct list_head idle_list; /* X: list of idle workers */ //處於IDLE狀態的worker添加到本鏈表 struct timer_list idle_timer; /* L: worker idle timeout */ struct timer_list mayday_timer; /* L: SOS timer for workers */ /* a workers is either on busy_hash or idle_list, or the manager */ DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER); //工做狀態的worker添加到本哈希表中 /* L: hash of busy workers */ /* see manage_workers() for details on the two manager mutexes */ struct worker *manager; /* L: purely informational */ struct mutex attach_mutex; /* attach/detach exclusion */ struct list_head workers; /* A: attached workers */ //worker_pool管理的worker添加到本鏈表中 struct completion *detach_completion; /* all workers detached */ struct ida worker_ida; /* worker IDs for task name */ struct workqueue_attrs *attrs; /* I: worker attributes */ struct hlist_node hash_node; /* PL: unbound_pool_hash node */ //用於添加到unbound_pool_hash中 ... } ____cacheline_aligned_in_smp;
pool_workqueue
充當紐帶的做用,用於將workqueue
和worker_pool
關聯起來;關鍵字段描述以下:
struct pool_workqueue { struct worker_pool *pool; /* I: the associated pool */ //指向worker_pool struct workqueue_struct *wq; /* I: the owning workqueue */ //指向所屬的workqueue int nr_active; /* L: nr of active works */ //活躍的work數量 int max_active; /* L: max active works */ //活躍的最大work數量 struct list_head delayed_works; /* L: delayed works */ //延遲執行的work掛入本鏈表 struct list_head pwqs_node; /* WR: node on wq->pwqs */ //用於添加到workqueue鏈表中 struct list_head mayday_node; /* MD: node on wq->maydays */ //用於添加到workqueue鏈表中 ... } __aligned(1 << WORK_STRUCT_FLAG_BITS);
再來張圖,首尾呼應一下:
workqueue
子系統的初始化分紅兩步來完成的:workqueue_init_early
和workqueue_init
。workqueue
子系統早期初始化函數完成的主要工做包括:
pool_workqueue
的SLAB緩存,用於動態分配struct pool_workqueue
結構;worker_pool
,其中的nice值分別爲0和HIGHPRI_NICE_LEVEL
,而且爲每一個worker_pool
從worker_pool_idr
中分配一個ID號;struct workqueue_attrs
屬性,主要描述內核線程的nice值,以及cpumask值,分別針對優先級以及容許在哪些CPU上執行;從圖中能夠看出建立工做隊列的接口爲:alloc_workqueue
,以下圖:
alloc_workqueue
完成的主要工做包括:
struct workqueue_struct
的數據結構,而且對該結構中的字段進行初始化操做;workqueue
最終須要和worker_pool
關聯起來,而這個紐帶就是pool_workqueue
,alloc_and_link_pwqs
函數就是完成這個功能:1)若是工做隊列是綁定到CPU上的,則爲每一個CPU都分配pool_workqueue
而且初始化,經過link_pwq
將工做隊列與pool_workqueue
創建鏈接;2)若是工做隊列不綁定到CPU上,則按內存節點(NUMA,參考以前內存管理的文章)來分配pool_workqueue
,調用get_unbound_pool
來實現,它會根據wq屬性先去查找,若是沒有找到相同的就建立一個新的pool_workqueue
,而且添加到unbound_pool_hash
哈希表中,最後也會調用link_pwq
來創建鏈接;WQ_MEM_RECLAIM
標誌,則會新建rescuer worker
,對應rescuer_thread
內核線程。當內存緊張時,新建立worker
可能會失敗,這時候由rescuer
來處理這種狀況;workqueues
中;workqueue
子系統第二階段的初始化:
worker_pool
,添加一個初始的worker
;create_worker
函數中,建立的內核線程名字爲kworker/XX:YY
或者kworker/uXX:YY
,其中XX
表示worker_pool
的編號,YY
表示worker
的編號,u
表示unbound
;workqueue
子系統初始化完成後,基本就已經將數據結構的關聯創建好了,當有work
來進行調度的時候,就能夠進行處理了。
以schedule_work
接口爲例進行分析:
schedule_work
默認是將work
添加到系統的system_work
工做隊列中;
queue_work_on
接口中的操做判斷要添加work
的標誌位,若是已經置位了WORK_STRUCT_PENDING_BIT
,代表已經添加到了隊列中等待執行了,不然,須要調用__queue_work
來進行添加。注意了,這個操做是在關中斷的狀況下進行的,由於工做隊列使用WORK_STRUCT_PENDING_BIT
位來同步work
的插入和刪除操做,設置了這個比特後,而後才能執行work
,這個過程可能被中斷或搶佔打斷;
workqueue
的標誌位設置了__WQ_DRAINING
,代表工做隊列正在銷燬,全部的work
都要處理完,此時不容許再將work
添加到隊列中,有一種特殊狀況:銷燬過程當中,執行work
時又觸發了新的work
,也就是所謂的chained work
;
判斷workqueue
的類型,若是是bound
類型,根據CPU來獲取pool_workqueue
,若是是unbound
類型,經過node號來獲取pool_workqueue
;
get_work_pool
獲取上一次執行work
的worker_pool
,若是本次執行的worker_pool
與上次執行的worker_pool
不一致,且經過find_worker_executing_work
判斷work
正在某個worker_pool
中的worker
中執行,考慮到緩存熱度,放到該worker
執行是更合理的選擇,進而根據該worker
獲取到pool_workqueue
;
判斷pool_workqueue
活躍的work
數量,少於最大限值則將work
加入到pool->worklist
中,不然加入到pwq->delayed_works
鏈表中,若是__need_more_worker
判斷沒有worker
在執行,則喚醒worker
內核線程執行;
總結:
schedule_work
完成的工做是將work
添加到對應的鏈表中,而在添加的過程當中,首先是須要肯定pool_workqueue
;pool_workqueue
對應一個worker_pool
,所以肯定了pool_workqueue
也就肯定了worker_pool
,進而能夠將work
添加到工做鏈表中;pool_workqueue
的肯定分爲三種狀況:1)bound
類型的工做隊列,直接根據CPU號獲取;2)unbound
類型的工做隊列,根據node號獲取,針對unbound
類型工做隊列,pool_workqueue
的釋放是異步執行的,須要判斷refcnt
的計數值,所以在獲取pool_workqueue
時可能要屢次retry
;3)根據緩存熱度,優先選擇正在被執行的worker_pool
;work
添加到工做隊列後,最終的執行在worker_thread
函數中:
在建立worker
時,建立內核線程,執行函數爲worker_thread
;
worker_thread
在開始執行時,設置標誌位PF_WQ_WORKER
,調度器在進行調度處理時會對task進行判斷,針對workerqueue worker
有特殊處理;
worker
對應的內核線程,在沒有處理work
的時候是睡眠狀態,當被喚醒的時候,跳轉到woke_up
開始執行;
woke_up
以後,若是此時worker
是須要銷燬的,那就進行清理工做並返回。不然,離開IDLE
狀態,並進入recheck
模塊執行;
recheck
部分,首先判斷是否須要更多的worker
來處理,若是沒有任務處理,跳轉到sleep
地方進行睡眠。有任務須要處理時,會判斷是否有空閒內核線程以及是否須要動態建立,再清除掉worker
的標誌位,而後遍歷工做鏈表,對鏈表中的每一個節點調用process_one_worker
來處理;
sleep
部分比較好理解,沒有任務處理時,worker
進入空閒狀態,並將當前的內核線程設置成睡眠狀態,讓出CPU;
總結:
worker_pool
的內核線程池時,若是有PENDING
狀態的work
,而且發現沒有正在運行的工做線程(worker_pool->nr_running == 0
),喚醒空閒狀態的內核線程,或者動態建立內核線程;work
已經在同一個worker_pool
的其餘worker
中執行,再也不對該work
進行處理;work
的執行函數爲process_one_worker
:
work
可能在同一個CPU上不一樣的worker
中運行,直接退出;worker->current_func()
,完成最終work
的回調函數執行;worker_pool
經過nr_running
字段來在不一樣的狀態機之間進行切換;worker_pool
中有work
須要處理時,須要至少保證有一個運行狀態的worker
,當nr_running
大於1時,將多餘的worker
進入IDLE狀態,沒有work
須要處理時,全部的worker
都會進入IDLE狀態;work
時,若是回調函數阻塞運行,那麼會讓worker
進入睡眠狀態,此時調度器會進行判斷是否須要喚醒另外一個worker
;worker
都存放在idle_list
鏈表中,若是空閒時間超過了300秒,則會將其進行銷燬;Running->Suspend
worker
進入睡眠狀態時,若是該worker_pool
沒有其餘的worker
處於運行狀態,那麼是須要喚醒一個空閒的worker
來維持併發處理的能力;Suspend->Running
wake_up_worker
來進行喚醒處理,最終判斷若是該worker
不在運行狀態,則增長worker_pool
的nr_running
值;worker_pool
初始化時,註冊了timer的回調函數,用於定時對空閒鏈表上的worker
進行處理,若是worker
太多,且空閒時間太長,超過了5分鐘,那麼就直接進行銷燬處理了;worker_thread
函數時,若是沒有空閒的worker
,會調用manage_workers
接口來建立更多的worker
來處理工做;Documentation/core-api/workqueue.rst
http://kernel.meizu.com/linux-workqueue.html
洗洗睡了,收工!
歡迎關注公衆號,不按期分享Linux內核機制文章