內核同步-鎖機制

    Linux系統上,多個進程能夠同時運行,以及各類中斷髮生的中斷也在同時獲得處理,這種多個上下文宏觀上同時運行的狀況稱爲併發。併發具體包括以下幾種可能:php

    1) UP平臺上,一個進程正在執行時被另外一個進程搶佔;linux

    2) UP平臺上,一個進程正在執行時發生了中斷,內核轉而執行中斷處理程序;編程

    3) SMP平臺上,每一個處理器都會發生UP平臺上的狀況;併發

    4) SMP平臺上,多個進程或中斷同時在多個CPU上執行;函數

多個併發的上下文同時使用同一個資源的狀況稱爲競態,而可能發生競態的這一段代碼稱爲臨界區。內核編程時的臨界區,比較多的狀況是:性能

1) 代碼訪問了全局變量,而且這段代碼可被多個進程執行;測試

2) 代碼訪問了全局變量,而且這段代碼可被進程執行,也可被中斷處理程序執行;ui

針對上述狀況,內核提供了以下手段來解決竟態問題:this

1)鎖機制:
atom

2)院子操做:

下面會先介紹鎖機制。Linux內核提供了多種鎖機制,這些鎖機制的區別在於,當獲取不到鎖時,執行程序是否發生睡眠並進行系統調度。具體包括自旋鎖、互斥體、信號量。

1、自旋鎖:spinlock_t

自旋鎖有兩個基本操做:獲取與釋放。獲取自旋鎖時,當判斷鎖的狀態爲未鎖,則會立刻加鎖,若是已是鎖定的狀態,當期執行流程則會執行「忙等待」,中間沒有任何的調度操做。也就說執行流程從判斷鎖的狀態到完成加鎖,是一個原子操做,在執行上是不可分割的。

自旋鎖的實現是平臺相關的,在使用的時候,只需統一包含以下頭文件:

#include <linux/spinlock.h>

自旋鎖的變量類型是spinlock_t,定義以下:

typedef struct {
    raw_spinlock_t raw_lock;
    #if defined(CONFIG_PREEMPT) && defined(CONFIG_SMP)
    unsigned int break_lock;
    #endif
} spinlock_t;
 
typedef struct {
    volatile unsigned int lock;
} raw_spinlock_t;

1)自旋鎖須要初始化才能使用,自旋鎖的初始化接口以下:

# define spin_lock_init(lock)     \
do { *(lock) = SPIN_LOCK_UNLOCKED; } while (0)

2)獲取自旋鎖的接口以下:

void spin_lock(spinlock_t *lock);

3)釋放自旋鎖的接口以下:

void spin_unlock(spinlock_t *lock);


    獲取自旋鎖的時候,內部會首先禁止搶佔,而後來時循環判斷鎖的狀態。在UP版本中,惟一的操做就是禁止搶佔,若是是UP版本且非搶佔式內核,則進一步退化爲無操做。能夠粗略的看一下內核源碼的實現,由於自旋鎖的實現是與平臺相關的,這裏以arm平臺爲例:

void __lockfunc _spin_lock(spinlock_t *lock)
{
    preempt_disable(); //關閉搶佔
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); //這個若是沒有打開調試自旋鎖的宏的話,至關於無操做。
    _raw_spin_lock(lock); //這裏調用平臺相關的代碼來輪詢鎖的狀態
}

static inline void __raw_spin_lock(raw_spinlock_t *lock)

    {

        unsigned long tmp;

        __asm__ __volatile__(

        "1: ldrex   %0, [%1]\n"  //取lock->lock放在 tmp裏,而且設置&lock->lock這個內存地址爲獨佔訪問

        "   teq %0, #0\n"   // 測試lock->lock是否爲0,影響標誌位z

        #ifdef CONFIG_CPU_32v6K

            "   wfene\n"

        #endif

        "   strexeq %0, %2, [%1]\n"  //若是lock->lock是0,而且是獨佔訪問這個內存,就向lock->lock裏 寫入1,並向tmp返回0,同時清除獨佔標記

        "   teqeq   %0, #0\n" //如 果lock->lock是0,而且strexeq返回了0,表示加鎖成功,返回

        "   bne 1b"   //若是加鎖失敗,則會向後跳轉到上面的標號1處,再次從新執行

         : "=&r" (tmp)

        : "r" (&lock->lock), "r" (1) 

        : "cc");

        smp_mb();

    }

上面那段彙編代碼就是循環判斷lock的值,最後的那個bne 1b,就能夠明白爲何自旋鎖在獲取不到鎖的狀況下,會進行所謂的「自旋」了!

4) 此外,內核還提供了一個用於嘗試獲取自旋鎖的接口:

Int spin_trylock(spinlock_t *lock);

到這裏爲止,上面介紹的接口都沒有考慮到在獲取鎖之後又發生中斷的問題,若是要解決與中斷的互斥問題,則須要使用如下接口:

Void spin_lock_ireq(spinlock_t *lock); // 禁止中斷並獲取自旋鎖
Void spin_unlock_irq(spinlock_t *lock); // 釋放自旋鎖並使能中斷
Void spin_lock_irq_save(spin_lock_t *lock, unsigned long flags); // 禁止中斷並獲取保存中斷狀態,而後獲取自旋鎖
Void spin_unlok_irq_store(spinlock_t *lock, unsigned long flags); // 釋放自旋鎖,並將中斷狀態恢復爲已保存的狀態值。
Int spin_trylock_irq(spinlock_t *lock); // 禁止中斷並嘗試獲取自旋鎖,成功返回非0值,返回值爲0表示獲取失敗,則中斷狀態恢復爲使能狀態。
Int spin_trylock_irqsave(spinlock_t *lock, unsigned long flags); // 禁止中斷並保存中斷狀態,而後嘗試獲取自旋鎖,返回值爲非0表示獲取成功,0表示獲取失敗,則會恢復中斷狀態。

    這裏簡單的看一下內核是如何實現上述接口的:在kernel/spinlock.c文件中

void __lockfunc _spin_lock_irq(spinlock_t *lock)
{
    local_irq_disable(); // 關閉當前CPU上的中斷
    // 下面的執行流程和spin_lock接口同樣的
    preempt_disable(); 
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    _raw_spin_lock(lock);
}
EXPORT_SYMBOL(_spin_lock_irq);

# define spin_unlock_irq(lock)  _spin_unlock_irq(lock)
void __lockfunc _spin_unlock_irq(spinlock_t *lock)
{
    spin_release(&lock->dep_map, 1, _RET_IP_);
    _raw_spin_unlock(lock);
    local_irq_enable();
    preempt_enable();
}
EXPORT_SYMBOL(_spin_unlock_irq);

    自旋鎖的使用原則:

    1) 能不用盡可能不用,而且持有鎖的時間應儘量的短。由於持有鎖之後,其餘CPU要獲取同一把鎖的執行流程會陷入空循環,消耗CPU資源;

    2) 持有鎖之後儘可能不要再去獲取另外一把鎖,若是須要則代碼各處獲取鎖的順序要一致,不然容易引發死鎖        

    3) 從性能的角度考慮,若是不須要與中斷互斥,則不要使用禁止中斷的接口;

    4) 在獲取自旋鎖之後,到釋放自旋鎖以前,不容許調用或者是間接調用引發系統調度的操做。由於一旦獲取鎖之後,就進入一種特殊狀態--原子上下文,即在此狀態下不容許被打斷,而系統調度會打斷當前的執行流程。

    2、互斥體:

互斥體與信號量都是屬於非原子操做的同步手段,共同的特色是:當獲取失敗須要將當前進程掛起,進入等待狀態,進程將進入睡眠狀態。

    Linux內核互斥體的定義和聲明是在linux/mutex.h頭文件中,主要包括以下接口:

// 初始化:
void mutex_init(struct mutex *lock);
// 定義並初始化互斥體變量lock
DEFINE_MUTEX(lock);
 
// 獲取mutex
void mutex_lock(struct mutex *lock);
int mutex_lock_interruptible(struct mutex *lock);
int mutex_lock_killable(strut mutex *lock);
int mutex_trylock(struct mutex *lock);
 
// 釋放mutex
void mutex_unlock(struct mutex *lock);
 
// 銷燬mutex
void mutex_destroy(struct mutex *lock)

看到這些接口,會發現與咱們編寫應用程序使用的mutex接口很相似,用法也很相似。

        mutex_lock()若是不可以獲取mutex,則當前進程會進入睡眠,直至有其餘的進程釋放這個mutex,纔會被喚醒。函數返回時說明互斥體已經獲取成功。若是但願進程在等待互斥體時任然能夠響應信號,則應使用mutex_lock_interruptible()

    在使用的過程當中要注意以下幾點:

    1) mutex的獲取有可能致使進程睡眠,因此不可以用於中斷上下文中,只能夠用在進程上下文中;

    2) mutex的獲取與釋放必須是同一個進程,不可以在A進程獲取mutex,而後在B進程釋放mutex

    下面簡單的對其實現源碼邏輯大體的走一下:

    首先看看mutex的定義:

struct mutex {
    /* 1: unlocked, 0: locked, negative: locked, possible waiters */
     atomic_t  count;
     spinlock_t  wait_lock;
     struct list_head wait_list;
#ifdef CONFIG_DEBUG_MUTEXES
    struct thread_info *owner;
    const char   *name;
    void   *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    struct lockdep_map dep_map;
#endif
};

        若是咱們不進行調試,那麼就只有count、wait_lock和wait_list成員須要關注的。

    能夠發現一個mutex就是一個原子計數器count(保存互斥體的狀態),以及一個用於存放獲取Mutex失敗的進程鏈表wait_listwali_lock是爲了保證原子操做wait_list。因此說互斥體是在自旋鎖的基礎上實現的!

    而後看看mutex的初始化,主要是初始化mutex對象的成員,將原子計數器初始化爲1,表示處於unlocked狀態。

# define mutex_init(mutex) \
do {       \
static struct lock_class_key __key;  \
\
__mutex_init((mutex), #mutex, &__key);  \
} while (0)
 
void  __mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
atomic_set(&lock->count, 1); // 原子計數器初始化爲1,表示處於unlocked狀態
spin_lock_init(&lock->wait_lock);
INIT_LIST_HEAD(&lock->wait_list);
 
debug_mutex_init(lock, name, key);
}

        接着再看看互斥體的獲取mutex_lock()的實現,是如何讓當前進程在獲取mutex失敗的狀況下進入睡眠等待的:

void inline fastcall __sched mutex_lock(struct mutex *lock)
{
    might_sleep();
/*
 * The locking fastpath is the 1->0 transition from
 * 'unlocked' into 'locked' state.
 */
    __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
}

// __mutex_fastpath_lock()的做用是對原子計數器進行減1,並判斷原子計數器是否爲0,若是爲0則直接返回,表示獲取鎖成功,不然表示獲取失敗,會調用__mutex_lock_slowpath()進行後續的阻塞睡眠處理。
static void fastcall noinline __sched
__mutex_lock_slowpath(atomic_t *lock_count)
{
    struct mutex *lock = container_of(lock_count, struct mutex, count);
 
    __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0);
}

// 最後仍是經過__mutex_lock_common()函數來完成阻塞睡眠處理:
static inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass)
{
    struct task_struct *task = current;
    struct mutex_waiter waiter;
    unsigned int old_val;
    unsigned long flags;
 
    spin_lock_mutex(&lock->wait_lock, flags);
 
    debug_mutex_lock_common(lock, &waiter);
    mutex_acquire(&lock->dep_map, subclass, 0, _RET_IP_);
    debug_mutex_add_waiter(lock, &waiter, task->thread_info);
 
    /* add waiting tasks to the end of the waitqueue (FIFO): */
     list_add_tail(&waiter.list, &lock->wait_list);
     waiter.task = task;
 
    for (;;) {
    /*
     * Lets try to take the lock again - this is needed even if
     * we get here for the first time (shortly after failing to
     * acquire the lock), to make sure that we get a wakeup once
     * it's unlocked. Later on, if we sleep, this is the
     * operation that gives us the lock. We xchg it to -1, so
     * that when we release the lock, we properly wake up the
     * other waiters:
     */
    old_val = atomic_xchg(&lock->count, -1);
      if (old_val == 1)
       break;
 
    /*
     * got a signal? (This code gets eliminated in the
     * TASK_UNINTERRUPTIBLE case.)
     */
    if (unlikely(state == TASK_INTERRUPTIBLE &&
        signal_pending(task))) {
        mutex_remove_waiter(lock, &waiter, task->thread_info);
        mutex_release(&lock->dep_map, 1, _RET_IP_);
        spin_unlock_mutex(&lock->wait_lock, flags);
 
        debug_mutex_free_waiter(&waiter);
        return -EINTR;
    }
    __set_task_state(task, state);
 
    /* didnt get the lock, go to sleep: */
    spin_unlock_mutex(&lock->wait_lock, flags);
    schedule();
    spin_lock_mutex(&lock->wait_lock, flags);
  }
 
    /* got the lock - rejoice! */
     mutex_remove_waiter(lock, &waiter, task->thread_info);
    debug_mutex_set_owner(lock, task->thread_info);
 
    /* set it to 0 if there are no waiters left: */
     if (likely(list_empty(&lock->wait_list)))
      atomic_set(&lock->count, 0);
 
    spin_unlock_mutex(&lock->wait_lock, flags);
    debug_mutex_free_waiter(&waiter);
    return 0;
}

    __mutex_lock_common()函數代碼比較長,大體的代碼邏輯是:

    1)先將當前進程current放入到mutex的wait_list鏈表中;

    2)而後是執行schedule(),執行進程切換調度,CPU就會從run queue中選取一個優先級最高的任務來運行;

    這個時候,獲取mutex的進程就已經掛起了suspend,須要有其餘的進程調用mutex_unlock(),才能將此進程喚醒,並從新加入到run queue中繼續運行。

    上面的代碼會把schedule()放在一個for循環內部,這是由於進程被喚醒後,須要先檢查條件是否知足,若是不知足,則會再次掛起。


    下面就分析下時如何喚醒等待在此mutex上的進程的: mutex_unlock()

void fastcall __sched mutex_unlock(struct mutex *lock)
{
/*
 * The unlocking fastpath is the 0->1 transition from 'locked'
 * into 'unlocked' state:
 */
    __mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);
}
// __mutex_fastpath_unlock會對原子計數器count進行加1,而後調用__mutex_unlock_slowpath()來喚醒等待此mutex的進程。
static fastcall inline void
__mutex_unlock_common_slowpath(atomic_t *lock_count, int nested)
{
    struct mutex *lock = container_of(lock_count, struct mutex, count);
    unsigned long flags;
 
    spin_lock_mutex(&lock->wait_lock, flags);
    mutex_release(&lock->dep_map, nested, _RET_IP_);
    debug_mutex_unlock(lock);
 
/*
 * some architectures leave the lock unlocked in the fastpath failure
 * case, others need to leave it locked. In the later case we have to
 * unlock it here
 */
    if (__mutex_slowpath_needs_to_unlock())
        atomic_set(&lock->count, 1);
 
     if (!list_empty(&lock->wait_list)) {
        /* get the first entry from the wait-list: */
        struct mutex_waiter *waiter = list_entry(lock->wait_list.next, struct mutex_waiter, list);
 
        debug_mutex_wake_waiter(lock, waiter);
        wake_up_process(waiter->task);
     }
 
    debug_mutex_clear_owner(lock);
    spin_unlock_mutex(&lock->wait_lock, flags);
}

        從上面的代碼能夠看出,釋放mutex的過程就是把等待鏈表的第一個進程取出來,而後將其放入run-queue運行隊列中,這樣就可以被調度器調度,獲得運行。其中wake_up_process()函數的做用就是把一個指定的進程放入到run queue中,並將進程的狀態修改成TASK_RUNNING.

    代碼分析到這裏,應該基本上就清楚了mutex的實現邏輯了。使用mutex就是有一點須要特別注意的,必須同一個進程對其lock和unlock操做,仔細分析下代碼,就能夠發現了,這裏就再也不說明了。


3、信號量:

信號量與互斥體都是做爲同步的手段,共同點是當獲取失敗時會致使進程睡眠,因此都是不能夠用在中斷上下文中,只能夠用在進程上下文中。

不一樣點是:

1) mutex的獲取與釋放必須是由同一個進程執行,而semaphore則能夠跨進程使用,便可以在A進程獲取信號量,在B進程釋放信號量。

2) mutex只有lockedunloked兩種狀態值,semaphore的計數器能夠大於1.

簡單的看一下信號量的使用接口:

#include <linux/semaphore.h>
 
// 初始化信號量對象
struct semaphore sem;
void sema_init(struct semapthore *sem, int val);
 
// 獲取信號量
void down(struct semaphore *sem); //若是獲取失敗,進程將進入不可被信號打斷的睡眠狀態
int down_interruptible(struct semaphore *sem); //若是獲取失敗,進程將進入可被信號打斷的睡眠狀態
int down_killable(strut semaphore *sem); // 若是獲取失敗,進程在睡眠等待的過程當中,只響應致命信號
int down_timeout(struct semaphore *sem,  long jiffies); // 進程對信號量的等待操做有時間限制
 
// 釋放信號量
void up(struct semaphpre *sem);

down_interruptible()down_killable()的返回值說明,0表示獲取成功,-EINTR表示在等待信號量的過程當中被信號打斷,信號量獲取失敗。

down_timeout()的返回值說明,0表示成功,-ETIME表示在等待信號量的過程當中超時,獲取信號量失敗。

相關文章
相關標籤/搜索