配套的代碼能夠從本號的github下載,github.com/shuningzhan… 文本有些圖片來自網絡,在此表示感謝,若有侵權請聯繫刪除。linux
工做隊列是一種將工做交給其它線程執行的機制。也就是當線程A指望作某件事,但本身由不想作,或者不能作的狀況下,它能夠將該事情(工做 work)加入到一個隊列當中,而後有後臺線程會從隊列中獲取該工做,並執行該工做。 這裏的的其它線程能夠本身建立,也能夠不用本身建立。由於,在操做系統起來的時候在每一個CPU上都建立了一組工做線程,並建立了默認工做隊列。如圖經過ps命令能夠看到內核建立的工做線程。 git
因爲Linux內核默認爲咱們作了不少工做,所以在常規狀況下工做隊列的使用很是簡單。咱們這裏先看一下最簡單狀況下如何使用工做隊列機制。本文的介紹從4個方面進行,分別以下:github
在具體使用以前,咱們先了解一下提供給咱們的接口有那些。首先咱們看一下涉及到的數據結構,瞭解了數據結構,才能比較容易的理解如何使用工做隊列。從使用層面上來講,咱們主要關注以下數據結構,這個數據結構表明一項工做。bash
typedef void (*work_func_t)(struct work_struct *work);
struct work_struct {
atomic_long_t data; /* 內核內部使用 */
struct list_head entry; /* 用於連接到工做隊列中*/
work_func_t func; /* 工做函數*/
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};
複製代碼
這裏有一點須要說明的是在結構體中有一個函數指針成員,這個是執行具體的工做的實現。Linux內核將工做隊列設計爲一個通用的機制。網絡
工做隊列是應用很靈活,咱們能夠定義本身的工做隊列,或者使用操做系統內核預約義的工做隊列。這裏咱們先給出一個最簡單的工做隊列的實例,這個實例借用內核預約義的工做隊列。在這個示例中,咱們啓動了一個線程,而後定時將工做放入工做隊列中。工做隊列接收到任務後執行該任務。任務的具體內容也很簡單,只是打印一個消息。數據結構
#include <linux/init.h>
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/mm.h>
#include <linux/in.h>
#include <linux/inet.h>
#include <linux/socket.h>
#include <net/sock.h>
#include <linux/kthread.h>
#include <linux/sched.h>
#include <linux/workqueue.h>
#define BUF_SIZE 1024
struct task_struct *main_task;
/* 定義本身的工做結構體,用於描述工做,
* 這裏須要包含系統提供的work_struct結構體
* 做爲其第一個成員。 */
struct my_work {
struct work_struct w;
int data;
};
/* 實例化咱們須要作的工做 */
static struct my_work real_work;
static inline void sleep(unsigned sec)
{
__set_current_state(TASK_INTERRUPTIBLE);
schedule_timeout(sec * HZ);
}
/* 工做函數,上述工做的具體工做由該函數完成,這裏
* 只是一個簡單的示例,僅僅打印一行文本 ,實際上
* 能夠作不少事情。*/
static void my_work_func(struct work_struct *work)
{
struct my_work *pwork;
/* 這裏使用了一個系統函數,用於根據成員的指針
* 得到父結構體的指針。 */
pwork = container_of(work, struct my_work, w);
printk(KERN_NOTICE "Do something %d\n", pwork->data);
}
/* 做爲獨立的線程,每隔1秒對工做數據進行調整,並加入
* 到工做隊列中。 */
static int multhread_server(void *data)
{
int index = 0;
/* 初始化一個工做,關鍵是初始化該工做的執行函數 */
INIT_WORK(&real_work.w, my_work_func);
while (!kthread_should_stop()) {
printk(KERN_NOTICE "server run %d\n", index);
real_work.data = index;
/* 調度工做,本質是將工做放入工做隊列當中。 */
if (schedule_work(&real_work.w) == 0) {
printk(KERN_NOTICE "Schedule work failed!\n");
}
index ++;
sleep(1);
}
return 0;
}
static int multhread_init(void)
{
ssize_t ret = 0;
printk("Hello, workqueue \n");
main_task = kthread_run(multhread_server,
NULL,
"multhread_server");
if (IS_ERR(main_task)) {
ret = PTR_ERR(main_task);
goto failed;
}
failed:
return ret;
}
static void multhread_exit(void)
{
printk("Bye!\n");
kthread_stop(main_task);
}
module_init(multhread_init);
module_exit(multhread_exit);
MODULE_LICENSE("GPL");
MODULE_AUTHOR("SunnyZhang<shuningzhang@126.com>");
複製代碼
下面這張圖是借用的魅族內核團隊博客的,這張圖很是清晰的解釋了工做隊列的架構和數據走向。 架構
在這裏咱們先解釋一下這張圖中比較重要的幾個概念: work :工做,這個就是具體要作的事情,經過上文中介紹的結構體表示。 workqueue :工做的集合。workqueue 和 work 是一對多的關係。 worker :工人。在代碼中 worker 對應一個 work_thread() 內核線程。 worker_pool:工人的集合。worker_pool 和 worker 是一對多的關係。 pwq(pool_workqueue):中間人 / 中介,負責創建起 workqueue 和 worker_pool 之間的關係。workqueue 和 pwq 是一對多的關係,pwq 和 worker_pool 是一對一的關係。實際上咱們能夠將工做隊列理解爲一個計算集羣,工做就是任務,咱們將工做提交給工做隊列至關於將任務提交給集羣。工做隊列再將任務根據負載狀況分配給具體的工人執行(工做隊列線程)。socket
前面介紹的工做隊列是借用的內核預建立的線程池,這個是全部人公用的。若是在負載較大的狀況下可能會影響任務執行的效率。內核提供了另外的加強功能,用戶能夠本身建立線程池,這樣就能夠用獨立的線程池處理任務,從而保證任務執行效率。下面這個函數用來建立一個函數
#define create_workqueue(name) \
alloc_workqueue((name), WQ_MEM_RECLAIM, 1)
#define create_singlethread_workqueue(name) \
alloc_ordered_workqueue("%s", WQ_MEM_RECLAIM, name)
複製代碼
這兩個宏都會返回一個workqueue_struct結構體的指針,而且都會建立進程(「內核線程」)來執行加入到這個workqueue的work。 create_workqueue:多核CPU,這個宏,會在每一個CPU上建立一個專用線程。 create_singlethread_workqueue:單核仍是多核,都只在其中一個CPU上建立線程。 核心實如今函數alloc_workqueue和alloc_ordered_workqueue中,咱們之前者爲例進行介紹。該函數也是一個宏定義,具體定義以下,這裏並無作實質性的工做,是另一個宏定義。學習
#ifdef CONFIG_LOCKDEP
#define alloc_workqueue(fmt, flags, max_active, args...) \
({ \
static struct lock_class_key __key; \
const char *__lock_name; \
\
__lock_name = #fmt#args; \
\
__alloc_workqueue_key((fmt), (flags), (max_active), \
&__key, __lock_name, ##args); \
})
#else
#define alloc_workqueue(fmt, flags, max_active, args...) \
、、
__alloc_workqueue_key((fmt), (flags), (max_active), \
NULL, NULL, ##args)
#endif
複製代碼
咱們在進一步看__alloc_workqueue_key函數的定義,這裏刪除了其它冗餘的內容,從函數定義能夠看出這裏主要是建立工做隊列結構體和啓動了獨立的線程。該函數最終返回建立的workqueue_struct
結構體,然後面就能夠向該隊列發送工做了。
struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
unsigned int flags,
int max_active,
struct lock_class_key *key,
const char *lock_name, ...)
{
wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
... ...
if (flags & WQ_MEM_RECLAIM) {
struct worker *rescuer;
rescuer = alloc_worker(NUMA_NO_NODE);
if (!rescuer)
goto err_destroy;
rescuer->rescue_wq = wq;
/*其實這個核心就是建立一個獨立的線程*/
rescuer->task = kthread_create(rescuer_thread, rescuer, "%s",
wq->name);
if (IS_ERR(rescuer->task)) {
kfree(rescuer);
goto err_destroy;
}
wq->rescuer = rescuer;
rescuer->task->flags |= PF_NO_SETAFFINITY;
wake_up_process(rescuer->task);
}
... ...
}
複製代碼
發送工做的函數定義以下,能夠看出來這裏有2個參數,分別是目的工做隊列和但願完成的工做。
bool queue_work(struct workqueue_struct *wq,struct work_struct *work);
bool queue_delayed_work(struct workqueue_struct *wq,
struct delayed_work *dwork,
unsigned long delay);
複製代碼
最後咱們把工做隊列涉及到的接口貼到下面,方便你們學習查找。