ReentrantLock主要利用CAS+CLH隊列來實現。它支持公平鎖和非公平鎖,二者的實現相似。java
CAS:Compare and Swap,比較並交換。CAS有3個操做數:內存值V、預期值A、要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改成B,不然什麼都不作。該操做是一個原子操做,被普遍的應用在Java的底層實現中。在Java中,CAS主要是由sun.misc.Unsafe這個類經過JNI調用CPU底層指令實現。node
CLH隊列:帶頭結點的雙向非循環鏈表(以下圖所示):安全
ReentrantLock的基本實現能夠歸納爲:先經過CAS嘗試獲取鎖。若是此時已經有線程佔據了鎖,那就加入CLH隊列而且被掛起。當鎖被釋放以後,排在CLH隊列隊首的線程會被喚醒,而後CAS再次嘗試獲取鎖。在這個時候,若是:函數
1.非公平鎖:若是同時還有另外一個線程進來嘗試獲取,那麼有可能會讓這個線程搶先獲取;性能
2. 公平鎖:若是同時還有另外一個線程進來嘗試獲取,當它發現本身不是在隊首的話,就會排到隊尾,由隊首的線程獲取到鎖。ui
ReentrantLock是java concurrent包提供的一種鎖實現。不一樣於synchronized,ReentrantLock是從代碼層面實現同步的。
圖1 reentrantLock的類層次結構圖this
Lock定義了鎖的接口規範。
ReentrantLock實現了Lock接口。
AbstractQueuedSynchronizer中以隊列的形式實現線程之間的同步。
ReentrantLock的方法都依賴於AbstractQueuedSynchronizer的實現。spa
Lock接口定義了以下方法:
圖2 lock接口規範線程
一、lock()方法的實現
進入lock()方法,發現其內部調用的是sync.lock();設計
public void lock() { sync.lock(); }
sync是在ReentrantLock的構造函數中實現的。其中fair參數的不一樣可實現公平鎖和非公平鎖。因爲在鎖釋放的階段,鎖處於無線程佔有的狀態,此時其餘線程和在隊列中等待的線程均可以搶佔該鎖,從而出現公平鎖和非公平鎖的區別。
非公平鎖:當鎖處於無線程佔有的狀態,此時其餘線程和在隊列中等待的線程均可以搶佔該鎖。
公平鎖:當鎖處於無線程佔有的狀態,在其餘線程搶佔該鎖的時候,都須要先進入隊列中等待。
本文以非公平鎖NonfairSync的sync實例進行分析。
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = (fair)? new FairSync() : new NonfairSync(); }
由圖1可知,NonfairSync繼承自Sync,所以也繼承了AbstractQueuedSynchronizer中的全部方法實現。接着進入NonfairSync的lock()方法。
final void lock() { // 利用cas置狀態位,若是成功,則表示佔有鎖成功 if (compareAndSetState(0, 1)) // 記錄當前線程爲鎖擁有者 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
在lock方法中,利用cas實現ReentrantLock的狀態置位(cas即compare and swap,它是CPU的指令,所以賦值操做都是原子性的)。若是成功,則表示佔有鎖成功,並記錄當前線程爲鎖擁有者。當佔有鎖失敗,則調用acquire(1)方法繼續處理。
public final void acquire(int arg) { //嘗試得到鎖,若是失敗,則加入到隊列中進行等待 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquire()是AbstractQueuedSynchronizer的方法。它首先會調用tryAcquire()去嘗試得到鎖,若是得到鎖失敗,則將當前線程加入到CLH隊列中進行等待。tryAcquire()方法在NonfairSync中有實現,但最終調用的仍是Sync中的nonfairTryAcquire()方法。
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 得到狀態 int c = getState(); // 若是狀態爲0,則表示該鎖未被其餘線程佔有 if (c == 0) { // 此時要再次利用cas去嘗試佔有鎖 if (compareAndSetState(0, acquires)) { // 標記當前線程爲鎖擁有者 setExclusiveOwnerThread(current); return true; } } // 若是當前線程已經佔有了,則state + 1,記錄佔有次數 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 此時無需利用cas去賦值,由於該鎖確定被當前線程佔有 setState(nextc); return true; } return false; }
在nonfairTryAcquire()中,首先會去得到鎖的狀態,若是爲0,則表示鎖未被其餘線程佔有,此時會利用cas去嘗試將鎖的狀態置位,並標記當前線程爲鎖擁有者;若是鎖的狀態大於0,則會判斷鎖是否被當前線程佔有,若是是,則state + 1,這也是爲何lock()的次數要和unlock()次數對等;若是佔有鎖失敗,則返回false。
在nonfairTryAcquire()返回false的狀況下,會繼續調用acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,將當前線程加入到隊列中繼續嘗試得到鎖。
private Node addWaiter(Node mode) { // 建立當前線程的節點 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 若是尾節點不爲空 if (pred != null) { // 則將當前線程的節點加入到尾節點以後,成爲新的尾節點 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { // CAS方法有可能失敗,所以要循環調用,直到當前線程的節點加入到隊列中 for (;;) { Node t = tail; if (t == null) { // Must initialize Node h = new Node(); // Dummy header,頭節點爲虛擬節點 h.next = node; node.prev = h; if (compareAndSetHead(h)) { tail = node; return h; } } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
addWaiter()是AbstactQueuedSynchronizer的方法,會以節點的形式來標記當前線程,並加入到尾節點中。enq()方法是在節點加入到尾節點失敗的狀況下,經過for(;;)循環反覆調用cas方法,直到節點加入成功。因爲enq()方法是非線程安全的,因此在增長節點的時候,須要使用cas設置head節點和tail節點。此時添加成功的結點狀態爲Node.EXCLUSIVE。
在節點加入到隊列成功以後,會接着調用acquireQueued()方法去嘗試得到鎖。
final boolean acquireQueued(final Node node, int arg) { try { boolean interrupted = false; for (;;) { // 得到前一個節點 final Node p = node.predecessor(); // 若是前一個節點是頭結點,那麼直接去嘗試得到鎖 // 由於其餘線程有可能隨時會釋放鎖,不必Park等待 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } }
在acquireQueued()方法中,會利用for (;;)一直去得到鎖,若是前一個節點爲head節點,則表示能夠直接嘗試去得到鎖了,由於佔用鎖的線程隨時都有可能去釋放鎖而且該線程是被unpark喚醒的CLH隊列中的第一個節點,得到鎖成功後返回。
若是該線程的節點在CLH隊列中比較靠後或者得到鎖失敗,即其餘線程依然佔用着鎖,則會接着調用shouldParkAfterFailedAcquire()方法來阻塞當前線程,以讓出CPU資源。在阻塞線程以前,會執行一些額外的操做以提升CLH隊列的性能。因爲隊列中前面的節點有可能在等待過程當中被取消掉了,所以當前線程的節點須要提早,並將前一個節點置狀態位爲SIGNAL,表示能夠阻塞當前節點。所以該函數在判斷到前一個節點爲SIGNAL時,直接返回true便可。此處雖然存在對CLH隊列的同步操做,但因爲局部變量節點確定是不同的,因此對CLH隊列操做是線程安全的。因爲在compareAndSetWaitStatus(pred, ws, Node.SIGNAL)執行以前可能發生pred節點搶佔鎖成功或pred節點被取消掉,所以此處須要返回false以容許該節點能夠搶佔鎖。
當shouldParkAfterFailedAcquire()返回true時,會進入parkAndCheckInterrupt()方法。parkAndCheckInterrupt()方法最終調用safe.park()阻塞該線程,以避免該線程在等待過程當中無線循環消耗cpu資源。至此,當前線程便被park了。那麼線程什麼時候被unpark,這將在unlock()方法中進行。
這裏有一個小細節須要注意,在線程被喚醒以後,會調用Thread.interrupted()將線程中斷狀態置位爲false,而後記錄下中斷狀態並返回上層函數去拋出異常。我想這樣設計的目的是爲了可讓該線程能夠完成搶佔鎖的操做,從而可使當前節點稱爲CLH的虛擬頭節點。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park */ return true; if (ws > 0) { // 若是前面的節點是CANCELLED狀態,則一直提早 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); }
二、unlock()方法的實現
同lock()方法,unlock()方法依然調用的是sync.release(1)。
public final boolean release(int arg) { // 釋放鎖 if (tryRelease(arg)) { Node h = head; // 此處有個疑問,爲何須要判斷h.waitStatus != 0 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
能夠看到,tryRelease()方法實現了鎖的釋放,邏輯上便是將鎖的狀態置爲0。當釋放鎖成功以後,一般狀況下不須要喚醒隊列中線程,所以隊列中老是有一個線程處於活躍狀態。
總結:
ReentrantLock的鎖資源以state狀態描述,利用CAS則實現對鎖資源的搶佔,並經過一個CLH隊列阻塞全部競爭線程,在後續則逐個喚醒等待中的競爭線程。ReentrantLock繼承AQS徹底從代碼層面實現了java的同步機制,相對於synchronized,更容易實現對各種鎖的擴展。同時,AbstractQueuedSynchronizer中的Condition配合ReentrantLock使用,實現了wait/notify的功能。