鎖是操做系統中實現進程同步的重要機制。程序員
臨界區(Critical Section)是指對共享數據進行訪問與操做的代碼區域。所謂共享數據,就是可能有多個代碼執行流併發地執行,並在執行中可能會同時訪問的數據。緩存
同步(Synchronization)是指讓兩個或多個進程/線程可以按照程序員指望的方式來協調執行的順序。好比,讓A進程必須完成某個操做後,B進程才能執行。互斥(Mutual Exclusion)則是指讓多個線程不可以同時訪問某些數據,必需要一個進程訪問完後,另外一個進程才能訪問。併發
當多個進程/線程併發地執行而且訪問一塊數據,而且進程/線程的執行結果依賴於它們的執行順序,咱們就稱這種狀況爲競爭狀態(Race Condition)。app
Xv6操做系統要求在內核臨界區操做時中斷必須關閉。若是此時中斷開啓,那麼可能會出現如下死鎖狀況:A進程在內核態運行並拿下了p鎖時,觸發中斷進入中斷處理程序,中斷處理程序也在內核態中請求p鎖,因爲鎖在A進程手裏,且只有A進程執行時才能釋放p鎖,所以中斷處理程序必須返回,p鎖才能被釋放。那麼此時中斷處理程序會永遠拿不到鎖,陷入無限循環,進入死鎖。函數
Xv6中實現了自旋鎖(Spinlock)用於內核臨界區訪問的同步和互斥。自旋鎖最大的特徵是當進程拿不到鎖時會進入無限循環,直到拿到鎖退出循環。顯然,自旋鎖看上去效率很低,咱們很容易想到更加高效的基於等待隊列的方法,讓等待進程陷入阻塞而不是無限循環。然而,Xv6容許同時運行多個CPU核,多核CPU上的等待隊列實現至關複雜,所以使用自旋鎖是相對比較簡單且能正確執行的實現方案。oop
Xv6中鎖的定義以下優化
// Mutual exclusion lock. struct spinlock { uint locked; // Is the lock held? // For debugging: char *name; // Name of lock. struct cpu *cpu; // The cpu holding the lock. uint pcs[10]; // The call stack (an array of program counters) // that locked the lock. };
核心的變量只有一個locked
,當locked
爲1時表明鎖已被佔用,反之未被佔用,初始值爲0。ui
在調用鎖以前,必須對鎖進行初始化。this
void initlock(struct spinlock *lk, char *name) { lk->name = name; lk->locked = 0; lk->cpu = 0; }
最困難的地方是如何對locked
變量進行原子操做佔用鎖和釋放鎖。這兩步具體被實現爲acquire()
和release()
函數。(注意v7版本和v11版本的實現略有不一樣,本文使用的是v11版本)atom
acquire()
函數// Acquire the lock. // Loops (spins) until the lock is acquired. // Holding a lock for a long time may cause // other CPUs to waste time spinning to acquire it. void acquire(struct spinlock *lk) { pushcli(); // disable interrupts to avoid deadlock. if(holding(lk)) panic("acquire"); // The xchg is atomic. while(xchg(&lk->locked, 1) != 0) ; // Tell the C compiler and the processor to not move loads or stores // past this point, to ensure that the critical section's memory // references happen after the lock is acquired. __sync_synchronize(); // Record info about lock acquisition for debugging. lk->cpu = mycpu(); getcallerpcs(&lk, lk->pcs); }
acquire()
函數首先禁止了中斷,而且使用專門的pushcli()
函數,這個函數保證了若是有兩個acquire()
禁止了中斷,那麼也必須調用兩次release()
中的popcli()
後中斷纔會被容許。而後,acquire()
函數採用xchg
指令來實如今設置locked
爲1的同時得到其原來的值的操做。這裏的C代碼中封裝了一個xchg()
函數,在xchg()
函數中採用GCC的內聯彙編特性,實現以下
static inline uint xchg(volatile uint *addr, uint newval) { uint result; // The + in "+m" denotes a read-modify-write operand. asm volatile("lock; xchgl %0, %1" : "+m" (*addr), "=a" (result) : "1" (newval) : "cc"); return result; }
其中,volatile
標誌用於避免gcc對其進行一些優化;第一個冒號後的"+m" (*addr), "=a" (result)
是這個彙編指令的兩個輸出值;newval
是這個彙編指令的輸入值。假設newval
位於eax
寄存器中,addr
位於rax
寄存器中,那麼gcc會翻譯獲得以下彙編指令
lock; xchgl (%rdx), %eax
因爲xchg
函數是inline
的,它會被直接嵌入調用xchg
函數的代碼中,使用的寄存器可能會有所不一樣。
下面咱們來分析一下上面的指令的語義。·lock
是一個指令前綴,它保證了這條指令對總線和緩存的獨佔權,也就是這條指令的執行過程當中不會有其餘CPU或同CPU內的指令訪問緩存和內存。因爲現代CPU通常是多發射流水線+亂序執行的,所以通常狀況下並不能保證這一點。xchgl
指令是一條古老的x86指令,做用是交換兩個寄存器或者內存地址裏的4字節值,兩個值不能都是內存地址,他不會設置條件碼。
那麼,仔細思考一下就能發現,以上一條xchg
指令就同時作到了交換locked和1的值,而且在以後經過檢查eax
寄存器就能知道locked的值是否爲0。而且,以上操做是原子的,這就保證了有且只有一個進程可以拿到locked的0值而且進入臨界區。
最後,acquire()
函數使用__sync_synchronize
爲了不編譯器對這段代碼進行指令順序調整的話和避免CPU在這塊代碼採用亂序執行的優化。
release()
函數// Release the lock. void release(struct spinlock *lk) { if(!holding(lk)) panic("release"); lk->pcs[0] = 0; lk->cpu = 0; // Tell the C compiler and the processor to not move loads or stores // past this point, to ensure that all the stores in the critical // section are visible to other cores before the lock is released. // Both the C compiler and the hardware may re-order loads and // stores; __sync_synchronize() tells them both not to. __sync_synchronize(); // Release the lock, equivalent to lk->locked = 0. // This code can't use a C assignment, since it might // not be atomic. A real OS would use C atomics here. asm volatile("movl $0, %0" : "+m" (lk->locked) : ); popcli(); }
release
函數爲了保證設置locked爲0的操做的原子性,一樣使用了內聯彙編。最後,使用popcli()來容許中斷(或者彈出一個cli,但由於其餘鎖未釋放使得中斷依然被禁止)。
struct semaphore { int value; struct spinlock lock; struct proc *queue[NPROC]; int end; int start; }; void sem_init(struct semaphore *s, int value) { s->value = value; initlock(&s->lock, "semaphore_lock"); end = start = 0; } void sem_wait(struct semaphore *s) { acquire(&s->lock); s->value--; if (s->value < 0) { s->queue[s->end] = myproc(); s->end = (s->end + 1) % NPROC; sleep(myproc(), &s->lock) } release(&s->lock); } void sem_signal(struct semaphore *s) { acquire(&s->lock); s->value++; if (s->value <= 0) { wakeup(s->queue[s->start]); s->queue[s->start] = 0; s->start = (s->start + 1) % NPROC; } release(&s->lock); }
上面的代碼使用Xv6提供的接口實現了信號量,格式和命名與POSIX標準相似。這個信號量的實現採用等待隊列的方式。當一個進程因信號量陷入阻塞時,會將本身放進等待隊列並睡眠(18-22行)。當一個進程釋放信號量時,會從等待隊列中取出一個進程繼續執行(29-33行)。