最近看了一本書,名字叫作《Operating Systems: Three Easy Pieces》,它的中文版是《操做系統導論》,原書在豆瓣評分9.7分,質量還不錯。該書圍繞虛擬化、併發和持久性這三個主要概念展開,行文詼諧幽默卻又鞭辟入裏,不一樣於尋常的操做系統書籍。這些天看了併發的幾個章節,我主要關注了"鎖"的部分,細讀下來,有了更深入的認識。html
因此這篇文章就是對《操做系統導論》中講解鎖的章節的一個讀書筆記,實在是忍不住想分享出來。java
鎖實際上是一個變量,咱們須要聲明某種類型的鎖變量才能使用,好比下例:程序員
lock_t mutex; //聲明
...
lock(&mutex); //加鎖
balance = balance + 1;
unlock(&mutex); //解鎖
複製代碼
鎖變量保存了鎖在某一時刻的狀態,它要麼是可用的(avaliable,或unlocked,或free),表示沒有線程持有鎖,要麼是被佔用的(acquired,或locked,或held),表示有一個線程持有鎖,正處於臨界區。bash
鎖爲程序員提供了最小程度的調度控制,線程能夠視爲程序員建立的實體,可是被操做系統調度,具體方式由操做系統選擇,而鎖讓程序員得到一些控制權,經過給臨界區加鎖,能夠保證臨界區內只有一個線程活躍。併發
此外,POSIX庫將鎖稱爲互斥量(mutex),由於它被用來提供線程之間的互斥,即當一個線程在臨界區,它可以阻止其餘線程進入直到本線程離開臨界區。函數
咱們已經從程序員的角度,對鎖如何工做有必定的理解。那如何實現一個鎖呢?咱們須要什麼硬件支持?須要什麼操做系統的支持?下面會進行解答。性能
而在實現鎖以前,咱們還須要明確目標,須要設立一些標準才能讓「鎖」工做的好,主要有3個標準:測試
這三個標準映射到Java層面來講:fetch
- 互斥鎖: JDK中的synchronized和JUC中的Lock就是互斥鎖,保證一次最多隻能由一個線程持有該鎖
- 公平鎖與非公平鎖: Java中的公平鎖是指多個線程在等待同一個鎖時,必須按照申請鎖的前後順序來獲取鎖,而非公平鎖是能夠搶佔的,公平鎖可使用new ReentrantLock(true)實現
- 鎖的性能: JDK中的synchronized的鎖升級——偏向鎖、輕量級鎖和重量級鎖,這幾個鎖的實現與轉換,就是爲了提高synchronized鎖的性能
測試並設置指令 (test-and-set instruction) ,在x86系統上,具體是指xchg (atomic exchange,原子交換) 指令,咱們用以下的C代碼片斷來定義測試並設置指令作了什麼:ui
int TestAndSet(int *old_ptr,int new) {
int old = *old_ptr; //fetch old value at old_ptr
*old_ptr = new; //store 'new' into old_ptr
return old; //return the old value
}
複製代碼
它返回 old_ptr 指向的舊值,同時更新爲 new 的新值。同時須要注意的是,上述的僞代碼是爲了說明使用,直接傳統編譯確定不能保證原子性,須要操做系統的硬件指令 (x86的xchg指令) 支持以保證原子性。
由於既能夠測試舊值,又能夠設置新值,因此把這條指令叫做 "測試並設置"。依靠這一條指令徹底能夠實現一個簡單的自旋鎖 (spin lock),代碼以下:
typedef struct lock_t {
int flag;
} lock_t;
void init(lock_t *lock) {
// 0 表示鎖可用,1表示鎖已經被搶佔
lock->flag = 0;
}
void lock(lock_t *lock) {
while (TestAndSet(&lock->flag,1) == 1)
{
; // 自旋等待 (do something)
}
}
void unlock(lock_t *lock) {
lock->flag = 0;
}
複製代碼
首先假設一個線程在運行,調用lock(),沒有其餘線程持有鎖,因此flag = 0,當調用 TestAndSet(flag,1) 方法,返回0,線程會跳出 while 循環,獲取鎖。同時也還會原子的設置flag爲1,標誌鎖已經被持有。當線程離開臨界區,調用 unlock() 將 flag 清理爲0。
依靠操做系統的硬件原語,將測試 (test 舊的鎖值) 和設置 (set 新的值) 合併爲一個原子操做以後,咱們保證了只有一個線程能獲取鎖,這就實現了一個有效的互斥原語!
上面的代碼也就是自旋鎖 (spin lock) 的實現,這是最簡單的一種鎖,一直自旋,利用CPU週期,直到鎖可用。如今按照以前的標準來評價基本的自旋鎖:
某些系統提供了另外一個硬件原語,即比較並交換指令 (SPARC系統是compare-and-swap,x86系統是compare-and-exchange),下面是這條指令的C語言僞代碼。
int CompareAndSwap(int *ptr,int expected,int new) {
int actual = *ptr;
if (actual == expected)
{
*ptr = new;
}
return actual;
}
複製代碼
比較並交換的基本思路是檢測 ptr 指向的值是否和 expected 相等;若是是,更新 ptr 所指的值爲新值。不然,什麼也不作。不論哪一種狀況,都返回該內存地址的值
有了比較並交換指令,就能夠實現一個鎖,相似於用測試並設置那樣。例如,咱們只須要用下面的代碼替換lock()函數:
void lock(lock_t *lock) {
while (CompareAndSwap(&lock->flag,0,1) == 1)
; //spin
}
複製代碼
比較並交換指令實際上就是CAS指令,在Java開發工做中,咱們也會經常遇到,好比使用原子類AtomicXXX,內部實現就是使用了CAS操做。
還有一個硬件原語是: 獲取並增長 (fetch-and-add) 指令,它能原子地返回特定地址的舊值,而且讓該值自增1,在x86系統中,是xadd指令。獲取並增長的C語言僞代碼以下:
int FetchAndAdd(int *ptr) {
int old = *ptr;
*ptr = old + 1;
return old;
}
typedef struct lock_t {
int ticket;
int turn;
} lock_t;
void lock_init(lock_t *lock) {
lock->ticket = 0;
lock->turn = 0;
}
void lock(lock_t *lock) {
int myturn = FetchAndAdd(&lock->ticket);
while (lock->turn != myturn)
;//spin
}
void unlock(lock_t *lock) {
FetchAndAdd(&lock->turn);
}
複製代碼
在這個例子中,咱們用獲取並增長指令,實現了一個ticket鎖,其實就是一個公平鎖。
該實現不是利用一個值,而是使用了ticket 和 turn 兩個變量來構建。基本思想是:若是線程但願得到鎖,首先對一個ticket值執行一個原子的獲取並增長指令。這個值做爲該線程的"turn"(即myturn,爲該線程設定獲取鎖的順序)。根據全局共享的lock->turn 變量,當某一個線程的 myturn == turn 時,則輪到這個線程進入臨界區。unlock 則是增長 turn,從而下一個等待線程能夠進入臨界區。
本方法可以保證全部線程都能獲到鎖,並且是按線程來的前後順序,也就是一種先進先出 (FIFO) 的公平性機制,該鎖還有一種說法,叫作 排號自旋鎖 (Ticket Lock)
前面幾種指令均可以實現自旋鎖,其實現也是很是簡單,可是自旋過多,怎麼辦呢?好比以兩個線程運行在單處理器爲例,當一個線程(線程0)持有鎖時,被中斷。第二個線程(線程1)去獲取鎖,發現鎖已經被持有。所以,它就開始自旋,接着自旋。若是線程0長時間持有鎖,那麼線程2會一直自旋,浪費CPU時間,因此如何讓鎖減小沒必要要地自旋?
只有硬件支持是不夠的,咱們還須要操做系統支持!
咱們能夠利用 Solaris 提供的支持,它提供了兩個調用:
能夠用這兩個調用來實現鎖,讓調用者在獲取不到鎖時睡眠,在鎖可用時被喚醒。下面是C語言實現的僞代碼。
typedef struct lock_t {
int flag;
int guard;
queue_t *q;
} lock_t;
void lock_init(lock_t *m) {
m->flag = 0;
m->guard = 0;
queueu_init(m->q);
}
void lock(lock_t *m) {
while (TestAndSet(&m->guard,1) == 1)
; //acquire guard lock by spinning
if (m->flag == 0) {
m->flag = 1; //lock is acquired
m->guard = 0;
} else {
queue_add(m->q,gettid());
m->guard = 0;
park();
}
}
void unlock(lock_t *m) {
while (TestAndSet(&m->guard,1) == 1)
; // acquire guard lock by spinning
if (queue_empty(m->q)) {
m->flag = 0; // let go of lock;on one wants it;
} else {
unpark(queue_remove(m->q)) ;//hold lock(for next thread!)
}
m->guard = 0;
}
複製代碼
咱們將以前的測試並設置和等待隊列結合,實現了一個更高性能的鎖,guard基本上起到了自旋鎖的做用。其次,咱們經過隊列來控制誰會得到鎖,避免餓死。
長時間的自旋等待會消耗處理器時間。對此,Java中的自旋鎖也有必定的處理措施:自旋等待的時間必需要有必定的限度,若是自旋超過了限定次數沒有成功得到鎖,就應當掛起線程,限定次數默認爲10次,可使用 -XX:PreBlockSpin來更改。
自旋鎖在JDK1.4.2中引入,使用-XX:+UseSpinning來開啓,其實現原理就是以前提到的CAS。JDK 6中變爲默認開啓,而且引入了自適應的自旋鎖(適應性自旋鎖)。
自適應意味着自旋的時間(次數)再也不固定,而是由前一次在同一個鎖上的自旋時間及鎖的擁有者的狀態來決定。若是在同一個鎖對象上,自旋等待剛剛成功得到過鎖,而且持有鎖的線程正在運行中,那麼虛擬機就會認爲此次自旋也是頗有可能再次成功,進而它將容許自旋等待持續相對更長的時間