The previous article gave a brief introduction to the principles and core data structures of workqueues. This article focuses on how a workqueue is created and how workers are managed.
1. Creating a workqueue (__alloc_workqueue_key)
struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
                                               unsigned int flags,
                                               int max_active,
                                               struct lock_class_key *key,
                                               const char *lock_name, ...)
{
    size_t tbl_size = 0;
    va_list args;
    struct workqueue_struct *wq;
    struct pool_workqueue *pwq;

    /* allocate wq and format name */
    if (flags & WQ_UNBOUND)
        tbl_size = wq_numa_tbl_len * sizeof(wq->numa_pwq_tbl[0]);

    /* allocate the workqueue_struct */
    wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
    if (!wq)
        return NULL;

    if (flags & WQ_UNBOUND) {
        wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);
        if (!wq->unbound_attrs)
            goto err_free_wq;
    }

    /* format the name */
    va_start(args, lock_name);
    vsnprintf(wq->name, sizeof(wq->name), fmt, args);
    va_end(args);

    max_active = max_active ?: WQ_DFL_ACTIVE;
    max_active = wq_clamp_max_active(max_active, flags, wq->name);

    /* init wq */
    wq->flags = flags;
    wq->saved_max_active = max_active;
    mutex_init(&wq->mutex);
    atomic_set(&wq->nr_pwqs_to_flush, 0);
    INIT_LIST_HEAD(&wq->pwqs);
    INIT_LIST_HEAD(&wq->flusher_queue);
    INIT_LIST_HEAD(&wq->flusher_overflow);
    INIT_LIST_HEAD(&wq->maydays);

    lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
    INIT_LIST_HEAD(&wq->list);

    if (alloc_and_link_pwqs(wq) < 0)
        goto err_free_wq;

    /*
     * Workqueues which may be used during memory reclaim should
     * have a rescuer to guarantee forward progress.
     */
    if (flags & WQ_MEM_RECLAIM) {
        struct worker *rescuer;

        rescuer = alloc_worker();
        if (!rescuer)
            goto err_destroy;

        rescuer->rescue_wq = wq;
        rescuer->task = kthread_create(rescuer_thread, rescuer, "%s",
                                       wq->name);
        if (IS_ERR(rescuer->task)) {
            kfree(rescuer);
            goto err_destroy;
        }

        wq->rescuer = rescuer;
        rescuer->task->flags |= PF_NO_SETAFFINITY;
        wake_up_process(rescuer->task);
    }

    if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
        goto err_destroy;

    /*
     * wq_pool_mutex protects global freeze state and workqueues list.
     * Grab it, adjust max_active and add the new @wq to workqueues
     * list.
     */
    mutex_lock(&wq_pool_mutex);

    mutex_lock(&wq->mutex);
    for_each_pwq(pwq, wq)
        pwq_adjust_max_active(pwq);
    mutex_unlock(&wq->mutex);

    list_add(&wq->list, &workqueues);

    mutex_unlock(&wq_pool_mutex);

    return wq;

err_free_wq:
    free_workqueue_attrs(wq->unbound_attrs);
    kfree(wq);
    return NULL;
err_destroy:
    destroy_workqueue(wq);
    return NULL;
}
The main tasks of this function are to allocate a workqueue_struct with kzalloc, format its name, and do some simple initialization of the workqueue; it then calls alloc_and_link_pwqs to establish the relationship with the pool_workqueues (pwq). Leaving aside the WQ_MEM_RECLAIM case for now, these two things are essentially all the function does. All workqueues are linked into a list whose head is a global static variable:
static LIST_HEAD(workqueues); /* PL: list of all workqueues */
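Before diving further into the internals, it may help to see how a driver typically ends up in __alloc_workqueue_key. The usual entry point is the alloc_workqueue() macro (schedule_work() instead uses the pre-allocated system_wq, shown later). The snippet below is a minimal, hypothetical example; my_wq, my_work and my_work_fn are made-up names for illustration only:

#include <linux/module.h>
#include <linux/workqueue.h>

static struct workqueue_struct *my_wq;

static void my_work_fn(struct work_struct *work)
{
    /* runs in process context on one of the pool's kworker threads */
    pr_info("my_work executed\n");
}

static DECLARE_WORK(my_work, my_work_fn);

static int __init my_init(void)
{
    /* WQ_MEM_RECLAIM gives the queue a rescuer thread; max_active = 1 */
    my_wq = alloc_workqueue("my_wq", WQ_MEM_RECLAIM, 1);
    if (!my_wq)
        return -ENOMEM;

    queue_work(my_wq, &my_work);
    return 0;
}

static void __exit my_exit(void)
{
    destroy_workqueue(my_wq);
}

module_init(my_init);
module_exit(my_exit);
MODULE_LICENSE("GPL");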
The more important part of __alloc_workqueue_key is setting up the relationship with the pwqs, which is done by alloc_and_link_pwqs:
static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
    bool highpri = wq->flags & WQ_HIGHPRI;
    int cpu;

    /* an ordinary (bound) workqueue */
    if (!(wq->flags & WQ_UNBOUND)) {
        /* allocate a pool_workqueue (pwq) for every CPU */
        wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
        if (!wq->cpu_pwqs)
            return -ENOMEM;

        /* link each pwq to the wq */
        for_each_possible_cpu(cpu) {
            struct pool_workqueue *pwq =
                per_cpu_ptr(wq->cpu_pwqs, cpu);
            struct worker_pool *cpu_pools =
                per_cpu(cpu_worker_pools, cpu);

            init_pwq(pwq, wq, &cpu_pools[highpri]);

            mutex_lock(&wq->mutex);
            link_pwq(pwq);
            mutex_unlock(&wq->mutex);
        }
        return 0;
    } else {
        return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
    }
}
For now we only consider ordinary workqueues, not WQ_UNBOUND ones. The function uses alloc_percpu to allocate a pool_workqueue for the workqueue on every CPU, then iterates over every CPU with for_each_possible_cpu and associates each CPU's pool_workqueue with its worker_pool via init_pwq. As described in the previous article, there are two kinds of per-CPU worker_pools: a normal one and a high-priority one; the normal pool is entry 0 and the high-priority pool is entry 1. Once the association is made, link_pwq adds the pwq to the workqueue's pwqs list.
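For reference, the per-CPU pools indexed here are defined in kernel/workqueue.c roughly as follows in 3.10 (details may differ slightly):

enum {
    NR_STD_WORKER_POOLS = 2,    /* index 0: normal, index 1: WQ_HIGHPRI */
};

/* two worker_pools per CPU: per_cpu(cpu_worker_pools, cpu)[0] and [1] */
static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
                                     cpu_worker_pools);

So &cpu_pools[highpri] in alloc_and_link_pwqs selects index 0 when the workqueue is a normal one and index 1 when WQ_HIGHPRI is set.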
2. Creating workers
After the workqueue, its pwqs and the worker_pools have been set up, workers still have to be created explicitly for each worker_pool. The core function is create_and_start_worker:
static int create_and_start_worker(struct worker_pool *pool)
{
    struct worker *worker;

    mutex_lock(&pool->manager_mutex);

    /* create a worker belonging to the pool; this creates a kthread */
    worker = create_worker(pool);
    if (worker) {
        spin_lock_irq(&pool->lock);
        /* start the worker, i.e. wake up its kthread */
        start_worker(worker);
        spin_unlock_irq(&pool->lock);
    }

    mutex_unlock(&pool->manager_mutex);

    return worker ? 0 : -ENOMEM;
}
Note that workers are created per worker_pool, which is why the worker_pool is passed in as the argument. The actual creation work is done by create_worker, and because there is a dedicated worker manager, adding a worker to a worker_pool has to be done under the pool's manager_mutex.
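Where is create_and_start_worker called from initially? At boot, init_workqueues() walks every online CPU and creates one worker for each of its two pools; roughly (3.10, abridged excerpt):

/* excerpt from init_workqueues(), abridged */
for_each_online_cpu(cpu) {
    struct worker_pool *pool;

    for_each_cpu_worker_pool(pool, cpu) {
        pool->flags &= ~POOL_DISASSOCIATED;
        BUG_ON(create_and_start_worker(pool) < 0);
    }
}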
The create_worker function itself is not complicated; its core tasks are roughly the following steps: allocate a worker structure with alloc_worker and assign it an ID within the pool; create the worker's kthread running worker_thread, named kworker/<cpu>:<id> (with an "H" suffix for high-priority pools, or kworker/u<pool id>:<id> for unbound pools); set the thread's nice value and CPU affinity from the pool's attributes; and finally associate the worker with its pool.
Initially, one worker is created for each worker_pool. Once created, the worker is started with start_worker:
static void start_worker(struct worker *worker)
{
    worker->flags |= WORKER_STARTED;
    worker->pool->nr_workers++;
    worker_enter_idle(worker);
    wake_up_process(worker->task);
}
This function is simple: it first marks the worker as WORKER_STARTED and increments the pool's worker count; it then marks the worker as idle via worker_enter_idle; finally it wakes the worker up with wake_up_process. Let's look at how the idle state is set in the middle step:
static void worker_enter_idle(struct worker *worker)
{
    struct worker_pool *pool = worker->pool;

    if (WARN_ON_ONCE(worker->flags & WORKER_IDLE) ||
        WARN_ON_ONCE(!list_empty(&worker->entry) &&
                     (worker->hentry.next || worker->hentry.pprev)))
        return;

    /* can't use worker_set_flags(), also called from start_worker() */
    worker->flags |= WORKER_IDLE;
    pool->nr_idle++;
    worker->last_active = jiffies;

    /* idle_list is LIFO */
    list_add(&worker->entry, &pool->idle_list);

    if (too_many_workers(pool) && !timer_pending(&pool->idle_timer))
        mod_timer(&pool->idle_timer, jiffies + IDLE_WORKER_TIMEOUT);

    /*
     * Sanity check nr_running. Because wq_unbind_fn() releases
     * pool->lock between setting %WORKER_UNBOUND and zapping
     * nr_running, the warning may trigger spuriously. Check iff
     * unbind is not in progress.
     */
    WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
                 pool->nr_workers == pool->nr_idle &&
                 atomic_read(&pool->nr_running));
}
This function sets WORKER_IDLE, increments the pool's nr_idle count, and updates last_active to the current jiffies. The worker is then added at the head of the pool's idle_list (the list is LIFO). By default a worker may stay in the idle state for at most IDLE_WORKER_TIMEOUT; once that time is exceeded, management work kicks in. last_active records the time the worker entered the idle state.
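The two constants that govern the idle-management behaviour described here and in the next section live in kernel/workqueue.c; in 3.10 they are (excerpted from a larger enum):

enum {
    /* excerpt from the constant enum in kernel/workqueue.c (3.10) */
    MAX_IDLE_WORKERS_RATIO = 4,         /* 1/4 of busy workers may be idle */
    IDLE_WORKER_TIMEOUT    = 300 * HZ,  /* keep surplus idle workers for 5 min */
};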
3. Managing workers
The system adds and removes workers dynamically according to actual demand. For idle workers, each worker_pool has a timer, idle_timer, whose handler is idle_worker_timeout. Let's look at that handler:
static void idle_worker_timeout(unsigned long __pool)
{
    struct worker_pool *pool = (void *)__pool;

    spin_lock_irq(&pool->lock);

    if (too_many_workers(pool)) {
        struct worker *worker;
        unsigned long expires;

        /* idle_list is kept in LIFO order, check the last one,
         * i.e. the worker that went idle earliest */
        worker = list_entry(pool->idle_list.prev, struct worker, entry);
        expires = worker->last_active + IDLE_WORKER_TIMEOUT;

        /*
         * An idle worker is kept for at most IDLE_WORKER_TIMEOUT.
         * When the timer fires, check whether that time has passed;
         * if not, push the timer out, otherwise flag the pool for
         * management and wake up a worker to do it.
         */
        if (time_before(jiffies, expires))
            mod_timer(&pool->idle_timer, expires);  /* re-arm the timer */
        else {
            /* it's been idle for too long, wake up manager */
            pool->flags |= POOL_MANAGE_WORKERS;
            wake_up_worker(pool);
        }
    }

    spin_unlock_irq(&pool->lock);
}
This handler deals with the case where the system has too many workers. How is "too many" decided? too_many_workers() makes the call: essentially nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy, where MAX_IDLE_WORKERS_RATIO is 4. When there really are too many idle workers, the handler takes the worker that went idle earliest (the tail of the idle_list) and checks whether it has been idle longer than IDLE_WORKER_TIMEOUT. If not, mod_timer re-arms the timer to expire when that worker's allowed idle time would end; otherwise the pool's POOL_MANAGE_WORKERS flag is set and the pool's first idle worker is woken up to carry out the management work. In the worker's main function, worker_thread, need_more_worker decides whether more workers are currently needed; if not, the code goes straight to the sleep label:
sleep:
    if (unlikely(need_to_manage_workers(pool)) && manage_workers(worker))
        goto recheck;
need_to_manage_workers checks (among other things) the POOL_MANAGE_WORKERS flag and returns true when it is set; a sketch of it follows the two calls below. The core of worker management is manage_workers, which really has only two key calls:
ret |= maybe_destroy_workers(pool);
ret |= maybe_create_worker(pool);
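For completeness, need_to_manage_workers() is slightly broader than just testing the flag; the 3.10 helpers look roughly like this:

static bool may_start_working(struct worker_pool *pool)
{
    return pool->nr_idle;   /* an idle worker is available to run work */
}

static bool need_to_create_worker(struct worker_pool *pool)
{
    /* work is pending, nobody is running it and nobody is idle */
    return need_more_worker(pool) && !may_start_working(pool);
}

static bool need_to_manage_workers(struct worker_pool *pool)
{
    return need_to_create_worker(pool) ||
           (pool->flags & POOL_MANAGE_WORKERS);
}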
We will only look at maybe_destroy_workers:
static bool maybe_destroy_workers(struct worker_pool *pool)
{
    bool ret = false;

    while (too_many_workers(pool)) {
        struct worker *worker;
        unsigned long expires;

        worker = list_entry(pool->idle_list.prev, struct worker, entry);
        expires = worker->last_active + IDLE_WORKER_TIMEOUT;

        if (time_before(jiffies, expires)) {
            mod_timer(&pool->idle_timer, expires);
            break;
        }

        /* destroy the worker that has been idle the longest */
        destroy_worker(worker);
        ret = true;
    }

    return ret;
}
This function again uses too_many_workers to decide whether there are too many workers. If so, it again takes the last worker on the idle_list and checks its idle time; if the timeout has not yet been exceeded, the timer's expiry is pushed out, otherwise the worker is destroyed with destroy_worker. Why check again? Looking at the code, manage_workers is responsible not only for removing workers but also for adding them. The idle timer only targets idle workers, i.e. its purpose is to destroy surplus workers, but the management work itself is integrated into worker_thread; from worker_thread's point of view it may need to add workers, it may need to remove them, or no management may be needed at all. Hence the condition has to be re-evaluated here.
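The "too many" test that both the timer and maybe_destroy_workers rely on is worth spelling out. A simplified version is shown below; the real 3.10 function additionally counts a worker that is currently acting as manager as idle and bails out early if idle_list is empty:

static bool too_many_workers(struct worker_pool *pool)  /* simplified */
{
    int nr_idle = pool->nr_idle;
    int nr_busy = pool->nr_workers - nr_idle;

    /*
     * Tolerate 2 spare idle workers plus strictly fewer than
     * nr_busy / MAX_IDLE_WORKERS_RATIO (4) extra ones.  Example: with
     * 4 busy workers, a third idle worker already counts as too many.
     */
    return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
}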
4. Adding a work item
static inline bool schedule_work(struct work_struct *work)
{
    return queue_work(system_wq, work);
}

static inline bool queue_work(struct workqueue_struct *wq,
                              struct work_struct *work)
{
    return queue_work_on(WORK_CPU_UNBOUND, wq, work);
}

bool queue_work_on(int cpu, struct workqueue_struct *wq,
                   struct work_struct *work)
{
    bool ret = false;
    unsigned long flags;

    local_irq_save(flags);

    if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
        __queue_work(cpu, wq, work);
        ret = true;
    }

    local_irq_restore(flags);
    return ret;
}
So the real body is __queue_work (a condensed sketch of it follows), and one of its core steps is the call to insert_work, shown after the sketch.
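A heavily condensed sketch of __queue_work() in 3.10: locking, tracing, the draining check and the non-reentrancy handling are omitted or only hinted at, so treat this as a reading aid rather than the actual code:

static void __queue_work_sketch(int cpu, struct workqueue_struct *wq,
                                struct work_struct *work)
{
    struct pool_workqueue *pwq;
    struct list_head *worklist;
    unsigned int work_flags = 0;

    if (cpu == WORK_CPU_UNBOUND)
        cpu = raw_smp_processor_id();

    /* bound queues use the per-CPU pwq, unbound ones a per-node pwq */
    if (!(wq->flags & WQ_UNBOUND))
        pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
    else
        pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));

    /*
     * Non-reentrancy: if the work is still running in another pool,
     * that pool's pwq is used instead (omitted here).
     */

    /* respect max_active: run now, or park on the pwq's delayed list */
    if (pwq->nr_active < pwq->max_active) {
        pwq->nr_active++;
        worklist = &pwq->pool->worklist;
    } else {
        work_flags |= WORK_STRUCT_DELAYED;
        worklist = &pwq->delayed_works;
    }

    insert_work(pwq, work, worklist, work_flags);
}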
static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
                        struct list_head *head, unsigned int extra_flags)
{
    struct worker_pool *pool = pwq->pool;

    /* we own @work, set data and link */
    set_work_pwq(work, pwq, extra_flags);
    list_add_tail(&work->entry, head);
    get_pwq(pwq);

    /*
     * Ensure either wq_worker_sleeping() sees the above
     * list_add_tail() or we see zero nr_running to avoid workers lying
     * around lazily while there are works to be processed.
     */
    smp_mb();

    /* wake a worker if more are needed, i.e. if none of the pool's
     * workers is currently running */
    if (__need_more_worker(pool))
        wake_up_worker(pool);
}
The function first calls set_work_pwq to record the pwq in the work's data field, then adds the work to the tail of the list passed in (normally the worker_pool's worklist), and finally checks whether more workers are needed; if so it performs a wake-up — on the current worker_pool, and the worker that gets woken is the pool's first idle worker. Note that in the queueing path, to avoid work reentrancy, the chosen pool is checked when it is selected: if the work is still running on another worker_pool, the work is queued on that pool's worklist instead.
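The wake-up decision above comes down to two small helpers; roughly, from 3.10 (nr_running tracks how many of the pool's workers are currently running on a CPU):

static bool __need_more_worker(struct worker_pool *pool)
{
    /* none of this pool's workers is currently running */
    return !atomic_read(&pool->nr_running);
}

static bool need_more_worker(struct worker_pool *pool)
{
    /* there is pending work and nobody is running to process it */
    return !list_empty(&pool->worklist) && __need_more_worker(pool);
}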
References:
Linux 3.10.1 source code