在Linux系統上,多個進程能夠同時運行,以及各類中斷髮生的中斷也在同時獲得處理,這種多個上下文宏觀上同時運行的狀況稱爲併發。併發具體包括以下幾種可能:php
1) UP平臺上,一個進程正在執行時被另外一個進程搶佔;linux
2) UP平臺上,一個進程正在執行時發生了中斷,內核轉而執行中斷處理程序;編程
3) SMP平臺上,每一個處理器都會發生UP平臺上的狀況;併發
4) SMP平臺上,多個進程或中斷同時在多個CPU上執行;函數
多個併發的上下文同時使用同一個資源的狀況稱爲競態,而可能發生競態的這一段代碼稱爲臨界區。內核編程時的臨界區,比較多的狀況是:性能
1) 代碼訪問了全局變量,而且這段代碼可被多個進程執行;測試
2) 代碼訪問了全局變量,而且這段代碼可被進程執行,也可被中斷處理程序執行;ui
針對上述狀況,內核提供了以下手段來解決竟態問題:this
1)鎖機制:
atom
2)院子操做:
下面會先介紹鎖機制。Linux內核提供了多種鎖機制,這些鎖機制的區別在於,當獲取不到鎖時,執行程序是否發生睡眠並進行系統調度。具體包括自旋鎖、互斥體、信號量。
1、自旋鎖:spinlock_t
自旋鎖有兩個基本操做:獲取與釋放。獲取自旋鎖時,當判斷鎖的狀態爲未鎖,則會立刻加鎖,若是已是鎖定的狀態,當期執行流程則會執行「忙等待」,中間沒有任何的調度操做。也就說執行流程從判斷鎖的狀態到完成加鎖,是一個原子操做,在執行上是不可分割的。
自旋鎖的實現是平臺相關的,在使用的時候,只需統一包含以下頭文件:
#include <linux/spinlock.h>
自旋鎖的變量類型是spinlock_t,定義以下:
typedef struct { raw_spinlock_t raw_lock; #if defined(CONFIG_PREEMPT) && defined(CONFIG_SMP) unsigned int break_lock; #endif } spinlock_t; typedef struct { volatile unsigned int lock; } raw_spinlock_t;
1)自旋鎖須要初始化才能使用,自旋鎖的初始化接口以下:
# define spin_lock_init(lock) \ do { *(lock) = SPIN_LOCK_UNLOCKED; } while (0)
2)獲取自旋鎖的接口以下:
void spin_lock(spinlock_t *lock);
3)釋放自旋鎖的接口以下:
void spin_unlock(spinlock_t *lock);
獲取自旋鎖的時候,內部會首先禁止搶佔,而後來時循環判斷鎖的狀態。在UP版本中,惟一的操做就是禁止搶佔,若是是UP版本且非搶佔式內核,則進一步退化爲無操做。能夠粗略的看一下內核源碼的實現,由於自旋鎖的實現是與平臺相關的,這裏以arm平臺爲例:
void __lockfunc _spin_lock(spinlock_t *lock) { preempt_disable(); //關閉搶佔 spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); //這個若是沒有打開調試自旋鎖的宏的話,至關於無操做。 _raw_spin_lock(lock); //這裏調用平臺相關的代碼來輪詢鎖的狀態 }
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
unsigned long tmp;
__asm__ __volatile__(
"1: ldrex %0, [%1]\n" //取lock->lock放在 tmp裏,而且設置&lock->lock這個內存地址爲獨佔訪問
" teq %0, #0\n" // 測試lock->lock是否爲0,影響標誌位z
#ifdef CONFIG_CPU_32v6K
" wfene\n"
#endif
" strexeq %0, %2, [%1]\n" //若是lock->lock是0,而且是獨佔訪問這個內存,就向lock->lock裏 寫入1,並向tmp返回0,同時清除獨佔標記
" teqeq %0, #0\n" //如 果lock->lock是0,而且strexeq返回了0,表示加鎖成功,返回
" bne 1b" //若是加鎖失敗,則會向後跳轉到上面的標號1處,再次從新執行
: "=&r" (tmp)
: "r" (&lock->lock), "r" (1)
: "cc");
smp_mb();
}
上面那段彙編代碼就是循環判斷lock的值,最後的那個bne 1b,就能夠明白爲何自旋鎖在獲取不到鎖的狀況下,會進行所謂的「自旋」了!
4) 此外,內核還提供了一個用於嘗試獲取自旋鎖的接口:
Int spin_trylock(spinlock_t *lock);
到這裏爲止,上面介紹的接口都沒有考慮到在獲取鎖之後又發生中斷的問題,若是要解決與中斷的互斥問題,則須要使用如下接口:
Void spin_lock_ireq(spinlock_t *lock); // 禁止中斷並獲取自旋鎖 Void spin_unlock_irq(spinlock_t *lock); // 釋放自旋鎖並使能中斷 Void spin_lock_irq_save(spin_lock_t *lock, unsigned long flags); // 禁止中斷並獲取保存中斷狀態,而後獲取自旋鎖 Void spin_unlok_irq_store(spinlock_t *lock, unsigned long flags); // 釋放自旋鎖,並將中斷狀態恢復爲已保存的狀態值。 Int spin_trylock_irq(spinlock_t *lock); // 禁止中斷並嘗試獲取自旋鎖,成功返回非0值,返回值爲0表示獲取失敗,則中斷狀態恢復爲使能狀態。 Int spin_trylock_irqsave(spinlock_t *lock, unsigned long flags); // 禁止中斷並保存中斷狀態,而後嘗試獲取自旋鎖,返回值爲非0表示獲取成功,0表示獲取失敗,則會恢復中斷狀態。
這裏簡單的看一下內核是如何實現上述接口的:在kernel/spinlock.c文件中
void __lockfunc _spin_lock_irq(spinlock_t *lock) { local_irq_disable(); // 關閉當前CPU上的中斷 // 下面的執行流程和spin_lock接口同樣的 preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); _raw_spin_lock(lock); } EXPORT_SYMBOL(_spin_lock_irq); # define spin_unlock_irq(lock) _spin_unlock_irq(lock) void __lockfunc _spin_unlock_irq(spinlock_t *lock) { spin_release(&lock->dep_map, 1, _RET_IP_); _raw_spin_unlock(lock); local_irq_enable(); preempt_enable(); } EXPORT_SYMBOL(_spin_unlock_irq);
自旋鎖的使用原則:
1) 能不用盡可能不用,而且持有鎖的時間應儘量的短。由於持有鎖之後,其餘CPU要獲取同一把鎖的執行流程會陷入空循環,消耗CPU資源;
2) 持有鎖之後儘可能不要再去獲取另外一把鎖,若是須要則代碼各處獲取鎖的順序要一致,不然容易引發死鎖
3) 從性能的角度考慮,若是不須要與中斷互斥,則不要使用禁止中斷的接口;
4) 在獲取自旋鎖之後,到釋放自旋鎖以前,不容許調用或者是間接調用引發系統調度的操做。由於一旦獲取鎖之後,就進入一種特殊狀態--原子上下文,即在此狀態下不容許被打斷,而系統調度會打斷當前的執行流程。
2、互斥體:
互斥體與信號量都是屬於非原子操做的同步手段,共同的特色是:當獲取失敗須要將當前進程掛起,進入等待狀態,進程也將進入睡眠狀態。
Linux內核互斥體的定義和聲明是在linux/mutex.h頭文件中,主要包括以下接口:
// 初始化: void mutex_init(struct mutex *lock); // 定義並初始化互斥體變量lock DEFINE_MUTEX(lock); // 獲取mutex void mutex_lock(struct mutex *lock); int mutex_lock_interruptible(struct mutex *lock); int mutex_lock_killable(strut mutex *lock); int mutex_trylock(struct mutex *lock); // 釋放mutex void mutex_unlock(struct mutex *lock); // 銷燬mutex void mutex_destroy(struct mutex *lock)
看到這些接口,會發現與咱們編寫應用程序使用的mutex接口很相似,用法也很相似。
mutex_lock()若是不可以獲取mutex,則當前進程會進入睡眠,直至有其餘的進程釋放這個mutex,纔會被喚醒。函數返回時說明互斥體已經獲取成功。若是但願進程在等待互斥體時任然能夠響應信號,則應使用mutex_lock_interruptible()。
在使用的過程當中要注意以下幾點:
1) mutex的獲取有可能致使進程睡眠,因此不可以用於中斷上下文中,只能夠用在進程上下文中;
2) mutex的獲取與釋放必須是同一個進程,不可以在A進程獲取mutex,而後在B進程釋放mutex;
下面簡單的對其實現源碼邏輯大體的走一下:
首先看看mutex的定義:
struct mutex { /* 1: unlocked, 0: locked, negative: locked, possible waiters */ atomic_t count; spinlock_t wait_lock; struct list_head wait_list; #ifdef CONFIG_DEBUG_MUTEXES struct thread_info *owner; const char *name; void *magic; #endif #ifdef CONFIG_DEBUG_LOCK_ALLOC struct lockdep_map dep_map; #endif };
若是咱們不進行調試,那麼就只有count、wait_lock和wait_list成員須要關注的。
能夠發現一個mutex就是一個原子計數器count(保存互斥體的狀態),以及一個用於存放獲取Mutex失敗的進程鏈表wait_list,wali_lock是爲了保證原子操做wait_list。因此說互斥體是在自旋鎖的基礎上實現的!
而後看看mutex的初始化,主要是初始化mutex對象的成員,將原子計數器初始化爲1,表示處於unlocked狀態。
# define mutex_init(mutex) \ do { \ static struct lock_class_key __key; \ \ __mutex_init((mutex), #mutex, &__key); \ } while (0) void __mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key) { atomic_set(&lock->count, 1); // 原子計數器初始化爲1,表示處於unlocked狀態 spin_lock_init(&lock->wait_lock); INIT_LIST_HEAD(&lock->wait_list); debug_mutex_init(lock, name, key); }
接着再看看互斥體的獲取mutex_lock()的實現,是如何讓當前進程在獲取mutex失敗的狀況下進入睡眠等待的:
void inline fastcall __sched mutex_lock(struct mutex *lock) { might_sleep(); /* * The locking fastpath is the 1->0 transition from * 'unlocked' into 'locked' state. */ __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath); } // __mutex_fastpath_lock()的做用是對原子計數器進行減1,並判斷原子計數器是否爲0,若是爲0則直接返回,表示獲取鎖成功,不然表示獲取失敗,會調用__mutex_lock_slowpath()進行後續的阻塞睡眠處理。 static void fastcall noinline __sched __mutex_lock_slowpath(atomic_t *lock_count) { struct mutex *lock = container_of(lock_count, struct mutex, count); __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0); } // 最後仍是經過__mutex_lock_common()函數來完成阻塞睡眠處理: static inline int __sched __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass) { struct task_struct *task = current; struct mutex_waiter waiter; unsigned int old_val; unsigned long flags; spin_lock_mutex(&lock->wait_lock, flags); debug_mutex_lock_common(lock, &waiter); mutex_acquire(&lock->dep_map, subclass, 0, _RET_IP_); debug_mutex_add_waiter(lock, &waiter, task->thread_info); /* add waiting tasks to the end of the waitqueue (FIFO): */ list_add_tail(&waiter.list, &lock->wait_list); waiter.task = task; for (;;) { /* * Lets try to take the lock again - this is needed even if * we get here for the first time (shortly after failing to * acquire the lock), to make sure that we get a wakeup once * it's unlocked. Later on, if we sleep, this is the * operation that gives us the lock. We xchg it to -1, so * that when we release the lock, we properly wake up the * other waiters: */ old_val = atomic_xchg(&lock->count, -1); if (old_val == 1) break; /* * got a signal? (This code gets eliminated in the * TASK_UNINTERRUPTIBLE case.) */ if (unlikely(state == TASK_INTERRUPTIBLE && signal_pending(task))) { mutex_remove_waiter(lock, &waiter, task->thread_info); mutex_release(&lock->dep_map, 1, _RET_IP_); spin_unlock_mutex(&lock->wait_lock, flags); debug_mutex_free_waiter(&waiter); return -EINTR; } __set_task_state(task, state); /* didnt get the lock, go to sleep: */ spin_unlock_mutex(&lock->wait_lock, flags); schedule(); spin_lock_mutex(&lock->wait_lock, flags); } /* got the lock - rejoice! */ mutex_remove_waiter(lock, &waiter, task->thread_info); debug_mutex_set_owner(lock, task->thread_info); /* set it to 0 if there are no waiters left: */ if (likely(list_empty(&lock->wait_list))) atomic_set(&lock->count, 0); spin_unlock_mutex(&lock->wait_lock, flags); debug_mutex_free_waiter(&waiter); return 0; }
__mutex_lock_common()函數代碼比較長,大體的代碼邏輯是:
1)先將當前進程current放入到mutex的wait_list鏈表中;
2)而後是執行schedule(),執行進程切換調度,CPU就會從run queue中選取一個優先級最高的任務來運行;
這個時候,獲取mutex的進程就已經掛起了suspend,須要有其餘的進程調用mutex_unlock(),才能將此進程喚醒,並從新加入到run queue中繼續運行。
上面的代碼會把schedule()放在一個for循環內部,這是由於進程被喚醒後,須要先檢查條件是否知足,若是不知足,則會再次掛起。
下面就分析下時如何喚醒等待在此mutex上的進程的: mutex_unlock()
void fastcall __sched mutex_unlock(struct mutex *lock) { /* * The unlocking fastpath is the 0->1 transition from 'locked' * into 'unlocked' state: */ __mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath); } // __mutex_fastpath_unlock會對原子計數器count進行加1,而後調用__mutex_unlock_slowpath()來喚醒等待此mutex的進程。 static fastcall inline void __mutex_unlock_common_slowpath(atomic_t *lock_count, int nested) { struct mutex *lock = container_of(lock_count, struct mutex, count); unsigned long flags; spin_lock_mutex(&lock->wait_lock, flags); mutex_release(&lock->dep_map, nested, _RET_IP_); debug_mutex_unlock(lock); /* * some architectures leave the lock unlocked in the fastpath failure * case, others need to leave it locked. In the later case we have to * unlock it here */ if (__mutex_slowpath_needs_to_unlock()) atomic_set(&lock->count, 1); if (!list_empty(&lock->wait_list)) { /* get the first entry from the wait-list: */ struct mutex_waiter *waiter = list_entry(lock->wait_list.next, struct mutex_waiter, list); debug_mutex_wake_waiter(lock, waiter); wake_up_process(waiter->task); } debug_mutex_clear_owner(lock); spin_unlock_mutex(&lock->wait_lock, flags); }
從上面的代碼能夠看出,釋放mutex的過程就是把等待鏈表的第一個進程取出來,而後將其放入run-queue運行隊列中,這樣就可以被調度器調度,獲得運行。其中wake_up_process()函數的做用就是把一個指定的進程放入到run queue中,並將進程的狀態修改成TASK_RUNNING.
代碼分析到這裏,應該基本上就清楚了mutex的實現邏輯了。使用mutex就是有一點須要特別注意的,必須同一個進程對其lock和unlock操做,仔細分析下代碼,就能夠發現了,這裏就再也不說明了。
3、信號量:
信號量與互斥體都是做爲同步的手段,共同點是當獲取失敗時會致使進程睡眠,因此都是不能夠用在中斷上下文中,只能夠用在進程上下文中。
不一樣點是:
1) mutex的獲取與釋放必須是由同一個進程執行,而semaphore則能夠跨進程使用,便可以在A進程獲取信號量,在B進程釋放信號量。
2) mutex只有locked和unloked兩種狀態值,semaphore的計數器能夠大於1.
簡單的看一下信號量的使用接口:
#include <linux/semaphore.h> // 初始化信號量對象 struct semaphore sem; void sema_init(struct semapthore *sem, int val); // 獲取信號量 void down(struct semaphore *sem); //若是獲取失敗,進程將進入不可被信號打斷的睡眠狀態 int down_interruptible(struct semaphore *sem); //若是獲取失敗,進程將進入可被信號打斷的睡眠狀態 int down_killable(strut semaphore *sem); // 若是獲取失敗,進程在睡眠等待的過程當中,只響應致命信號 int down_timeout(struct semaphore *sem, long jiffies); // 進程對信號量的等待操做有時間限制 // 釋放信號量 void up(struct semaphpre *sem);
down_interruptible()和down_killable()的返回值說明,0表示獲取成功,-EINTR表示在等待信號量的過程當中被信號打斷,信號量獲取失敗。
down_timeout()的返回值說明,0表示成功,-ETIME表示在等待信號量的過程當中超時,獲取信號量失敗。