Kernel常見鎖的原理和實現

    鎖是內核中使用最頻繁,最基礎的設施之一,在內核的各個模塊中被大量使用。鎖的本質是在併發過程當中保證資源的互斥使用。Linux內核提供了多種鎖,應用的場合也各不相同,主要包括:原子操做,信號量,讀寫鎖,自旋鎖,以及RCU鎖機制等。數據結構

    RCU是比讀寫鎖更高效,而且同時支持多個讀者和多個寫者併發的機制,其實現很是複雜,涉及到軟中斷,completion機制等,將再也不本篇分析,另起一篇RCU機制和實現。架構

原子操做(atomic)併發

原子操做是實現其餘各類鎖的基礎。考慮以下代碼語句:函數

i++;優化

編譯器在編譯的過程當中,該語句有可能被編譯成以下三條CPU指令:atom

  1. 加載內存變量i的值到寄存器線程

  2. 寄存器值+1設計

  3. 將寄存器值寫回內存rest

咱們知道,一條指令在單CPU上對內存的訪問是原子的(由於中斷只能是在當前指令執行完以後纔去檢查處理),但多條指令之間並非原子的,單條指令在多處理器系統上也不是原子的。所以,上面i++語句雖然在代碼編寫上是一條語句,但在二進制可執行指令上倒是三條指令,若是在兩個併發的例程中對同一個i變量同時執行i++操做,必然會致使數據錯亂。所以,每種CPU架構都應該提供一套指令,用於鎖定/解鎖內存總線,使得在鎖定區內執行的指令是一個原子操做。對象

以x86架構爲例,提供了lock;前綴指令,用於在指令執行前先鎖定特定內存,保證對特定內存的互斥訪問。以atomic_add()實現爲例:

#ifdef CONFIG_SMP

#define LOCK_PREFIX_HERE \

".pushsection .smp_locks,\"a\"\n" \

".balign 4\n" \

".long 671f - .\n" /* offset */ \

".popsection\n" \

"671:"

 

#define LOCK_PREFIX LOCK_PREFIX_HERE "\n\tlock; "

 

#else /* ! CONFIG_SMP */

#define LOCK_PREFIX_HERE ""

#define LOCK_PREFIX ""

#endif

 

static __always_inline void atomic_add(int i, atomic_t *v)

{

asm volatile(LOCK_PREFIX "addl %1,%0"

: "+m" (v->counter)

: "ir" (i));

}

atomic_add()使用內聯彙編的方式實現整型變量的加法,核心是:addl %1,%0彙編指令。若是是在單CPU系統架構上調用該函數,那麼這條指令是原子的。可是若是在SMP(對稱多處理器)系統上,該單條指令就不是原子的。所以,該實現還增長了LOCK_PREFIX宏,用於區分是不是SMP系統:若是是SMP系統,LOCK_PREFIX宏定義中包含lock;指令前綴,不然LOCK_PREFIX爲空(單CPU系統上單條指令自己就是原子的)。

 

信號量(semaphore/mutex)

信號量semaphore是一種睡眠鎖,實現對多個同類資源的互斥訪問,若是資源個數降爲1個,就是互斥鎖mutex。信號量實現原理以下:初始有n個同類資源,當某個線程獲取(down操做)資源時,資源個數-1。當全部資源被分配完,此時當前線程被掛起在等待隊列上,直到某個線程釋放了(up操做)資源後,喚醒在等待隊列上的線程從新獲取資源。

信號量的數據結構簡單清晰:

struct semaphore {

raw_spinlock_t lock; // 自旋鎖,保護count和 wait_list 的互斥訪問

unsigned int count; // 資源個數,在sema_init接口中初始化

struct list_head wait_list; // 等待隊列

};

基本接口定義也清晰明瞭:

void sema_init(struct semaphore *sem, int val);

void down(struct semaphore *sem);

void up(struct semaphore *sem);

簡單分析一下實現:

void down(struct semaphore *sem)

{

unsigned long flags;

 

raw_spin_lock_irqsave(&sem->lock, flags);

if (likely(sem->count > 0))

sem->count--; // 上鎖成功

else

__down(sem); // 資源已經用完,掛起當前線程

raw_spin_unlock_irqrestore(&sem->lock, flags);

}

 

void up(struct semaphore *sem)

{

unsigned long flags;

 

raw_spin_lock_irqsave(&sem->lock, flags);

if (likely(list_empty(&sem->wait_list)))

sem->count++; // 等待隊列爲空,count++後直接退出

else

__up(sem); // 喚醒等待隊列線程

raw_spin_unlock_irqrestore(&sem->lock, flags);

}

 

讀寫信號量(rw_sem)

    讀寫信號量是信號量的更細粒度實現。咱們知道,對於單個資源,不管讀寫,資源都是互斥的。也就是說,同時只能有一個讀線程或者寫線程獨佔資源,這種狀況不是最優的。考慮以下情形:有多個讀線程和一個寫線程訪問同一個資源。信號量實現的是全部線程的獨佔訪問,沒法實現某一時刻多個讀線程同時訪問資源。而讀寫信號量能夠實現多個讀線程的併發。對同一個資源訪問,讀寫信號量實現以下機制:

    (1)同時只能有一個寫者獨佔資源

    (2)同時能夠有多個讀者訪問資源

    (3)讀者和寫者不能同時訪問資源

    所以,讀寫信號量適用於有讀者不少,寫者不多的情形。kernel中提供兩種實現方式:一種是平臺相關的實現,使用內聯彙編實現,效率很高;另一種是通用的實現方式,與平臺無關,效率較低。咱們只關注基本的通用實現方式:

    struct rw_semaphore {
    __s32            count;                    //  引用計數
    raw_spinlock_t        wait_lock;    //  自旋鎖,保護wait_list的互斥訪問
    struct list_head    wait_list;         //  等待隊列
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    struct lockdep_map dep_map;
#endif
};

void __sched down_read(struct rw_semaphore *sem);

void __sched down_write(struct rw_semaphore *sem);

void up_read(struct rw_semaphore *sem);

void up_write(struct rw_semaphore *sem);

    說明:int類型的count設計得很巧妙,count > 0表示當前佔用資源的讀線程個數,count = -1表示當前有個寫線程佔用資源,count = 0表示當前資源空閒。

    下面分析核心實現:

    (1)讀者上鎖:

void __sched __down_read(struct rw_semaphore *sem)
{
    struct rwsem_waiter waiter;
    struct task_struct *tsk;
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);    // 自旋鎖保護等待隊列

    if (sem->count >= 0 && list_empty(&sem->wait_list)) {    // 資源空閒或者被其餘讀者佔用,而且等待隊列爲空,當前讀者能夠獲取鎖。增長讀者引用計數,釋放自旋鎖並退出,上鎖成功。
        /* granted */
        sem->count++; 
        raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
        goto out;
    }

    // 要麼資源被寫者佔用(count == -1),要麼等待隊列不空,那麼當前讀者都要入隊列。判斷等待隊列不空是爲了防止讀者飢餓。

    tsk = current;
    set_task_state(tsk, TASK_UNINTERRUPTIBLE);    // 優化措施:只能被其餘讀者/寫者主動喚醒,防止被系統喚醒,增長系統開銷

    /* set up my own style of waitqueue */
    waiter.task = tsk;
    waiter.type = RWSEM_WAITING_FOR_READ;
    get_task_struct(tsk);

    list_add_tail(&waiter.list, &sem->wait_list);    // 當前讀者加入等待隊列

    /* we don't need to touch the semaphore struct anymore */
    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);    // 釋放自旋鎖

    /* wait to be given the lock */
    for (;;) {
        if (!waiter.task)
            break;
        schedule();    // 切換線程,調度其餘讀者/寫者執行
        set_task_state(tsk, TASK_UNINTERRUPTIBLE);     // for 循環是爲了判斷本次喚醒是否真的輪到本身運行。存在以下情形:等待隊列上有多個讀寫線程,當寫線程完成以後,會喚醒等待隊列上全部線程。爲了防止讀寫飢餓,先入隊列的線程先獲得執行。若是當前線程前面有個寫線程獲得機會執行,那麼當前讀線程須要再次掛起在等待隊列,等待下次調度。
    }

    __set_task_state(tsk, TASK_RUNNING);    // 當前讀者能夠獲取鎖,上鎖成功
 out:
    ;
}

    (2)讀者解鎖:

void __up_read(struct rw_semaphore *sem)
{
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);    // 自旋鎖上鎖

    if (--sem->count == 0 && !list_empty(&sem->wait_list))    // 資源空閒而且等待隊列不空,此時等待隊列的首個對象必定是一個寫者
        sem = __rwsem_wake_one_writer(sem);    // 喚醒等待隊列上的寫者

    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
}
    (3)寫者上鎖:

void __sched __down_write_nested(struct rw_semaphore *sem, int subclass)
{
    struct rwsem_waiter waiter;
    struct task_struct *tsk;
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);    // 自旋鎖上鎖

    /* set up my own style of waitqueue */
    tsk = current;
    waiter.task = tsk;
    waiter.type = RWSEM_WAITING_FOR_WRITE;
    list_add_tail(&waiter.list, &sem->wait_list);    // 寫者先入隊列

    /* wait for someone to release the lock */
    for (;;) {
        /*
         * That is the key to support write lock stealing: allows the
         * task already on CPU to get the lock soon rather than put
         * itself into sleep and waiting for system woke it or someone
         * else in the head of the wait list up.
         */
        if (sem->count == 0)    // 若是資源沒被佔用,跳出for循環後,退出隊列,上鎖成功。
            break;
        set_task_state(tsk, TASK_UNINTERRUPTIBLE);
        raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
        schedule();        //  資源被佔用,調度其餘線程
        raw_spin_lock_irqsave(&sem->wait_lock, flags);
    }

    // 上鎖成功,退出等待隊列
    /* got the lock */
    sem->count = -1;
    list_del(&waiter.list);

    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
}

void __sched __down_write(struct rw_semaphore *sem)
{
    __down_write_nested(sem, 0);
}
    (4)寫者解鎖:

void __up_write(struct rw_semaphore *sem)
{
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);    // 自旋鎖上鎖

    sem->count = 0;    // 讀者解鎖成功,資源空閒
    if (!list_empty(&sem->wait_list))    // 等待隊列不空
        sem = __rwsem_do_wake(sem, 1);    // 調度等待隊列上的讀者/寫者運行

    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
}
 

自旋鎖(spin lock)

自旋鎖設計可用於SMP系統中中斷上下文的臨界區保護。咱們知道,中斷上下文中是不能夠睡眠的(1,中斷上下文要求及時處理,2中斷上下文不是可調度實體 等種種緣由),所以信號量/讀寫鎖等睡眠鎖不能用於中斷上下文。自旋鎖設計原理是:當鎖被其餘內核線程CPU或其餘CPU中斷上下文持有時,當前CPU不是去睡眠,而是不停的空轉並輪詢該鎖的狀態,直到該鎖被其餘CPU釋放,當前CPU獲取該鎖並進入臨界區,執行完以後釋放該鎖。這也是自旋鎖的名稱來源:當獲取不到鎖時,CPU空等,不停的自旋。

根據自旋鎖的設計原理,咱們知道:

(1)自旋鎖保護的臨界區代碼執行時間要儘量短,不然其餘CPU會一直忙等,浪費CPU資源。

(2)中斷上下文中的自旋鎖保護的臨界區必定不能睡眠,不然會死鎖。線程上下文中能夠睡眠,只要保證能被喚醒便可。

自旋鎖能夠用在內核線程中,也能夠用在中斷上下文中。若是某個自旋鎖只用於內核線程中,那麼該自旋鎖的實現只須要關閉內核搶佔便可(對應spin_lock/spin_unlock版本)。若是該自旋鎖不只用於內核線程,也會在中斷上下文使用,那麼該自旋鎖的實現不只要關閉內核搶佔,並且要禁止中斷(對應spin_lock_irqsave/spin_unlock_irqsotre等版本)。

相關文章
相關標籤/搜索