深刻剖析Java重入鎖ReentrantLock的實現原理

🌹開源項目推薦🌹

Pepper Metrics是我與同事開發的一個開源工具(github.com/zrbcool/pep…),其經過收集jedis/mybatis/httpservlet/dubbo/motan的運行性能統計,並暴露成prometheus等主流時序數據庫兼容數據,經過grafana展現趨勢。其插件化的架構也很是方便使用者擴展並集成其餘開源組件。
請你們給個star,同時歡迎你們成爲開發者提交PR一塊兒完善項目。java


進入正題...

ReentrantLock,重入鎖,是JDK5中添加在併發包下的一個高性能的工具。顧名思義,ReentrantLock支持同一個線程在未釋放鎖的狀況下重複獲取鎖。node

每個東西的出現必定是有價值的。既然已經有了元老級的synchronized,並且synchronized也支持重入,爲何Doug Lea還要專門寫一個ReentrantLock呢?git

0 ReentrantLock與synchronized的比較

0.1 性能上的比較

首先,ReentrantLock的性能要優於synchronized。下面經過兩段代碼比價一下。 首先是synchronized:github

public class LockDemo2 {
    private static final Object lock = new Object(); // 定義鎖對象
    private static int count = 0; // 累加數
    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        CountDownLatch cdl = new CountDownLatch(100);
        // 啓動100個線程對count累加,每一個線程累加1000000次
        // 調用add函數累加,經過synchronized保證多線程之間的同步
        for (int i=0;i<100;i++) {
            new Thread(() -> {
                for (int i1 = 0; i1 <1000000; i1++) {
                    add();
                }
                cdl.countDown();
            }).start();
        }
        cdl.await();
        System.out.println("Time cost: " + (System.currentTimeMillis() - start) + ", count = " + count);
    }

    private static void add() {
        synchronized (lock) {
            count++;
        }
    }
}
複製代碼

而後是ReentrantLock:數據庫

public class LockDemo3 {
    private static Lock lock = new ReentrantLock(); // 重入鎖
    private static int count = 0;
    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        CountDownLatch cdl = new CountDownLatch(100);
        for (int i=0;i<100;i++) {
            new Thread(() -> {
                for (int i1 = 0; i1 <1000000; i1++) {
                    add();
                }
                cdl.countDown();
            }).start();
        }
        cdl.await();
        System.out.println("Time cost: " + (System.currentTimeMillis() - start) + ", count = " + count);
    }
    // 經過ReentrantLock保證線程之間的同步
    private static void add() {
        lock.lock();
        count++;
        lock.unlock();
    }
}
複製代碼

下面是運行屢次的結果對比:微信

synchronized ReentrantLock
第一次 4620 ms 3360 ms
第二次 4086 ms 3138 ms
第三次 4650 ms 3408 ms

整體來看,ReentrantLock的平均性能要比synchronized好20%左右。mybatis

PS:感謝 @荒野七叔 的指正。更嚴謹的描述一下這個性能的對比:當存在大量線程競爭鎖時,多數狀況下ReentrantLock的性能優於synchronized。多線程

由於在JDK6中對synchronized作了優化,在鎖競爭不激烈的時候,多數狀況下鎖會停留在偏向鎖和輕量級鎖階段,這兩個階段性能是很好的。當存在大量競爭時,可能會膨脹爲重量級鎖,性能降低,此時的ReentrantLock應該是優於synchronized的。架構

0.2 獲取鎖公平性的比較

公平性是啥概念呢?若是是公平的獲取鎖,就是說多個線程之間獲取鎖的時候要排隊,依次獲取鎖;若是是不公平的獲取鎖,就是說多個線程獲取鎖的時候一哄而上,誰搶到是誰的。併發

因爲synchronized是基於monitor機制實現的,它只支持非公平鎖;但ReentrantLock同時支持公平鎖和非公平鎖。

0.3 綜述

除了上文所述,ReentrantLock還有一些其餘synchronized不具有的特性,這裏來總結一下。

synchronized ReentrantLock
性能 相對較差 優於synchronized 20%左右
公平性 只支持非公平鎖 同時支持公平鎖與非公平鎖
嘗試獲取鎖的支持 不支持,一旦到了同步塊,且沒有獲取到鎖,就阻塞在這裏 支持,經過tryLock方法實現,可經過其返回值判斷是否成功獲取鎖,因此即便獲取鎖失敗也不會阻塞在這裏
超時的獲取鎖 不支持,若是一直獲取不到鎖,就會一直等待下去 支持,經過tryLock(time, TimeUnit)方法實現,若是超時了還沒獲取鎖,就放棄獲取鎖,不會一直阻塞下去
是否可響應中斷 不支持,不可響應線程的interrupt信號 支持,經過lockInterruptibly方法實現,經過此方法獲取鎖以後,線程可響應interrupt信號,並拋出InterruptedException異常
等待條件的支持 支持,經過wait、notify、notifyAll來實現 支持,經過Conditon接口實現,支持多個Condition,比synchronized更加靈活

1 可重入功能的實現原理

ReentrantLock的實現基於隊列同步器(AbstractQueuedSynchronizer,後面簡稱AQS),關於AQS的實現原理,能夠看筆者的另外一篇文章: Java隊列同步器(AQS)究竟是怎麼一回事

ReentrantLock的可重入功能基於AQS的同步狀態:state。

其原理大體爲:當某一線程獲取鎖後,將state值+1,並記錄下當前持有鎖的線程,再有線程來獲取鎖時,判斷這個線程與持有鎖的線程是不是同一個線程,若是是,將state值再+1,若是不是,阻塞線程。 當線程釋放鎖時,將state值-1,當state值減爲0時,表示當前線程完全釋放了鎖,而後將記錄當前持有鎖的線程的那個字段設置爲null,並喚醒其餘線程,使其從新競爭鎖。

// acquires的值是1
final boolean nonfairTryAcquire(int acquires) {
	// 獲取當前線程
	final Thread current = Thread.currentThread();
	// 獲取state的值
	int c = getState();
	// 若是state的值等於0,表示當前沒有線程持有鎖
	// 嘗試將state的值改成1,若是修改爲功,則成功獲取鎖,並設置當前線程爲持有鎖的線程,返回true
	if (c == 0) {
		if (compareAndSetState(0, acquires)) {
			setExclusiveOwnerThread(current);
			return true;
		}
	}
	// state的值不等於0,表示已經有其餘線程持有鎖
	// 判斷當前線程是否等於持有鎖的線程,若是等於,將state的值+1,並設置到state上,獲取鎖成功,返回true
	// 若是不是當前線程,獲取鎖失敗,返回false
	else if (current == getExclusiveOwnerThread()) {
		int nextc = c + acquires;
		if (nextc < 0) // overflow
			throw new Error("Maximum lock count exceeded");
		setState(nextc);
		return true;
	}
	return false;
}
複製代碼

2 非公平鎖的實現原理

ReentrantLock有兩個構造函數:

// 無參構造,默認使用非公平鎖(NonfairSync)
public ReentrantLock() {
	sync = new NonfairSync();
}

// 經過fair參數指定使用公平鎖(FairSync)仍是非公平鎖(NonfairSync)
public ReentrantLock(boolean fair) {
	sync = fair ? new FairSync() : new NonfairSync();
}
複製代碼

sync是ReentrantLock的成員變量,是其內部類Sync的實例。NonfairSync和FairSync都是Sync類的子類。能夠參考以下類關係圖:

ReentrantLock類關係圖

Sync繼承了AQS,因此他具有了AQS的功能。一樣的,NonfairSync和FairSync都是AQS的子類。

當咱們經過無參構造函數獲取ReentrantLock實例後,默認用的就是非公平鎖。

下面將經過以下場景描述非公平鎖的實現原理:假設一個線程(t1)獲取到了鎖,其餘不少沒獲取到鎖的線程(others_t)加入到了AQS的同步隊列中等待,當這個線程執行完,釋放鎖後,其餘線程從新非公平的競爭鎖。

先來描述一下獲取鎖的方法:

final void lock() {
	// 線程t1成功的將state的值從0改成1,表示獲取鎖成功
	if (compareAndSetState(0, 1))
		setExclusiveOwnerThread(Thread.currentThread());
	else
	    // others_t線程們沒有獲取到鎖
		acquire(1);
}
複製代碼

若是獲取鎖失敗,會調用AQS的acquire方法

public final void acquire(int arg) {
    // tryAcquire是個模板方法,在NonfairSync中實現,若是在tryAcquire方法中依然獲取鎖失敗,會將當前線程加入同步隊列中等待(addWaiter)
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
複製代碼

tryAcquire的實現以下,實際上是調用了上面的nonfairTryAcquire方法

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
複製代碼

OK,此時t1獲取到了鎖,others_t線程們都跑到同步隊列裏等着了。

某一時刻,t1本身的任務執行完成,調用了釋放鎖的方法(unlock)。

public void unlock() {
    // 調用AQS的release方法釋放資源
    sync.release(1);
}
複製代碼
public final boolean release(int arg) {
    // tryRelease也是模板方法,在Sync中實現
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 成功釋放鎖後,喚醒同步隊列中的下一個節點,使之能夠從新競爭鎖
            // 注意此時不會喚醒隊列第一個節點以後的節點,這些節點此時仍是沒法競爭鎖
            unparkSuccessor(h);
        return true;
    }
    return false;
}
複製代碼
protected final boolean tryRelease(int releases) {
    // 將state的值-1,若是-1以後等於0,釋放鎖成功
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
複製代碼

這時鎖被釋放了,被喚醒的線程和新來的線程從新競爭鎖(不包含同步隊列後面的那些線程)。

回到lock方法中,因爲此時全部線程都能經過CAS來獲取鎖,並不能保證被喚醒的那個線程能競爭過新來的線程,因此是非公平的。這就是非公平鎖的實現。

這個過程大概能夠描述爲下圖這樣子:

非公平鎖的競爭

3 公平鎖的實現原理

公平鎖與非公平鎖的釋放鎖的邏輯是同樣的,都是調用上述的unlock方法,最大區別在於獲取鎖的時候。

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    // 獲取鎖,與非公平鎖的不一樣的地方在於,這裏直接調用的AQS的acquire方法,沒有先嚐試獲取鎖
    // acquire又調用了下面的tryAcquire方法,核心在於這個方法
    final void lock() {
        acquire(1);
    }

    /** * 這個方法和nonfairTryAcquire方法只有一點不一樣,在標註爲#1的地方 * 多了一個判斷hasQueuedPredecessors,這個方法是判斷當前AQS的同步隊列中是否還有等待的線程 * 若是有,返回true,不然返回false。 * 由此可知,當隊列中沒有等待的線程時,當前線程才能嘗試經過CAS的方式獲取鎖。 * 不然就讓這個線程去隊列後面排隊。 */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // #1
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
複製代碼

經過註釋可知,在公平鎖的機制下,任何線程想要獲取鎖,都要排隊,不可能出現插隊的狀況。這就是公平鎖的實現原理。

這個過程大概能夠描述爲下圖這樣子:

公平鎖的競爭

4 tryLock原理

tryLock作的事情很簡單:讓當前線程嘗試獲取一次鎖,成功的話返回true,不然false。

其實現,其實就是調用了nonfairTryAcquire方法來獲取鎖。

public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}
複製代碼

至於獲取失敗的話,他也不會將本身添加到同步隊列中等待,直接返回false,讓業務調用代碼本身處理。

5 可中斷的獲取鎖

中斷,也就是經過Thread的interrupt方法將某個線程中斷,中斷一個阻塞狀態的線程,會拋出一個InterruptedException異常。

若是獲取鎖是可中斷的,當一個線程長時間獲取不到鎖時,咱們能夠主動將其中斷,可避免死鎖的產生。

其實現方式以下:

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}
複製代碼

會調用AQS的acquireInterruptibly方法

public final void acquireInterruptibly(int arg) throws InterruptedException {
    // 判斷當前線程是否已經中斷,若是已中斷,拋出InterruptedException異常
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
複製代碼

此時會優先經過tryAcquire嘗試獲取鎖,若是獲取失敗,會將本身加入到隊列中等待,並可隨時響應中斷。

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    // 將本身添加到隊列中等待
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // 自旋的獲取鎖
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            // 獲取鎖失敗,在parkAndCheckInterrupt方法中,經過LockSupport.park()阻塞當前線程,
            // 並調用Thread.interrupted()判斷當前線程是否已經被中斷
            // 若是被中斷,直接拋出InterruptedException異常,退出鎖的競爭隊列
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // #1
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

PS:不可中斷的方式下,代碼#1位置不會拋出InterruptedException異常,只是簡單的記錄一下當前線程被中斷了。

6 可超時的獲取鎖

經過以下方法實現,timeout是超時時間,unit表明時間的單位(毫秒、秒...)

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
複製代碼

能夠發現,這也是一個能夠響應中斷的方法。而後調用AQS的tryAcquireNanos方法:

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
複製代碼

doAcquireNanos方法與中斷裏面的方法大同小異,下面在註釋中說明一下不一樣的地方:

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // 計算超時截止時間
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 計算到截止時間的剩餘時間
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) // 超時了,獲取失敗
                return false;
            // 超時時間大於1000納秒時,才阻塞
            // 由於若是小於1000納秒,基本能夠認爲超時了(系統調用的時間可能都比這個長)
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // 響應中斷
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

7 總結

本文首先對比了元老級的鎖synchronized與ReentrantLock的不一樣,ReentrantLock具備一下優點:

  • 同時支持公平鎖與非公平鎖
  • 支持:嘗試非阻塞的一次性獲取鎖
  • 支持超時獲取鎖
  • 支持可中斷的獲取鎖
  • 支持更多的等待條件(Condition)

而後介紹了幾個主要特性的實現原理,這些都是基於AQS的。

ReentrantLock的核心,是經過修改AQS中state的值來同步鎖的狀態。 經過這個方式,實現了可重入。

ReentrantLock具有公平鎖和非公平鎖,默認使用非公平鎖。其實現原理主要依賴於AQS中的同步隊列。

最後,可中斷的機制是內部經過Thread.interrupted()判斷當前線程是否已被中斷,若是被中斷就拋出InterruptedException異常來實現的。


歡迎關注個人微信公衆號

公衆號
相關文章
相關標籤/搜索