深刻剖析Java重入鎖ReentrantLock的實現原理

ReentrantLock,重入鎖,是JDK5中添加在併發包下的一個高性能的工具。顧名思義,ReentrantLock支持同一個線程在未釋放鎖的狀況下重複獲取鎖。node

每個東西的出現必定是有價值的。既然已經有了元老級的synchronized,並且synchronized也支持重入,爲何Doug Lea還要專門寫一個ReentrantLock呢?多線程

0 ReentrantLock與synchronized的比較併發

0.1 性能上的比較函數

首先,ReentrantLock的性能要優於synchronized。下面經過兩段代碼比價一下。 首先是synchronized:工具

public class LockDemo2 { private static final Object lock = new Object(); // 定義鎖對象 private static int count = 0; // 累加數 public static void main(String[] args) throws InterruptedException { long start = System.currentTimeMillis(); CountDownLatch cdl = new CountDownLatch(100); // 啓動100個線程對count累加,每一個線程累加1000000次 // 調用add函數累加,經過synchronized保證多線程之間的同步 for (int i=0;i<100;i++) { new Thread(() -> { for (int i1 = 0; i1 <1000000; i1++) { add(); } cdl.countDown(); }).start(); } cdl.await(); System.out.println("Time cost: " + (System.currentTimeMillis() - start) + ", count = " + count); } private static void add() { synchronized (lock) { count++; } } } 複製代碼 而後是ReentrantLock:性能

public class LockDemo3 { private static Lock lock = new ReentrantLock(); // 重入鎖 private static int count = 0; public static void main(String[] args) throws InterruptedException { long start = System.currentTimeMillis(); CountDownLatch cdl = new CountDownLatch(100); for (int i=0;i<100;i++) { new Thread(() -> { for (int i1 = 0; i1 <1000000; i1++) { add(); } cdl.countDown(); }).start(); } cdl.await(); System.out.println("Time cost: " + (System.currentTimeMillis() - start) + ", count = " + count); } // 經過ReentrantLock保證線程之間的同步 private static void add() { lock.lock(); count++; lock.unlock(); } } 下面是運行屢次的結果對比:ui

整體來看,ReentrantLock的平均性能要比synchronized好20%左右。

0.2 獲取鎖公平性的比較 公平性是啥概念呢?若是是公平的獲取鎖,就是說多個線程之間獲取鎖的時候要排隊,依次獲取鎖;若是是不公平的獲取鎖,就是說多個線程獲取鎖的時候一哄而上,誰搶到是誰的。this

因爲synchronized是基於monitor機制實現的,它只支持非公平鎖;但ReentrantLock同時支持公平鎖和非公平鎖。線程

0.3 綜述3d

除了上文所述,ReentrantLock還有一些其餘synchronized不具有的特性,這裏來總結一下。

1 可重入功能的實現原理 ReentrantLock的實現基於隊列同步器(AbstractQueuedSynchronizer,後面簡稱AQS),關於AQS的實現原理,能夠看筆者的另外一篇文章: Java隊列同步器(AQS)究竟是怎麼一回事

ReentrantLock的可重入功能基於AQS的同步狀態:state。

其原理大體爲:當某一線程獲取鎖後,將state值+1,並記錄下當前持有鎖的線程,再有線程來獲取鎖時,判斷這個線程與持有鎖的線程是不是同一個線程,若是是,將state值再+1,若是不是,阻塞線程。 當線程釋放鎖時,將state值-1,當state值減爲0時,表示當前線程完全釋放了鎖,而後將記錄當前持有鎖的線程的那個字段設置爲null,並喚醒其餘線程,使其從新競爭鎖。

// acquires的值是1 final boolean nonfairTryAcquire(int acquires) { // 獲取當前線程 final Thread current = Thread.currentThread(); // 獲取state的值 int c = getState(); // 若是state的值等於0,表示當前沒有線程持有鎖 // 嘗試將state的值改成1,若是修改爲功,則成功獲取鎖,並設置當前線程爲持有鎖的線程,返回true if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // state的值不等於0,表示已經有其餘線程持有鎖 // 判斷當前線程是否等於持有鎖的線程,若是等於,將state的值+1,並設置到state上,獲取鎖成功,返回true // 若是不是當前線程,獲取鎖失敗,返回false else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } 2 非公平鎖的實現原理

ReentrantLock有兩個構造函數:

// 無參構造,默認使用非公平鎖(NonfairSync) public ReentrantLock() { sync = new NonfairSync(); } // 經過fair參數指定使用公平鎖(FairSync)仍是非公平鎖(NonfairSync) public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } sync是ReentrantLock的成員變量,是其內部類Sync的實例。NonfairSync和FairSync都是Sync類的子類。能夠參考以下類關係圖:

Sync繼承了AQS,因此他具有了AQS的功能。一樣的,NonfairSync和FairSync都是AQS的子類。

當咱們經過無參構造函數獲取ReentrantLock實例後,默認用的就是非公平鎖。

下面將經過以下場景描述非公平鎖的實現原理:假設一個線程(t1)獲取到了鎖,其餘不少沒獲取到鎖的線程(others_t)加入到了AQS的同步隊列中等待,當這個線程執行完,釋放鎖後,其餘線程從新非公平的競爭鎖。

先來描述一下獲取鎖的方法:

final void lock() { // 線程t1成功的將state的值從0改成1,表示獲取鎖成功 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // others_t線程們沒有獲取到鎖 acquire(1); } 若是獲取鎖失敗,會調用AQS的acquire方法

public final void acquire(int arg) { // tryAcquire是個模板方法,在NonfairSync中實現,若是在tryAcquire方法中依然獲取鎖失敗,會將當前線程加入同步隊列中等待(addWaiter) if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } tryAcquire的實現以下,實際上是調用了上面的nonfairTryAcquire方法

protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } OK,此時t1獲取到了鎖,others_t線程們都跑到同步隊列裏等着了。

某一時刻,t1本身的任務執行完成,調用了釋放鎖的方法(unlock)。

public void unlock() { // 調用AQS的release方法釋放資源 sync.release(1); } public final boolean release(int arg) { // tryRelease也是模板方法,在Sync中實現 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 成功釋放鎖後,喚醒同步隊列中的下一個節點,使之能夠從新競爭鎖 // 注意此時不會喚醒隊列第一個節點以後的節點,這些節點此時仍是沒法競爭鎖 unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { // 將state的值-1,若是-1以後等於0,釋放鎖成功 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } 這時鎖被釋放了,被喚醒的線程和新來的線程從新競爭鎖(不包含同步隊列後面的那些線程)。

回到lock方法中,因爲此時全部線程都能經過CAS來獲取鎖,並不能保證被喚醒的那個線程能競爭過新來的線程,因此是非公平的。這就是非公平鎖的實現。

這個過程大概能夠描述爲下圖這樣子:

3 公平鎖的實現原理

公平鎖與非公平鎖的釋放鎖的邏輯是同樣的,都是調用上述的unlock方法,最大區別在於獲取鎖的時候。

static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; // 獲取鎖,與非公平鎖的不一樣的地方在於,這裏直接調用的AQS的acquire方法,沒有先嚐試獲取鎖 // acquire又調用了下面的tryAcquire方法,核心在於這個方法 final void lock() { acquire(1); } /**

  • 這個方法和nonfairTryAcquire方法只有一點不一樣,在標註爲#1的地方
  • 多了一個判斷hasQueuedPredecessors,這個方法是判斷當前AQS的同步隊列中是否還有等待的線程
  • 若是有,返回true,不然返回false。
  • 由此可知,當隊列中沒有等待的線程時,當前線程才能嘗試經過CAS的方式獲取鎖。
  • 不然就讓這個線程去隊列後面排隊。 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // #1 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } 經過註釋可知,在公平鎖的機制下,任何線程想要獲取鎖,都要排隊,不可能出現插隊的狀況。這就是公平鎖的實現原理。

這個過程大概能夠描述爲下圖這樣子:

4 tryLock原理

tryLock作的事情很簡單:讓當前線程嘗試獲取一次鎖,成功的話返回true,不然false。

其實現,其實就是調用了nonfairTryAcquire方法來獲取鎖。

public boolean tryLock() { return sync.nonfairTryAcquire(1); } 至於獲取失敗的話,他也不會將本身添加到同步隊列中等待,直接返回false,讓業務調用代碼本身處理。

5 可中斷的獲取鎖

中斷,也就是經過Thread的interrupt方法將某個線程中斷,中斷一個阻塞狀態的線程,會拋出一個InterruptedException異常。

若是獲取鎖是可中斷的,當一個線程長時間獲取不到鎖時,咱們能夠主動將其中斷,可避免死鎖的產生。

其實現方式以下:

public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } 複製代碼 會調用AQS的acquireInterruptibly方法

public final void acquireInterruptibly(int arg) throws InterruptedException { // 判斷當前線程是否已經中斷,若是已中斷,拋出InterruptedException異常 if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } 此時會優先經過tryAcquire嘗試獲取鎖,若是獲取失敗,會將本身加入到隊列中等待,並可隨時響應中斷。

private void doAcquireInterruptibly(int arg) throws InterruptedException { // 將本身添加到隊列中等待 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { // 自旋的獲取鎖 for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } // 獲取鎖失敗,在parkAndCheckInterrupt方法中,經過LockSupport.park()阻塞當前線程, // 並調用Thread.interrupted()判斷當前線程是否已經被中斷 // 若是被中斷,直接拋出InterruptedException異常,退出鎖的競爭隊列 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // #1 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } PS:不可中斷的方式下,代碼#1位置不會拋出InterruptedException異常,只是簡單的記錄一下當前線程被中斷了。

6 可超時的獲取鎖

經過以下方法實現,timeout是超時時間,unit表明時間的單位(毫秒、秒...)

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } 能夠發現,這也是一個能夠響應中斷的方法。而後調用AQS的tryAcquireNanos方法:

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } doAcquireNanos方法與中斷裏面的方法大同小異,下面在註釋中說明一下不一樣的地方:

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 計算超時截止時間 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } // 計算到截止時間的剩餘時間 nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) // 超時了,獲取失敗 return false; // 超時時間大於1000納秒時,才阻塞 // 由於若是小於1000納秒,基本能夠認爲超時了(系統調用的時間可能都比這個長) if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 響應中斷 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 7 總結

本文首先對比了元老級的鎖synchronized與ReentrantLock的不一樣,ReentrantLock具備一下優點:

同時支持公平鎖與非公平鎖 支持:嘗試非阻塞的一次性獲取鎖 支持超時獲取鎖 支持可中斷的獲取鎖 支持更多的等待條件(Condition) 而後介紹了幾個主要特性的實現原理,這些都是基於AQS的。

ReentrantLock的核心,是經過修改AQS中state的值來同步鎖的狀態。 經過這個方式,實現了可重入。

ReentrantLock具有公平鎖和非公平鎖,默認使用非公平鎖。其實現原理主要依賴於AQS中的同步隊列。

最後,可中斷的機制是內部經過Thread.interrupted()判斷當前線程是否已被中斷,若是被中斷就拋出InterruptedException異常來實現的。

相關文章
相關標籤/搜索