Linux內核設計與實現 總結筆記(第八章)下半部和推後執行的工做

上半部分的中斷處理有一些侷限,包括:linux

  • 中斷處理程序以異步方式執行,而且它有可能打斷其餘重要代碼的執行。
  • 中斷會屏蔽其餘程序,因此中斷處理程序執行的越快越好。
  • 因爲中斷處理程序每每須要對硬件進行操做,因此它們一般又很高的時限要求。
  • 中斷處理程序不在進程上下文中運行,因此它們不能阻塞。這限制它們所作的事情。

 

1、下半部

下半部的任務就是執行與中斷處理密切相關但中斷處理程序自己不執行的工做。數組

中斷處理程序會異步執行而且在最好的狀況下也會鎖定當前的中斷線。安全

  • 若是一個任務對時間很是敏感,將其放在中斷處理程序中執行
  • 若是一個任務和硬件相關,將其放在中斷處理程序中執行
  • 若是一個任務要保證不被其餘中斷打斷,將其放在中斷處理程序中執行。
  • 其餘全部任務,考慮放置在下半部執行

能夠讀一下別人的代碼,看看別人怎麼寫的網絡

1.1 爲何要用下半部

能夠儘可能縮短中斷處理的時間,提升系統的響應能力。數據結構

1.2 下半部的環境

在linux中經歷過多種下半部機制。最先Linux提供下半部的惟一方法叫作「BH」。然而這個機制方便卻不靈活,簡單又有性能瓶頸。異步

而後開發者們引入了任務隊列,來實現工做推後執行。而且代替了「BH」機制。不過仍是不夠靈活,尤爲是網絡部分。ide

在2.3中,開發者引入了軟中斷(softirqs)和tasklet。32個軟中斷在全部處理器上同時執行,而相同類型的tasklet不能同時執行。函數

  • BH(在2.5中去除)
  • 任務隊列(2.5中去除)
  • 軟中斷(在2.3中開始引入)
  • tasklet(2.3中引入)
  • 工做隊列(2.5開始引入)

2、軟中斷

軟中斷使用的較少,而tasklet是下半部更經常使用的形式。不過tasklet是經過軟中斷實現的。性能

2.1 軟中斷的實現

在<linux/interrunpt.h>中,有softirq_action結構this

struct softirq_action {
    void (*action)(struct softirq_action *);
};
softirq_action

kernel/softirq.c中定義了一個包含有32個結構體的數組

static struct softirq_action softirq_vec[NR_SOFTIRQS]
softirq_vec

如今只用到了9個,在<linux/interrupt.h>中有定義

enum
{
    HI_SOFTIRQ=0,
    TIMER_SOFTIRQ,
    NET_TX_SOFTIRQ,
    NET_RX_SOFTIRQ,
    BLOCK_SOFTIRQ,
    BLOCK_IOPOLL_SOFTIRQ,
    TASKLET_SOFTIRQ,
    SCHED_SOFTIRQ,
    HRTIMER_SOFTIRQ, /* Unused, but kept as tools rely on the
                numbering. Sigh! */
    RCU_SOFTIRQ,    /* Preferable RCU should always be the last softirq */

    NR_SOFTIRQS
};
NR_SOFTIRQS

上面這些能夠相似的使用這種方式來實現,softirq_handler是咱們本身定義的名字:

void softirq_handler(struct softirq_action *)

而後有,把整個結構體都傳遞給軟中斷處理程序,而不是僅僅傳遞數據值。若是結構體加入新的域時,無需對全部的軟中斷處理程序都進行變更。

一個註冊的軟中斷必須在被標記後纔會執行,被稱做觸發軟中斷。

如下幾種狀況,待處理的軟中斷會被檢查和執行。

  • 從一個硬件中斷代碼處返回時
  • 在ksoftirqd內核線程中
  • 在哪些顯式檢查和執行待處理的軟中斷的代碼中,如網絡子系統中

無論用什麼辦法喚醒,軟中斷都要在do_softirq()中執行。 

/* 若是有待處理的軟中斷,do_softirq()會循環遍歷每個,調用它們的處理程序 */
do_softirq()
{
    u32 pending;

    pending = local_softirq_pending();
    if(pending) {
        struct softirq_action *h; 

        /* 重設待處理的位圖 */
        set_softirq_pending(0);

        h = softirq_vec;
        do {
            if(pending & 1)
                h->action(h);
            h++;
            pending >>= 1;
        } while(pending);
    }   
}

/* 1) 用局部變量pending保存local_softirq_pending()宏的返回值,若是第n位被置1。那麼第n位中斷待處理
 * 2) 如今待處理的軟中斷位圖已經被保存,能夠將實際軟中斷位圖清零了
 * 3) 將指針h只想softirq_vec的第一項
 * 4) 若是pending的第一位被置1,則h->action(h)被調用
 * 5) 指針加1,h指向第二項
 * 6) pending右移1位,依次循環
 * 7) 重複到最後執行結束
 */
do_softirq

2.2 使用軟中斷

目前只有網絡和SCSI直接使用軟中斷。此外內核定時器和tasklet都是創建在軟中斷上的。對於時間要求嚴格並能本身搞笑完成枷鎖工做的應用,軟中斷會很好。

1.分配索引

在編譯期間,經過在<linux/interrupt.h>中定義的一個美劇類型來靜態聲明軟中斷。

內核使用從0開始的索引表示優先級,索引號越小優先級越高。

 

2.註冊處理程序

運行時經過調用open_softirq()註冊軟中斷處理程序,兩個參數:軟中斷的索引號和處理函數。在網絡子系統中有net/coreldev.c中。這樣使用

open_softirq(NET_TX_SOFTIRQ, net_tx_action);
open_softirq(NET_RX_SOFTIRQ, net_rx_action);
open_softirq

軟中斷處理程序執行時,容許響應中斷,但本身不能休眠。好比當前處理器軟中斷被禁止,其餘處理器仍然能夠執行別的軟中斷。這意味着數據共享,因此要嚴格的鎖機制。

軟中斷的意義是可擴展性,能夠在多個處理器中高效運行。若是不須要擴展多處理器,那麼使用tasklet吧。

3.觸發軟中斷

經過在枚舉類型的列表中添加新項以及調用open_softirq()進行註冊之後,新的軟中斷處理程序可以運行。

raise_softirq()函數能夠將一個軟中斷設置爲掛起狀態。在下次調用do_softirq()函數時投入運行。

3、tasklet

tasklet是利用軟中斷實現的一種下半部機制。它的接口更簡單,鎖保護也要求較低。

3.1 tasklet的實現

tasklet由兩類軟中斷表明,HI_SOFTIRQ和TASKLET_SOFTIRQ,這兩個惟一的區別在於HI_SOFTIRQ先執行。

3.2 tasklet結構體

struct tasklet_struct
{
    struct tasklet_struct *next;        /* 鏈表中的下一個tasklet */
    unsigned long state;                /* tasklet的狀態 */
    atomic_t count;                     /* 引用計數器 */
    void (*func)(unsigned long);        /* tasklet處理函數 */
    unsigned long data;                 /* 給tasklet處理函數的參數 */
};
struct tasklet_struct

func是tasklet的處理程序,data是它惟一的參數。

state只能在0、TASKLET_STATE_SCHED(被調度)和TASKLET_STATE_RUN(在運行)之間取值。

count是tasklet應用計數器,若是不是0,tasklet被禁止。只有0時才被激活。

3.3 調度tasklet 

已經調度的tasklet存放在兩個單處理器數據結構中:tasklet_vec(普通tasklet)和tasklet_hi_vec(高優先級的tasklet)。

tasklet由tasklet_schedule()和tasklet_hi_schedule(),接收一個只需將tasklet_struct結構的指針做爲參數。

tasklet_schedule()執行步驟:

  • 1)檢查tasklet的狀態是否爲TASKLET_STATE_SCHED
  • 2)調用_tasklet_schedule()
  • 3)保存中斷狀態,而後禁止本地中斷。
  • 4)把須要的tasklet加到每一個處理器一個的tasklet_vec鏈表或tasklet_hi_vec鏈表的表頭上去
  • 5)喚起TASKLET_SOFTIRQ或HI_SOFTIRQ軟中斷,下次調用do_softirq(0就會執行tasklet
  • 6)恢復中斷到原狀態
static inline void tasklet_schedule(struct tasklet_struct *t) 
{
    if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
        __tasklet_schedule(t);
}

void __tasklet_schedule(struct tasklet_struct *t)
{
    unsigned long flags;

    local_irq_save(flags);
    t->next = NULL;
    *__this_cpu_read(tasklet_vec.tail) = t;
    __this_cpu_write(tasklet_vec.tail, &(t->next));
    raise_softirq_irqoff(TASKLET_SOFTIRQ);
    local_irq_restore(flags);
}
tasklet_schedule

tasklet_action和tasklet_hi_action()就是tasklet_vec的處理核心:

static void tasklet_action(struct softirq_action *a)
{
    struct tasklet_struct *list;

    local_irq_disable();
    list = __this_cpu_read(tasklet_vec.head);
    __this_cpu_write(tasklet_vec.head, NULL);
    __this_cpu_write(tasklet_vec.tail, this_cpu_ptr(&tasklet_vec.head));
    local_irq_enable();

    while (list) {
        struct tasklet_struct *t = list;

        list = list->next;

        if (tasklet_trylock(t)) {
            if (!atomic_read(&t->count)) {
                if (!test_and_clear_bit(TASKLET_STATE_SCHED,
                            &t->state))
                    BUG();
                t->func(t->data);
                tasklet_unlock(t);
                continue;
            }
            tasklet_unlock(t);
        }

        local_irq_disable();
        t->next = NULL;
        *__this_cpu_read(tasklet_vec.tail) = t;
        __this_cpu_write(tasklet_vec.tail, &(t->next));
        __raise_softirq_irqoff(TASKLET_SOFTIRQ);
        local_irq_enable();
    }
}
tasklet_action
  • 1)禁止中斷,併爲當前處理器檢索tasklet_vec或tasklet_hig_vec鏈表
  • 2)將當前處理器上的該鏈表設置爲NULL,達到清空的效果
  • 3)容許響應中斷,不須要回到原狀態
  • 4)循環遍歷得到鏈表上的每個待處理的tasklet
  • 5)若是是多處理器系統,檢查TASKLET_STATE_RUN。若是正在執行,那如今就不運行
  • 6)若是當前這個tasklet沒有執行,將其狀態設置爲TASKLET_STATE_RUN
  • 7)檢查count值是否爲0,確保tasklet沒有被禁止。若是tasklet被禁止了,則跳到下一個掛起的tasklet
  • 8)已經知道了沒有在其餘地方執行,而且被設置成運行狀態,這樣其餘部分就不會被執行,而且引用計數爲0,如今能夠執行tasklet的處理程序了。
  • 9)tasklet運行完畢,清除tasklet的state域的TASKLET_STATE_RUN標誌狀態
  • 10)重複執行下一個tasklet直至沒有剩餘的等待處理的tasklet

 3.4 使用tasklet

1.聲明本身的tasklet

若是是靜態建立,在<linux/interrupt.h>中定義了兩個宏:

DECLARE_TASKLET(name, func, data)
DECLARE_TASKLET_DISABLED(name, func, data);
根據給定的名稱靜態建立一個tasklet_struct結構
tasklet被調度後,會執行func函數,參數由data提供
這兩個區別在於初始值不一樣,致使初始狀態一個運行態,一個等待
tasklet靜態建立

 使用例子:

DECLARE_TASKLET(my_tasklet, my_tasklet_handler, dev);
等價於
struct tasklet_struct my_tasklet = { NULL, 0, ATOMIC_INIT(0),
    my_tasklet_handler, dev};
建立了一個my_stasklet,處理程序是my_tasklet_handler,dev是值
例子

2.編寫本身的tasklet處理程序

tasklet必須符合特定格式

void tasklet_handler(unsigned long data)

須要注意tasklet不能睡眠,不能使用信號量或者其餘什麼阻塞函數

兩個相同tasklet不會同時運行,共享數據必須適當的鎖保護

3.調度你本身的tasklet

經過調度tasklet_schedule()函數並傳遞給他相應的tasklet_struct指針

tasklet_schedule(&my_tasklet);    /* 把my_tasklet標記爲掛起 */

 

4.ksoftirqd

軟中斷可能頻繁觸發致使用戶空間進程沒法得到足夠的處理器時間,於是處飢餓狀態。有兩種最容易想到的方案:

1. 一種是軟中斷返回前,把其餘的軟中斷都一併處理。複雜狀況下,可能一直運行

2.第二種是在返回後,軟中斷必須等上一些時間才能運行。這樣空閒時速度反而不夠。

最後開發者作了一些折中,內核不會當即處理從新觸發的軟中斷。當大量軟中斷出現的時候,內核會喚醒一組內核線程處理這些負載。

這些線程在最低優先級上運行(nice19),以免對重要任務資源搶奪。

每一個處理器都有這樣一個線程,全部線程的名字都叫作ksoftirqd/n,區別在於n,它對應處理器的編號。

只要有空閒的處理器,ksoftirq就會調用do_softirq()去處理它們。

4、工做隊列

工做隊列是另一種將工做推後執行的形式,交由一個內核線程去執行。這個下半部總會在進程上下文中執行。

區分使用tasklet、軟中斷和工做隊列是,任務是否須要睡眠。

4.1 工做隊列的實現

工做隊列子系統是一個用於建立內核線程的接口,經過它建立的進程負責執行由內核其餘部分排到隊列裏的任務。

建立這些內惡化線程稱做工做者線程。工做隊列可讓你的驅動程序建立一個專門的工做者線程來處理須要推後的工做。

它定義在kernel/workqueue.c中,數組中的每一項對應系統中的一個處理器。每一個處理器,每一個工做者線程對應一個這樣的cpu_workqueue_struct結構體

/*
 * 外部可見的工做隊列抽象是
 * 由每一個CPU的工做隊列組成的數組
 */
struct workqueue_struct {
    struct cpu_workqueue_struct cpu_wq[NR_CPUS];
    struct list_head list;
    const char *name;
    int sinqlethread;
    int freezeable;
    int rt; 
};
workqueue_struct

cpu_workqueue_struct是kernel/workqueue.c的核心數據結構:

struct cpu_workqueue_struct {
    spinlock_t lock;            /* 鎖保護這種結構 */
    struct list_head worklist;  /* 工做列表       */
    wait_queue_head_t more_work;
    struct work_struct *current_struct;
    struct workqueue_struct *wq;    /* 關聯工做隊列結構 */
    task_t *thread;                 /* 關聯線程         */
};
cpu_workqueue_struct

4.1.2 表示工做的數據結構

全部的工做者線程是用普通的內核線程實現的,它們都要執行worker_thread()函數。在<linux/workqueue.h>中定義的work_struct結構體表示:

struct work_struct {
    atomic_long_t data;
    struct list_head entry;
    work_func_t func;
};
struct work_struct

這些結構體被鏈接成鏈表,在每一個處理器上的每種類型的隊列都對應這樣一個鏈表。

work_thread()函數的核心流程,以下:

for ( ; ; ) {
    prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
    if ( list_empty(&cwq->worklist))
        schedule();
    finish_wait(&cwq->more_work, &wait);
    run_workqueue(cwq);
}
work_thread

1)線程將本身設置爲休眠狀態(state被設成TASK_INTERRUPTIBLE),並把本身加入到等待隊列中

2)若是工做鏈表是空的,線程調用schedule()函數進入睡眠狀態

3)若是鏈表中有對象,線程不會睡眠。相反,它將本身設置成TASK_RUNNING,脫離等待隊列。

4)若是鏈表非空,調用run_workqueue()函數執行被推後的工做。

while(!list_empty(&cwq->worklist)) {
    struct work_struct *work;
    work_func_t f;
    void *data;
    
    work = list_entry(cwq->worklist.next, struct work_struct, entry);
    f = work->func;
    list_del_init(cwq->worklist.next);
    work_clear_pending(work);
    f(work);
}
run_workqueue()函數完成推後到此的工做

1)當鏈表不爲空時,選取下一個節點對象

2)獲取咱們下網執行的函數func及其參數data

3)把該節點從鏈表上解下來,將待處理標誌位pending清零

4)調用函數

5)重複執行

4.1.3 工做隊列實現機制的總結

位於最高一層的工做者線程。系統容許有多種類型的工做者線程存在,對於指定的一個類型,系統的每一個CPU上都有要給該類的工做者線程。

內核能夠根據須要來建立工做者線程,而在默認狀況下內核只有event這一種類型的工做者線程。

4.1.4 使用工做隊列

①建立推後的工做

能夠經過DECLARE_WORK在編譯時靜態地建立該結構體:

DECLARE_WORK(name, void(*func) (void *), void *data);

靜態的建立要給名爲name,處理函數爲func,參數爲data的work_struct結構體。

也能夠在運行時經過指針建立一個工做:

INIT_WORK(struct work_struct *work, void(*func)(void *), void *data);

初始化一個由work指向的工做,處理函數爲func,參數爲data。

②工做隊列處理函數

void work_handler(void *data)

會由一個工做者線程執行,函數會進行在進程上下文中。默認狀況下,容許中斷響應,而且沒有任何鎖。函數還能夠睡眠。

須要注意的是函數不能訪問用戶空間,

③對工做進行調度

schedule_work(&work);

work立刻就會被調度,一旦其所在的處理器上的工做者線程被喚醒,它就會被執行。

schedule_delayed_work(&work, delay);

&work指向work_struct直到delay指定的時鐘節拍用完之後纔會執行。

④刷新操做

void flush_scheduled_work(void);

函數會一直等待,直到隊列中多有對象都被執行之後才返回。

int cancel_delayed_work(struct work_struct *work);

這個函數能夠取消任何與work_struct相關的掛起工做

⑤建立新的工做隊列

struct workqueue_struct *create_workqueue(const char *name);

建立一個新的任務隊列和與之相關的工做者線程。name參數用於該內核線程的命名,好比缺省的events隊列的建立就是調用:

struct workqueue_struct *keventd_wq;

keventd_wq = create_workqueue("events");

建立一個工做的時候無需考慮工做隊列的類型。

int queue_work(struct workqueue_struct *wq, struct work_struct *work)

int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, unsigned long delay)

最後你能夠調用下面的函數刷新指定的工做隊列:

flush_workqueue(struct workqueue_struct *wq);

#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/workqueue.h>
#include <linux/slab.h>

static struct workqueue_struct *my_wq;

typedef struct {
    struct work_struct work;
    int x;
}my_work_t;

my_work_t *work, *work2;
static void my_wq_func(struct work_struct *work)
{
    my_work_t *my_work = (my_work_t *)work;
    printk("my_work.x %d\n", my_work->x);
}

int init_module(void)
{
    int ret;

    my_wq = create_workqueue("my_queue");    //建立一個工做隊列
    if(my_wq) {
        work = (my_work_t *)kmalloc(sizeof(my_work_t), GFP_KERNEL);
        if(work) {
            INIT_WORK((struct work_struct *)work, my_wq_func);
            work->x = 1;
            ret = queue_work(my_wq, (struct work_struct *)work);
        }

        work2 = (my_work_t *)kmalloc(sizeof(my_work_t), GFP_KERNEL);
        if(work2) {
            INIT_WORK((struct work_struct *)work2, my_wq_func);
            work2->x = 2;
            ret = queue_work(my_wq, (struct work_struct *)work2);
        }
    }

    return 0;
}

void cleanup_module(void)
{
    flush_workqueue(my_wq);
    destroy_workqueue(my_wq);
}

MODULE_AUTHOR("skldjkas");
MODULE_DESCRIPTION(" work queue of buttom half test");
MODULE_LICENSE("GPL v2");
work.c

 

5、下半部機制的選擇

軟中斷提供的執行序列化保障最少,要求軟中斷處理函數必須格外當心地採起一些步驟確保共享數據的安全。

若是代碼多線索化考慮得不充分,那麼tasklet意義更大。 接口簡單,並且兩種同類型的tasklet不能同時執行

若是你須要吧任務推後到進程上下文中完成,那麼只能選擇工做隊列了。

 

6、在下半部之間加鎖

 

7、禁止下半部

相關文章
相關標籤/搜索