這篇文章是一篇併發相關的源碼分析文章,主要從源碼級別分析AQS操做(主要關於阻塞鎖的實現),從而加深對併發中ReentrantLock
和ReentrantReadWriteLock
兩種鎖的理解。java
下面引用JDK文檔中對AQS類的描述:node
爲實現依賴於先進先出 (FIFO) 等待隊列的阻塞鎖和相關同步器(信號量、事件,等等)提供一個框架。此類的設計目標是成爲依靠單個原子 int 值來表示狀態的大多數同步器的一個有用基礎。併發
在這篇文章中咱們只關注有關鎖的實現的相關部分,那麼這段話就能夠這樣理解。app
AQS類的內部實現了一個FIFO的等待隊列(實際是一個鏈表),用來保存線程等相關內容,同時,利用這個隊列來實現一個阻塞鎖。框架
同時AQS中有一個int類型的值state,這個值有相關的原子操做,咱們可使用相關的方法來修改這個值。函數
當state的值等於0時,咱們認爲沒有線程得到了鎖;當state的值大於0時,咱們認爲已經有線程得到了鎖,同時,線程每重入一次,state的值加一,這樣就能夠實現鎖的重入。源碼分析
AQS中咱們有獨佔和共享兩種模式,所以咱們既能夠實現獨佔鎖,也能夠實現共享鎖。ui
AQS的隊列的實現主要有一個內部類Node
,來表示隊列的結點。this
static final class Node { //結點的狀態,signal等 volatile int waitStatus; //結點的前驅結點 volatile Node prev; //結點的後繼結點 volatile Node next; //結點保存的線程 volatile Thread thread; //表示當前結點的模式,獨佔仍是共享 Node nextWaiter; }
由此咱們能夠看出,所謂的FIFO隊列其實是一個雙向鏈表,結點中的主要數據就是狀態和線程。咱們會將全部獲取鎖可是沒有成功的線程保存到隊列中,這樣咱們就能夠在此基礎上實現其餘功能,例如公平鎖和非公平鎖等內容。線程
下面給出結點狀態的可能取值:
//因爲超時或者中斷,線程處於取消狀態 static final int CANCELLED = 1; //線程處於阻塞狀態 static final int SIGNAL = -1; //線程再等待知足條件 static final int CONDITION = -2; //線程狀態須要向後傳播 static final int PROPAGATE = -3;
結點的默認狀態爲0,這種狀態表示結點的狀態不屬於上面的任意一種。當結點狀態大於0時,意味着線程不須要被阻塞,所以,不少時候JDK在校驗的時候沒有校驗到具體值,而是一種範圍校驗。
既然實現了隊列,那麼天然須要維護隊列中的結點,後面咱們會介紹相關方法。
ReentrantLock
加鎖方法的概述首先咱們先來描述一下ReentrantLock
使用AQS實現加鎖須要實現的功能。
一、 將全部須要加鎖的線程維護到隊列中
二、 將全部須要等待的線程阻塞
三、 將全部被取消的線程從隊列中刪除
四、 將全部被中斷的線程中斷
五、 將得到鎖的線程從隊列中刪除
使用AQS來實現一個鎖,咱們須要在類的內部聲明一個非公共的類來繼承AQS類,而且實現AQS類中的部分方法。
通常來講,咱們須要本身重寫方法以下
//獨佔獲取鎖 tryAcquire(int) //獨佔模式釋放鎖 tryRelease(int) //共享模式獲取鎖 tryAcquireShared(int) //共享模式釋放鎖 tryReleaseShared(int)
在AQS中,上面幾個方法體是直接拋出異常,因此若是咱們沒有本身重寫,那麼加鎖方法是沒法調用的。
查看ReentrantLock
類的源碼,其內部有一個類Sync
類。,繼承了AQS類,而且重寫了部分方法。因爲ReentrantLock
和ReentrantLock
實現了公平鎖和非公平鎖,因此,其類的內部又有兩個類NonfairSync
和FairSync
類繼承了Sync
類。
上面的介紹應該讓你們已經對AQS的使用和基本功能有所瞭解了,接下來,咱們將跟隨ReentrantLock
源碼的思路,一步步分析加鎖和解鎖的流程。
博主會把JDK中的源碼主要內容粘貼到文章中,好了,開始吧。
下面這段是ReentrantLock
中的加鎖部分
public void lock() { sync.lock(); }
能夠看出加鎖部分實際是調用了Sync
類的lock()
方法。
當咱們查看lock()
方法時發現,這個方法是一個抽象方法,實際上,在公平鎖和非公平鎖中,這個方法的實現是不一樣,所以,這個方法是由NonfairSync
和FairSync
類實現的,下面以非公平鎖中的實現爲例。
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
加鎖過程概覽見下圖
加鎖過程當中,首先經過一個CAS操做,若是state的值爲0(即沒有線程得到了獨佔鎖),那麼將其設置爲1(表示已經有一個線程得到了獨佔鎖),若是設置成功,那麼加鎖就成功了,而後調用方法setExclusiveOwnerThread
,將得到鎖的線程設置爲當前線程。
因爲是非公平鎖,因此不須要考慮線程的等待時間等因素。
同時須要注意,若是CAS操做失敗,也就是說已經有線程已經得到了獨佔鎖,這時咱們調用acquire
函數獲取鎖,這裏應該是考慮到鎖重入等狀況。同時若是是第一個線程進入,就能夠直接得到鎖,效率較高。
下面給出acquire
方法,這個方法其實是由AQS給出的實現。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
這裏能夠看出acquire
方法會調用咱們本身實現的tryAcquire
方法來嘗試得到鎖,若是獲取鎖成功,那麼方法就直接結束了,若是獲取鎖失敗會繼續執行後面的方法。
tryAcquire
方法那麼,接下來查看非公平鎖中的tryAcquire
方法。
注意,tryAcquire
方法是由ReentrantLock
類實現的
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
查看源碼,nonfairTryAcquire
方法是在Sync
類中給出的。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //沒有線程得到過鎖 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } /** * 已經有線程獲取了鎖,因爲是獨佔鎖,因此其餘線程不能夠 *再次進入,可是有一種特殊狀況,就是當前線程或得到鎖的 *線程是同一個,這時知足可重入 */ else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
這段代碼的總體流程以下圖:
這段代碼的思路以下:
addWaiter
以及acquireQueued
方法這裏讓咱們的目光會到acquire
方法中,假如獲取鎖失敗,那麼接下來咱們須要作的就是維護FIFO鏈表,而且讓全部沒有得到鎖的線程進入等待隊列,直到得到鎖。
爲了方便,這裏將acquire
方法再次給出
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
下面介紹方法addWaiter
和acquireQueued
,這裏Node.EXCLUSIVE
實際上就是null
,這裏是用來區分節點是等待獨佔鎖仍是共享鎖的。
給出addWaiter
的代碼,這部分代碼有AQS實現
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);//新建節點 Node pred = tail; //若是隊列有值那麼能夠直接將當前節點加入到隊列尾部,注意這裏若是多個結點同時進入是有可能發生CAS操做失敗的狀況,因此接下來就須要enq方法 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //調用enq初始化隊列並將結點加入隊列中 enq(node); return node; }
這部分代碼的主要職責是維護鏈表的順序,生成一個節點,將節點維護到鏈表的尾部,若是鏈表爲空或者CAS設置tail失敗那麼就須要enq
方法來初始化鏈表,並將當前節點維護到鏈表的尾部。
private Node enq(final Node node) { //死循環,節點會自旋,直到成功加入隊列,若是有併發進入這個方法,也不會有問題 for (;;) { Node t = tail; if (t == null) { // 若是隊列爲空,須要初始化一個空的Node節點,並讓head和tail都指向它 if (compareAndSetHead(new Node())) tail = head; } else {//將節點放入隊列的尾部 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
給出acquireQueued
的代碼,這部分代碼由AQS實現
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //若是前驅結點爲頭結點,那麼只有當前結點在等待獲取鎖,那麼就能夠再次嘗試獲取鎖,若是獲取成功就能夠反悔了 if (p == head && tryAcquire(arg)) { //獲取鎖成功後釋放結點 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //判斷當前結點是否被中斷 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
這部分代碼實際也是一個死循環,當有一個結點進入後會一直自旋,知道成功得到鎖。若是在等待的過程當中,線程被中斷,則不會響應,可是會標記一下,當該結點的線程得到鎖後,再對線程進行中斷處理。
經過兩幅圖片介紹一下acquireQueued方法的兩種狀況
狀況一,隊列中只有頭結點和當前結點
狀況二,當前結點不是隊列中除頭結點外的惟一結點
下面咱們詳細介紹兩個方法,自旋的兩個重要函數shouldParkAfterFailedAcquire
和parkAndCheckInterrupt
。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //若是前驅結點的狀態爲SIGNAL,那麼當前結點能夠park即讓其等待 if (ws == Node.SIGNAL) return true; if (ws > 0) {//前驅結點的狀態爲cancle //把全部狀態爲cancle的結點所有跳過,即從隊列中刪除 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //將前驅結點的狀態設置爲SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private final boolean parkAndCheckInterrupt() { //上面函數返回值爲真,才能進入這個函數,表示當前結點能夠被park,而且再檢查線程是否被中斷,並清除中斷狀態,暫時不作響應 LockSupport.park(this); return Thread.interrupted(); }
目前進行到這裏加鎖還剩最後一步,即對線程中斷的處理,若是線程被中斷,而且得到了鎖,那麼會調用方法selfInterrupt
來進行中斷處理。
static void selfInterrupt() { //再次調用線程中段方法 Thread.currentThread().interrupt(); }
通過上面的折磨,終於走到了釋放鎖部分的代碼,這部分代碼相對於加鎖部分要簡單不少。
public void unlock() { sync.release(1); } public final boolean release(int arg) { //嘗試釋放鎖,釋放成功那麼須要將其餘等待中的結點喚醒 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
protected final boolean tryRelease(int releases) { int c = getState() - releases; //若是調用釋放鎖的線程和佔有鎖的線程不是同一個,拋出異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //因爲鎖的可重入,只有當state的值爲0的時候才能真正釋放鎖 if (c == 0) { free = true; setExclusiveOwnerThread(null); } //修改State的值 setState(c); return free; }
釋放鎖部分的代碼實際上很簡單,因爲只有得到鎖得線程纔可以釋放鎖,所以,若是調用釋放鎖的線程和佔有鎖的線程不是同一個,則須要拋出異常。另一個須要注意的點就是,因爲鎖的可重入性,咱們使用了state的值來表示鎖的重入次數,所以,只有當state的值變爲0,咱們纔可以真正的釋放鎖,可是,不管是否釋放了鎖,咱們都須要將state的值修改。
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }