前幾篇文章介紹了 AQS(AbstractQueuedSynchronizer)中的獨佔模式和對 Condition 的實現,這一篇文章來聊聊基於 AQS 框架實現的鎖工具:ReentrantLock。java
ReentrantLock 是一個可重入的互斥鎖,也被稱爲獨佔鎖。node
ReentrantLock 具備兩種實現:公平鎖(fair lock)、非公平鎖(non-fair lock)。c#
本文基於 jdk1.8.0_91
public class ReentrantLock implements Lock, java.io.Serializable
ReentrantLock 是 Lock 接口的實現,方法對照以下:segmentfault
Lock 接口 ReentrantLock 實現 lock() sync.lock() lockInterruptibly() sync.acquireInterruptibly(1) tryLock() sync.nonfairTryAcquire(1) tryLock(long time, TimeUnit unit) sync.tryAcquireNanos(1, unit.toNanos(timeout)) unlock() sync.release(1) newCondition() sync.newCondition()
可知 ReentrantLock 對 Lock 的實現都是調用內部類 Sync 來作的。框架
Sync 繼承了 AQS,也就是說 ReentrantLock 的大部分實現都已經由 AQS 完成了。less
ReentrantLock 使用 AQS 中的 state 表示鎖是否被其餘線程鎖佔用。若是爲 0 則表示未被佔用,其餘值表示該鎖被重入的次數。對於獲取鎖失敗的線程,要麼在同步隊列中等待(Lock#lock),要麼直接返回(Lock#tryLock)。工具
此外,AQS 提供了一系列模板方法,對於互斥鎖來講,須要實現其中的 tryAcquire、tryRelease、isHeldExclusively 方法。ui
// 獨佔獲取(資源數) protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // 獨佔釋放(資源數) protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } // 共享獲取(資源數) protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } // 共享獲取(資源數) protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } // 是否排它狀態 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
Sync 重寫了 AQS 的 tryRelease 和 isHeldExclusively 方法,而 AQS 的 tryAcquire 方法交由 Sync 的子類來實現。
Sync 中還具有了一個抽象的 lock 方法,強制子類實現。spa
abstract static class Sync extends AbstractQueuedSynchronizer { /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); protected final boolean tryRelease(int releases) {...} protected final boolean isHeldExclusively() {...} }
Sync 具備兩個子類 FailSync 和 NonfairSync,對應的是 ReentrantLock 的兩種實現:公平鎖(fair lock)、非公平鎖(non-fair lock)。
Sync 中包含了一個抽象方法 lock 須要子類來實現,設計該抽象方法的目的是,給非公平模式加鎖提供入口。
/** Synchronizer providing all implementation mechanics */ // 提供全部實現機制的同步器 private final Sync sync; /** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); } /** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
由於公平模式下,隊列頭部的線程從阻塞中被喚醒到真正運行,涉及到線程調度和 CPU 上下文的切換,比較耗時,這個過程當中鎖處於空閒狀態,浪費資源。
而非公平模式下,多個線程之間採用 CAS 來爭奪鎖,這中間沒有時間延遲,可以提升吞吐量。
// 獲取鎖。 void lock() // 若是當前線程未被中斷,則獲取鎖。 void lockInterruptibly() // 僅在調用時鎖未被另外一個線程保持的狀況下,才獲取該鎖。 boolean tryLock() // 若是鎖在給定等待時間內沒有被另外一個線程保持,且當前線程未被中斷,則獲取該鎖。 boolean tryLock(long timeout, TimeUnit unit) // 試圖釋放此鎖。 void unlock() // 返回用來與此 Lock 實例一塊兒使用的 Condition 實例。 Condition newCondition()
/** * Acquires the lock. * * <p>Acquires the lock if it is not held by another thread and returns * immediately, setting the lock hold count to one. * * <p>If the current thread already holds the lock then the hold * count is incremented by one and the method returns immediately. * * <p>If the lock is held by another thread then the * current thread becomes disabled for thread scheduling * purposes and lies dormant until the lock has been acquired, * at which time the lock hold count is set to one. */ public void lock() { sync.lock(); }
在 ReentrantLock 中,Sync 的兩個子類 FairSync、NonfairSync 各自實現了 Sync#lock 方法。
final void lock() { acquire(1); }
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
公平鎖的 lock 方法直接調用 AQS#acquire(1);
非公平鎖首先經過 CAS 修改 state 值來獲取鎖,當獲取失敗時纔會調用 AQS#acquire(1) 來獲取鎖。
AQS 中的 acquire 方法中,只有 tryAcquire 方法須要子類來實現。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
其中 tryRelease 方法嘗試釋放鎖,釋放鎖成功會喚醒在同步隊列中等待的線程,詳細內容見AQS 獨佔模式實現原理,這裏只介紹 ReentrantLock 對 tryRelease 的實現。
在 ReentrantLock 中,Sync 的兩個子類 FairSync、NonfairSync 各自實現了 AQS#tryAcquire 方法。
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && // 判斷同步隊列中是否有等待時間更長的節點 compareAndSetState(0, acquires)) { // 進入這裏,說明當前線程等待鎖時間最長,則CAS修改state setExclusiveOwnerThread(current); // 將當前線程設置爲持有鎖的線程 return true; } } else if (current == getExclusiveOwnerThread()) { // 當前線程已持有鎖 int nextc = c + acquires; // 重入次數 if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 可重入的最大次數 Integer.MAX_VALUE setState(nextc); return true; } return false; }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { // 無論當前線程在同步隊列中是否等待最久,都來 CAS 爭奪鎖 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 若是已得到鎖,則重入。邏輯與公平鎖一致 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
能夠看到,公平鎖比非公平鎖多執行了 hasQueuedPredecessors 方法,用於判斷同步隊列中是否具備比當前線程等待鎖時間還長的節點。
在 AQS 中的同步隊列中,頭節點是一個 dummy node,所以等待時間最長的節點是頭節點的下一個節點,若該節點存在且不是當前線程,則 hasQueuedPredecessors 返回 true。說明當前節點不是同步隊列中等待時間最長的節點,沒法獲取鎖。
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && // 頭節點的下一個節點,不是當前線程的節點,說明當前線程等待鎖時間不是最長的 ((s = h.next) == null || s.thread != Thread.currentThread()); }
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) // 獲取鎖以前,當前線程已經中斷,則直接拋出中斷異常 throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
/** * Acquires in exclusive interruptible mode. * @param arg the acquire argument */ private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 線程在阻塞等待鎖的過程當中,被中斷喚醒,則放棄等待鎖,直接拋出異常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
與非公平鎖的 NonfairSync#tryAcquire 方法同樣,都是調用了 Sync#nonfairTryAcquire 方法。
注意,Sync#nonfairTryAcquire 方法不存在任何和隊列相關的操做。
在鎖沒有被其餘線程持有的狀況下,無論使用的是公平鎖仍是非公平鎖,多個線程均可以調用該方法來 CAS 爭奪鎖,爭奪失敗了不用進入同步隊列,直接返回。
使用公平鎖的狀況下,在上一個持有鎖的線程釋放鎖(鎖狀態 state == 0)以後,同步隊列中的線程被喚醒但還沒有獲取鎖以前,此時當前線程執行 tryLock 可能會成功得到鎖,這其實是對公平鎖的公平性的一種破壞。
在某些狀況下,打破公平性的行爲多是有用的。若是但願遵照此鎖的公平設置,則使用 tryLock(0, TimeUnit.SECONDS)
public boolean tryLock() { return sync.nonfairTryAcquire(1); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { // 無論同步隊列中是否具備等待好久的節點,當前線程直接 CAS 爭奪鎖 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 若是已得到鎖,則重入。邏輯與公平鎖一致 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
java.util.concurrent.locks.ReentrantLock#tryLock(long, java.util.concurrent.TimeUnit)
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
在 JDK 中,AQS 中的 tryAcquireNanos 方法只會被 ReentrantLock#tryLock(time, unit) 和 ReentrantReadWriteLock.WriteLock#tryLock(time, unit) 這兩個方法調用到。
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
/** * Acquires in exclusive timed mode. * * @param arg the acquire argument * @param nanosTimeout max wait time * @return {@code true} if acquired */ private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) // 直到超時都沒有得到鎖,則返回false return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 能夠進入阻塞的狀況下,剩餘時間大於閾值,則阻塞,不然自旋 if (Thread.interrupted()) throw new InterruptedException(); // 線程在阻塞等待鎖的過程當中,被中斷喚醒,則放棄等待鎖,直接拋出異常 } } finally { if (failed) cancelAcquire(node); } }
/** * Attempts to release this lock. * * <p>If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {@link IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ public void unlock() { sync.release(1); }
public final boolean release(int arg) { if (tryRelease(arg)) { // 釋放鎖資源 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 喚醒head的後繼節點 return true; } return false; }
其中 tryRelease 方法嘗試釋放鎖,釋放鎖成功會喚醒在同步隊列中等待的線程,詳細內容見AQS 獨佔模式實現原理,這裏只介紹 ReentrantLock 對 tryRelease 的實現。
在 ReentrantLock 中,不論是公平鎖仍是非公平鎖,均使用相同的 Sync#tryRelease 方法。
protected final boolean tryRelease(int releases) { int c = getState() - releases; // 計算釋放以後的state if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; // 鎖已所有釋放(獲取鎖的次數必須等於釋放次數),返回true,代表能夠喚醒下一個等待線程 setExclusiveOwnerThread(null); // 設置獨佔鎖持有線程爲null } setState(c); return free; }
建立 AQS 中的 ConditionObject 對象。是能夠與 Lock 一塊兒使用的 Condition 接口實例。
final ConditionObject newCondition() { return new ConditionObject(); }
public ConditionObject() { }
Condition 實例支持與 Object 的監視器方法(wait、notify 和 notifyAll)相同的用法(await、signal 和 signalAll)。
