A Look at Workqueues in Linux, Part 2

The previous article gave a brief introduction to how workqueues operate and to their core data structures. This one focuses on how a workqueue is created and how workers are managed.


1. Creating a workqueue (__alloc_workqueue_key)

struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
                           unsigned int flags,
                           int max_active,
                           struct lock_class_key *key,
                           const char *lock_name, ...)
{
    size_t tbl_size = 0;
    va_list args;
    struct workqueue_struct *wq;
    struct pool_workqueue *pwq;

    /* allocate wq and format name */
    if (flags & WQ_UNBOUND)
        tbl_size = wq_numa_tbl_len * sizeof(wq->numa_pwq_tbl[0]);
    /* allocate the workqueue_struct */
    wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
    if (!wq)
        return NULL;

    if (flags & WQ_UNBOUND) {
        wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);
        if (!wq->unbound_attrs)
            goto err_free_wq;
    }
    /* format the workqueue name */
    va_start(args, lock_name);
    vsnprintf(wq->name, sizeof(wq->name), fmt, args);
    va_end(args);

    max_active = max_active ?: WQ_DFL_ACTIVE;
    max_active = wq_clamp_max_active(max_active, flags, wq->name);

    /* init wq */
    wq->flags = flags;
    wq->saved_max_active = max_active;
    mutex_init(&wq->mutex);
    atomic_set(&wq->nr_pwqs_to_flush, 0);
    INIT_LIST_HEAD(&wq->pwqs);
    INIT_LIST_HEAD(&wq->flusher_queue);
    INIT_LIST_HEAD(&wq->flusher_overflow);
    INIT_LIST_HEAD(&wq->maydays);

    lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
    INIT_LIST_HEAD(&wq->list);
    if (alloc_and_link_pwqs(wq) < 0)
        goto err_free_wq;
    /*
     * Workqueues which may be used during memory reclaim should
     * have a rescuer to guarantee forward progress.
     */
    if (flags & WQ_MEM_RECLAIM) {
        struct worker *rescuer;

        rescuer = alloc_worker();
        if (!rescuer)
            goto err_destroy;

        rescuer->rescue_wq = wq;
        rescuer->task = kthread_create(rescuer_thread, rescuer, "%s",
                           wq->name);
        if (IS_ERR(rescuer->task)) {
            kfree(rescuer);
            goto err_destroy;
        }

        wq->rescuer = rescuer;
        rescuer->task->flags |= PF_NO_SETAFFINITY;
        wake_up_process(rescuer->task);
    }

    if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
        goto err_destroy;
    /*
     * wq_pool_mutex protects global freeze state and workqueues list.
     * Grab it, adjust max_active and add the new @wq to workqueues
     * list.
     */
    mutex_lock(&wq_pool_mutex);
    mutex_lock(&wq->mutex);
    for_each_pwq(pwq, wq)
        pwq_adjust_max_active(pwq);
    mutex_unlock(&wq->mutex);
    list_add(&wq->list, &workqueues);
    mutex_unlock(&wq_pool_mutex);
    return wq;
err_free_wq:
    free_workqueue_attrs(wq->unbound_attrs);
    kfree(wq);
    return NULL;
err_destroy:
    destroy_workqueue(wq);
    return NULL;
}

 

The main job of this function is to allocate a workqueue_struct with kzalloc, format its name, and perform some simple initialization of the workqueue; it then calls alloc_and_link_pwqs to establish the relationship with the pwqs. Setting aside the WQ_MEM_RECLAIM case for now, these two steps are essentially all the function does. All workqueues are linked onto a single list whose head is a global static variable:

static LIST_HEAD(workqueues);        /* PL: list of all workqueues */
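In practice, drivers rarely call __alloc_workqueue_key directly; they go through the alloc_workqueue macro (or the older create_workqueue wrappers), which expands to a call to __alloc_workqueue_key. A minimal usage sketch follows; my_wq and my_driver_init are hypothetical names, not part of the kernel source.

#include <linux/init.h>
#include <linux/workqueue.h>

static struct workqueue_struct *my_wq;

static int __init my_driver_init(void)
{
    /* WQ_MEM_RECLAIM requests a rescuer thread, WQ_HIGHPRI selects the
     * high-priority worker_pool; 0 means the default max_active */
    my_wq = alloc_workqueue("my_wq", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
    if (!my_wq)
        return -ENOMEM;
    return 0;
}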

 

The most important part of this function is establishing the relationship with the pwqs, which is done by alloc_and_link_pwqs:

static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
    bool highpri = wq->flags & WQ_HIGHPRI;
    int cpu;
    /* the ordinary (bound) workqueue case */
    if (!(wq->flags & WQ_UNBOUND)) {
        /* allocate a pool_workqueue (pwq) for each CPU */
        wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
        if (!wq->cpu_pwqs)
            return -ENOMEM;
        /* link each pwq to the wq */
        for_each_possible_cpu(cpu) {
            struct pool_workqueue *pwq =
                per_cpu_ptr(wq->cpu_pwqs, cpu);
            struct worker_pool *cpu_pools =
                per_cpu(cpu_worker_pools, cpu);

            init_pwq(pwq, wq, &cpu_pools[highpri]);

            mutex_lock(&wq->mutex);
            link_pwq(pwq);
            mutex_unlock(&wq->mutex);
        }
        return 0;
    } else {
        return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
    }
}

 

For now, consider only an ordinary workqueue and ignore WQ_UNBOUND. The function allocates per-CPU pool_workqueue instances for the workqueue via alloc_percpu, then iterates over every CPU with for_each_possible_cpu and, for each one, associates the corresponding pool_workqueue with a worker_pool through init_pwq. As described in the previous article, worker_pools come in two flavors: normal and high-priority. The normal pool is entry 0 and the high-priority pool is entry 1. Once the association is made, link_pwq adds the pwq to the workqueue's pwqs list.
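The cpu_worker_pools array indexed here is a per-CPU array with one worker_pool per standard priority level; highpri simply selects index 1. A sketch of the relevant definitions, paraphrased from the 3.10-era workqueue.c (treat the exact spelling as an assumption and check against the source):

/* one standard worker_pool per priority level on every CPU */
enum {
    NR_STD_WORKER_POOLS = 2,    /* index 0: normal, index 1: WQ_HIGHPRI */
};

static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
                                     cpu_worker_pools);

So init_pwq(pwq, wq, &cpu_pools[highpri]) attaches the pwq to the normal pool for an ordinary workqueue and to the high-priority pool when WQ_HIGHPRI is set.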

2. Creating workers

Once the workqueue and its corresponding pwqs and worker_pools exist, workers must be created explicitly for the worker_pools. The core function is create_and_start_worker:

static int create_and_start_worker(struct worker_pool *pool)
{
    struct worker *worker;

    mutex_lock(&pool->manager_mutex);
    /* create a worker belonging to the pool; in effect this creates a kernel thread */
    worker = create_worker(pool);
    if (worker) {
        spin_lock_irq(&pool->lock);
        /* start the worker, i.e. wake up its thread */
        start_worker(worker);
        spin_unlock_irq(&pool->lock);
    }

    mutex_unlock(&pool->manager_mutex);
    return worker ? 0 : -ENOMEM;
}

 

Note that workers are created per worker_pool, which is why a worker_pool is passed in as the argument, while the actual creation is carried out by create_worker. Since there is a dedicated worker manager, adding a worker to a worker_pool has to be done under the manager lock.

The create_worker function itself is not complicated; its core work consists of the following steps:

  • Allocate a worker structure via alloc_worker and perform some simple initialization
  • Establish the link between the worker and its worker_pool
  • Create the worker thread via kthread_create_on_node, with worker_thread as the thread function (a condensed sketch follows this list)
  • Set the thread's priority
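A condensed sketch of create_worker covering just these steps; worker ID allocation through the pool's idr, the real "kworker/<cpu>:<id>" thread naming, and most error handling are omitted, and field names follow the 3.10 source as I recall them, so verify against the tree:

static struct worker *create_worker_sketch(struct worker_pool *pool)
{
    struct worker *worker;

    /* step 1: allocate and minimally initialize the worker */
    worker = alloc_worker();
    if (!worker)
        return NULL;

    /* step 2: tie the worker to its pool */
    worker->pool = pool;

    /* step 3: create the backing kthread; worker_thread() is its body.
     * The real code names it "kworker/<cpu>:<id>" via an idr-allocated id. */
    worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
                                          "kworker/sketch");
    if (IS_ERR(worker->task)) {
        kfree(worker);
        return NULL;
    }

    /* step 4: priority and affinity come from the pool's attributes */
    set_user_nice(worker->task, pool->attrs->nice);
    set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);

    return worker;
}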

Initially one worker is created for each worker_pool. Once created, the worker is started via start_worker:

static void start_worker(struct worker *worker)
{
    worker->flags |= WORKER_STARTED;
    worker->pool->nr_workers++;
    worker_enter_idle(worker);
    wake_up_process(worker->task);
}

This function is straightforward: it first marks the worker as WORKER_STARTED and increments the pool's worker count; it then flags the worker as idle via worker_enter_idle; finally it wakes the worker with wake_up_process. Let's look at the step in the middle that sets the idle state:

static void worker_enter_idle(struct worker *worker)
{
    struct worker_pool *pool = worker->pool;

    if (WARN_ON_ONCE(worker->flags & WORKER_IDLE) ||
        WARN_ON_ONCE(!list_empty(&worker->entry) &&
             (worker->hentry.next || worker->hentry.pprev)))
        return;

    /* can't use worker_set_flags(), also called from start_worker() */
    worker->flags |= WORKER_IDLE;
    pool->nr_idle++;
    worker->last_active = jiffies;

    /* idle_list is LIFO */
    list_add(&worker->entry, &pool->idle_list);

    if (too_many_workers(pool) && !timer_pending(&pool->idle_timer))
        mod_timer(&pool->idle_timer, jiffies + IDLE_WORKER_TIMEOUT);

    /*
     * Sanity check nr_running.  Because wq_unbind_fn() releases
     * pool->lock between setting %WORKER_UNBOUND and zapping
     * nr_running, the warning may trigger spuriously.  Check iff
     * unbind is not in progress.
     */
    WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
             pool->nr_workers == pool->nr_idle &&
             atomic_read(&pool->nr_running));
}

This function sets WORKER_IDLE, increments the pool's nr_idle count, and updates last_active to the current jiffies. It then adds the worker to the head of the pool's idle_list. By default a worker may remain idle for at most IDLE_WORKER_TIMEOUT; once that limit is exceeded, the management logic takes over. last_active records when the worker entered the idle state.
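The idle_timer used by the management logic in the next section is armed when the pool is initialized. A sketch of the relevant lines, assuming the 3.10-era init_worker_pool() and the pre-4.15 timer API; the five-minute IDLE_WORKER_TIMEOUT value is quoted from memory:

enum {
    IDLE_WORKER_TIMEOUT = 300 * HZ,     /* keep idle workers around for 5 mins */
};

static void init_worker_pool_sketch(struct worker_pool *pool)
{
    /* ... other pool initialization elided ... */

    /* arm the idle timer; its handler is idle_worker_timeout(), shown below */
    setup_timer(&pool->idle_timer, idle_worker_timeout,
                (unsigned long)pool);
}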

3. Managing workers

The system dynamically adds and removes workers according to actual demand. For idle workers, each worker_pool has a timer, idle_timer, whose handler is idle_worker_timeout. Let's look at that handler:

static void idle_worker_timeout(unsigned long __pool)
{
    struct worker_pool *pool = (void *)__pool;

    spin_lock_irq(&pool->lock);

    if (too_many_workers(pool)) {
        struct worker *worker;
        unsigned long expires;

        /* idle_list is kept in LIFO order, check the last one, i.e. the worker queued earliest */
        worker = list_entry(pool->idle_list.prev, struct worker, entry);
        expires = worker->last_active + IDLE_WORKER_TIMEOUT;
        /*
         * An idle worker may stay idle for at most IDLE_WORKER_TIMEOUT.
         * When the timer fires, check: if that limit has not been reached
         * yet, push the timer back; otherwise set the management flag on
         * the pool and wake up a worker to do the management work.
         */
        if (time_before(jiffies, expires))
            mod_timer(&pool->idle_timer, expires);    /* push back the expiry time */
        else {
            /* it's been idle for too long, wake up manager */
            pool->flags |= POOL_MANAGE_WORKERS;
            wake_up_worker(pool);
        }
    }

    spin_unlock_irq(&pool->lock);
}
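The "too many workers" decision that this handler (and maybe_destroy_workers, shown later) relies on is made by too_many_workers. A condensed sketch of its policy, paraphrased rather than quoted from the 3.10 source; the real function additionally counts a worker currently acting as manager as idle and bails out when idle_list is empty:

static bool too_many_workers_sketch(struct worker_pool *pool)
{
    int nr_idle = pool->nr_idle;
    int nr_busy = pool->nr_workers - nr_idle;

    /* MAX_IDLE_WORKERS_RATIO is 4 in 3.10: keep at most roughly one idle
     * worker for every four busy ones, plus a reserve of two */
    return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
}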

 

This handler deals with the situation where the system has too many workers. How is "too many" determined? too_many_workers makes the decision, concretely via nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy, where MAX_IDLE_WORKERS_RATIO is 4. When there really are too many idle workers, the handler takes the worker that was queued onto the idle list earliest and checks whether it has been idle longer than IDLE_WORKER_TIMEOUT. If not, mod_timer pushes the timer's expiry out to that worker's idle deadline; otherwise the pool's POOL_MANAGE_WORKERS flag is set and the pool's first worker is woken up to perform the management work. In the worker's handler, worker_thread, need_more_worker decides whether more workers are currently needed; if not, execution jumps straight to the sleep label:

sleep:
    if (unlikely(need_to_manage_workers(pool)) && manage_workers(worker))
        goto recheck;

 

need_to_manage_workers simply checks POOL_MANAGE_WORKERS and returns true if the flag is set. The core of worker management lies in manage_workers, which contains only two key calls:

    ret |= maybe_destroy_workers(pool);
    ret |= maybe_create_worker(pool);

 

Here we only look at maybe_destroy_workers:

static bool maybe_destroy_workers(struct worker_pool *pool)
{
    bool ret = false;

    while (too_many_workers(pool)) {
        struct worker *worker;
        unsigned long expires;

        worker = list_entry(pool->idle_list.prev, struct worker, entry);
        expires = worker->last_active + IDLE_WORKER_TIMEOUT;

        if (time_before(jiffies, expires)) {
            mod_timer(&pool->idle_timer, expires);
            break;
        }
        /* destroy the worker that was queued onto the idle list earliest */
        destroy_worker(worker);
        ret = true;
    }
    return ret;
}

 

This function once again uses too_many_workers to decide whether there are too many workers. If so, it again takes the last worker on the idle list and checks its idle time: if the timeout has not yet been reached, the timer's expiry is pushed back; otherwise the worker is destroyed via destroy_worker. Why check again? From reading the code, my understanding is that manage_workers is responsible not only for removing workers but also for adding them. The timer targets idle workers, i.e. its purpose is to destroy surplus ones, but the management work itself is folded into worker_thread. So from worker_thread's perspective, workers may need to be added, may need to be removed, or may not need managing at all, which is why the condition has to be re-evaluated here.
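Putting the pieces together, the shape of worker_thread's main loop looks roughly as follows. This is a heavily condensed outline; the WORKER_DIE handling, rebinding, concurrency-management flags and the actual work-processing loop are omitted or reduced to comments:

/* condensed outline of worker_thread()'s main loop (3.10-era) */
woke_up:
    spin_lock_irq(&pool->lock);
    worker_leave_idle(worker);
recheck:
    /* nothing pending, or enough workers already running? then go idle */
    if (!need_more_worker(pool))
        goto sleep;

    /* keep at least one idle worker in reserve before starting work */
    if (unlikely(!may_start_working(pool)) && manage_workers(worker))
        goto recheck;

    /* ... pull work items off pool->worklist and process them ... */

sleep:
    /* POOL_MANAGE_WORKERS set by the idle timer? do the trimming now */
    if (unlikely(need_to_manage_workers(pool)) && manage_workers(worker))
        goto recheck;

    worker_enter_idle(worker);
    __set_current_state(TASK_INTERRUPTIBLE);
    spin_unlock_irq(&pool->lock);
    schedule();
    goto woke_up;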

4. Adding work items

static inline bool schedule_work(struct work_struct *work)
{
    return queue_work(system_wq, work);
}

static inline bool queue_work(struct workqueue_struct *wq,
                  struct work_struct *work)
{
    return queue_work_on(WORK_CPU_UNBOUND, wq, work);
}
bool queue_work_on(int cpu, struct workqueue_struct *wq,
           struct work_struct *work)
{
    bool ret = false;
    unsigned long flags;
    local_irq_save(flags);
    if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
        __queue_work(cpu, wq, work);
        ret = true;
    }
    local_irq_restore(flags);
    return ret;
}
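queue_work_on only claims the WORK_STRUCT_PENDING bit; the actual routing happens in __queue_work. A heavily condensed sketch of its decision flow, paraphrased from the 3.10 source with locking omitted and the helper names quoted from memory:

static void __queue_work_sketch(int cpu, struct workqueue_struct *wq,
                                struct work_struct *work)
{
    struct pool_workqueue *pwq;
    struct worker_pool *last_pool;
    struct list_head *worklist;

    if (cpu == WORK_CPU_UNBOUND)
        cpu = raw_smp_processor_id();

    /* pick the pwq: per-CPU for bound workqueues, per-node otherwise */
    if (!(wq->flags & WQ_UNBOUND))
        pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
    else
        pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));

    /*
     * Non-reentrancy: if the work is still running on another pool,
     * follow it there so it is never executed concurrently.
     */
    last_pool = get_work_pool(work);
    if (last_pool && last_pool != pwq->pool) {
        struct worker *worker = find_worker_executing_work(last_pool, work);

        if (worker && worker->current_pwq->wq == wq)
            pwq = worker->current_pwq;
    }

    /* within max_active: queue for execution now; otherwise park it.
     * The real code also encodes the work color and DELAYED flag here. */
    if (pwq->nr_active < pwq->max_active) {
        pwq->nr_active++;
        worklist = &pwq->pool->worklist;
    } else {
        worklist = &pwq->delayed_works;
    }

    insert_work(pwq, work, worklist, 0);
}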

So the core of the whole path is __queue_work, and one of its key steps is calling insert_work:

static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
            struct list_head *head, unsigned int extra_flags)
{
    struct worker_pool *pool = pwq->pool;

    /* we own @work, set data and link */
    set_work_pwq(work, pwq, extra_flags);
    list_add_tail(&work->entry, head);
    get_pwq(pwq);

    /*
     * Ensure either wq_worker_sleeping() sees the above
     * list_add_tail() or we see zero nr_running to avoid workers lying
     * around lazily while there are works to be processed.
     */
    smp_mb();
    /* wake a worker if more are needed; essentially checks whether any worker is currently running */
    if (__need_more_worker(pool))
        wake_up_worker(pool);
}

 

The function first calls set_work_pwq to record the pwq in the work item's data field, then appends the work to the list maintained by the worker_pool, and finally checks whether more workers are needed, waking one up if so. The wakeup naturally targets the current worker_pool, and it is that pool's first worker that gets woken. Note that on the queueing path, to avoid re-entrancy, __queue_work checks while selecting the worker_pool whether the work item is still running on another worker_pool; if it is, the work is queued onto that worker_pool's work list instead.
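For completeness, the user-facing side of all of this is just a work_struct plus a handler function. A minimal hedged example; my_work, my_work_func and my_wq are hypothetical names, and my_wq is assumed to have been created with alloc_workqueue as sketched in section 1:

#include <linux/workqueue.h>

static struct workqueue_struct *my_wq;   /* created elsewhere via alloc_workqueue() */

static void my_work_func(struct work_struct *work)
{
    /* runs in a kworker thread belonging to the selected worker_pool */
    pr_info("my_work executed\n");
}

/* static initialization; INIT_WORK() can be used at runtime instead */
static DECLARE_WORK(my_work, my_work_func);

static void submit_examples(void)
{
    /* queue onto the default system workqueue (system_wq) */
    schedule_work(&my_work);

    /* ... or onto our own workqueue; a second queue_work() on a work item
     * that is still pending simply returns false */
    queue_work(my_wq, &my_work);
}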

Immanuel

References:

Linux 3.10.1 source code
