以前在介紹synchronized時說過,要實現鎖只須要添加個關鍵字便可,對於程序員來講,它的內部實現是透明的,這種方式叫隱式鎖。java
而從java1.5起,併發大神Doug Lea開拓性地引入了一種程序員能夠顯示地操做鎖的接口-Lock以及它的實現類,因此以Lock接口爲表明的鎖能夠看作是顯示鎖,而Lock最經常使用的實現則是ReentrantLock了,本文也主要圍繞ReentrantLock的原理來揭開顯示鎖神祕的面紗~node
開始以前,要先介紹一下ReentrantLock這座大廈的基礎:Lock、AQS、LockSupport。程序員
Lock是一個接口,主要提供瞭如下幾個方法:安全
//獲取鎖,獲取不到時阻塞,直到h獲取成功
void lock();
//釋放鎖
void unlock();
//嘗試獲取鎖,獲取成功返回ture,失敗則返回false,不阻塞
boolean tryLock();
//嘗試獲取鎖,能夠指定嘗試時間,若指定時間內還沒獲取到則返回false,獲取到返回true
boolean tryLock(long time, TimeUnit unit) throws InterruptedEx //建立一個顯示條件 ception;Condition newCondition();
複製代碼
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
// 序列串
private static final long serialVersionUID = 7373984972572414691L;
// 默認構造
protected AbstractQueuedSynchronizer() { }
// 等待鎖隊列頭結點
private transient volatile Node head;
// 等待鎖隊列尾節點
private transient volatile Node tail;
// 同步狀態 0-表示未加鎖 其餘非0值N表示已鎖且N爲重入次數
private volatile int state;
// 內部節點類-等待鎖隊列的節點
static final class Node {
/* 兩種鎖的模式 */
// 共享鎖模式
static final Node SHARED = new Node();
// 獨佔鎖模式
static final Node EXCLUSIVE = null;
/* 等待狀態-值範圍 */
// 表示當前線程取消獲取鎖
static final int CANCELLED = 1;
// 表示隊列中的後繼線程須要被喚醒
static final int SIGNAL = -1;
// 表示線程正在等待條件
static final int CONDITION = -2;
// 表示下一個共享請求應無條件傳播
static final int PROPAGATE = -3;
// 等着狀態本尊
volatile int waitStatus;
// 前驅節點
volatile Node prev;
// 後繼節點
volatile Node next;
// 等待獲取鎖的線程
volatile Thread thread;
// 下一個等待者
Node nextWaiter;
// 判斷是不是共享鎖模式
final boolean isShared() {
return nextWaiter == SHARED;
}
// 獲取前驅節點
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
// Used by addWaiter 經過addWaiter建立
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// Used by Condition 經過條件隊列建立
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
}
複製代碼
AQS全稱AbstractQueuedSynchronizer,顧名思義也就是「抽象隊列同步器」,它不是一個能夠直接拿來使用的類,而是將鎖的公共操做功能提取成一個抽象類,供鎖的具體實現類使用,好比後面會講到的ReentrantLock就是基於AQS實現鎖的操做的。AQS內部使用多個Node鏈接成的隊列來實現鎖等待線程的管理,當鎖釋放時,會喚醒head的下一個線程來競爭鎖。內部隊列圖以下:微信
LockSupport經常使用於實現線程的阻塞,相似於wait()和notify()的組合,可是與它們不同的是LockSupport不須要依賴於Synchronized。併發
public class LockSupport {
// 私有構造方法
private LockSupport() {} // Cannot be instantiated.
// 阻塞當前線程,若是掉用unpark(Thread)方法或被中斷,才能從park()返回
public static void park() {
UNSAFE.park(false, 0L);
}
// 阻塞當前線程,超時後返回,阻塞時間最長不超過nanos納秒
public static void parkNanos(long nanos) {
if (nanos > 0)
UNSAFE.park(false, nanos);
}
// 阻塞當前線程,直到指定時間點返回
public static void parkUntil(long deadline) {
UNSAFE.park(true, deadline);
}
// 喚醒指定線程
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
// 設置阻塞對象
private static void setBlocker(Thread t, Object arg) {
// Even though volatile, hotspot doesn't need a write barrier here.
UNSAFE.putObject(t, parkBlockerOffset, arg);
}
// 阻塞當前線程,並指定要求阻塞的發起者
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
// 阻塞當前線程,並指定要求阻塞的發起者,超時後返回,阻塞時間最長不超過nanos納秒
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, nanos);
setBlocker(t, null);
}
}
// 阻塞當前線程,並指定要求阻塞的發起者,直到deadline時間點
public static void parkUntil(Object blocker, long deadline) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(true, deadline);
setBlocker(t, null);
}
// 獲取阻塞對象
public static Object getBlocker(Thread t) {
if (t == null)
throw new NullPointerException();
return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
}
}
複製代碼
從LockSupport源碼中能夠看出,它內部實現線程阻塞都是經過Unsafe類中的native方法來完成的,Unsafe類簡單說就是提供了相似C/C++直接修改內存的操做,在java中認爲是不安全的,因此通常要通過安全授信才能使用。它內部的大部分代碼最終實現是其餘語言實現的,因此在java中不能直接看到實現,不過能夠經過openjdk的源碼查看,具體就不詳述了。高併發
主要介紹下LockSuppport中的幾個經常使用方法:性能
方法 | 說明 |
---|---|
park() | 阻塞當前線程,直到對該線程執行unpark方法喚醒或線程中斷返回 |
parkNanos(long nanos) | 阻塞當前線程,容許指定阻塞時間,超時後返回,單位是納秒 |
parkUntil(long deadline) | 阻塞當前線程,容許在指定時間到以前返回 |
unpark(Thread thread) | 喚醒指定線程 |
public class ReentrantLock implements Lock, java.io.Serializable {
// 序列號
private static final long serialVersionUID = 7373984872572414699L;
// 內部同步類Syn的一個屬性,供public方法中調用AQS方法
private final Sync sync;
// 內部類Syn 繼承 AQS,供內部的公平和非公平兩種鎖類繼承
abstract static class Sync extends AbstractQueuedSynchronizer {}
// 非公平鎖類
static final class NonfairSync extends Sync {}
// 公平鎖類
static final class FairSync extends Sync {}
// 默認構造方法,默認是非公平鎖
public ReentrantLock() {
sync = new NonfairSync();
}
// 用於指定鎖方式的構造方法
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
// 普通加鎖方法,會阻塞直到成功
public void lock() // 普通加鎖方法,與lock不一樣的是它支持響應中斷 public void lockInterruptibly() // 嘗試獲取鎖方法,無論成功仍是失敗都是當即返回結果,不阻塞 public boolean tryLock() // 嘗試獲取鎖方法,成功當即返回,失敗則阻塞必定時間,超時未成功則結束阻塞返回異常。同時支持響應中斷請求 public boolean tryLock(long timeout, TimeUnit unit) // 普通解鎖方法 public void unlock() // 新建條件 public Condition newCondition() public int getHoldCount() public boolean isHeldByCurrentThread() public boolean isLocked() public final boolean isFair() protected Thread getOwner() public final boolean hasQueuedThreads() public final boolean hasQueuedThread(Thread thread) public final int getQueueLength() protected Collection<Thread> getQueuedThreads() public boolean hasWaiters(Condition condition) public int getWaitQueueLength(Condition condition) protected Collection<Thread> getWaitingThreads(Condition condition) } 複製代碼
ReentrantLock內部構造: ui
簡單說明下:this
該方法用於直接加鎖,而且會一直阻塞到加鎖成功(或者線程中斷),先簡單總結下它的流程:
接下來看下它的源碼實現:
public void lock() {
sync.lock();
}
複製代碼
實際調用那種lock方式取決於當前的ReentrantLock是公平鎖仍是非公平鎖,這裏以非公平鎖的方式進行解析。
// 靜態類NonfairSync 內部方法
final void lock() {
// 原子性校驗並設置鎖
if (compareAndSetState(0, 1))
// 若是獲取到鎖則更設置當前線程爲獨佔鎖的線程
setExclusiveOwnerThread(Thread.currentThread());
else
// 若是獲取鎖失敗則繼續嘗試
acquire(1);
}
複製代碼
acquire()是AQS中的方法,用於阻塞獲取鎖。
public final void acquire(int arg) {
// 嘗試一次獲取鎖,成功直接返回,失敗則繼續自旋幾遍鎖後再視狀況阻塞
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 若是獲取鎖失敗且阻塞獲取鎖的時候收到了中斷請求則執行中斷線程操做
selfInterrupt();
}
複製代碼
tryAcquire()方法在AQS中定義可是沒有實現,由它的子類來重寫。此處是要執行的是NofairSyn中的重寫方法。
// AQS中定義
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// NofairSyn中的實現
protected final boolean tryAcquire(int acquires) {
// 實際調用的是Syn中的方法
return nonfairTryAcquire(acquires);
}
// Syn中真實實現tryAcquire的方法
final boolean nonfairTryAcquire(int acquires) {
// 獲取當前執行的線程對象
final Thread current = Thread.currentThread();
// 獲取當前的同步狀態
int c = getState();
// 同步狀態爲無狀態時,說明此時鎖是空的
if (c == 0) {
// 嘗試獲取鎖
if (compareAndSetState(0, acquires)) {
// 獲取鎖成功後設置當前線程爲獨佔鎖線程
setExclusiveOwnerThread(current);
return true;
}
}
// 當已經被鎖時,校驗當前獲取鎖的線程是否是本身(是本身則可重入)
else if (current == getExclusiveOwnerThread()) {
// 從新獲取鎖次數累加
int nextc = c + acquires;
// 若是超出最大鎖定次數則拋出異常
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 設置鎖狀態爲重入次數
setState(nextc);
return true;
}
// 不然獲取鎖失敗直接返回false
return false;
}
複製代碼
addWaiter()方法用於爲當前線程和給定模式建立隊列節點。
private Node addWaiter(Node mode) {
// 建立指定模式的節點,有兩種模式:共享和獨佔
Node node = new Node(Thread.currentThread(), mode);
// 首先嚐試在隊列尾部加入
Node pred = tail;
// 隊列不爲空
if (pred != null) {
// 在尾部加入隊列
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 若是失敗則執行插入方式
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 隊列爲空則執行初始化操做,新建一個節點做爲頭節點
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 不然直接在尾部插入
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製代碼
acquireQueued()方法用於自旋檢查節點是否在head節點,是則嘗試獲取鎖。不然繼續自旋,可是不是一直無休止的自旋下去(太消耗性能了),而是在每次自旋後都會檢查是否須要繼續自旋,不須要則經過LockSupport.park()釋放CPU,進入休眠等待喚醒狀態。
final boolean acquireQueued(final Node node, int arg) {
// 默認會執行失敗,以便在最後執行失敗處理
boolean failed = true;
try {
// 默認中斷標誌爲false-不中斷
boolean interrupted = false;
for (;;) {
// 獲取前驅節點
final Node p = node.predecessor();
// 若是前驅節點是head節點 而且 嘗試獲取鎖成功
if (p == head && tryAcquire(arg)) {
// 將本節點設置爲head節點
setHead(node);
// 將此前的head節點的後繼置爲空,減小引用,幫助GC回收
p.next = null; // help GC
failed = false;
return interrupted;
}
// 每次獲取鎖失敗都檢查下是否須要將本線程休眠
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 休眠結束後若是收到了中斷請求,則返回中斷標誌
interrupted = true;
}
} finally {
// 若是失敗則執行失敗處理,主要包括更新節點的waitStatus爲Canceled,將將其中等待隊列中移除
if (failed)
cancelAcquire(node);
}
}
複製代碼
shouldParkAfterFailedAcquire()方法用於檢查當前線程是否須要中止自旋,轉爲阻塞休眠。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取前驅節點的等待狀態
int ws = pred.waitStatus;
// 若是前驅節點狀態爲SIGNAL,說明前置Node的線程已經阻塞休眠了,此時後面的線程也都須要休眠了
if (ws == Node.SIGNAL)
return true;
// ws大於0說明前置節點已經取消或者中斷了,此時只是將前置節點移除隊列
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// ws < 0說明此時前置節點也正在自旋中,故再也不繼續自旋,將本身的狀態改成SIGNAL,以便讓後面的節點都直接進入阻塞睡眠中
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
複製代碼
parkAndCheckInterrupt()方法用於執行線程休眠並判斷中斷標誌的:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
複製代碼
cancelAcquire()方法用於取消請求獲取鎖的一些操做:
private void cancelAcquire(Node node) {
// 若是節點不存在直接返回
if (node == null)
return;
// 置節點線程爲空
node.thread = null;
// 跳過已經取消的前驅,將node的前驅指向前面第一個不爲空的節點
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 備份前驅的後繼
Node predNext = pred.next;
// 直接更新waitStatus爲Canceled
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
// 若是當前節點是尾節點 則設置當前節點的前驅節點爲尾節點
if (node == tail && compareAndSetTail(node, pred)) {
// 因爲pred變成了尾節點,故要將它的後繼置爲null
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
// 前驅不爲head
// 前驅的狀態爲SIGNAL 或 將前驅的狀態更新爲SIGNAL成功
// 前驅的線程不爲空
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
// 若是後繼不爲空,且未取消則更新前驅的後繼 爲 node的後繼
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 喚醒下一個不爲空的節點
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
複製代碼
unparkSuccessor()方法用於喚醒隊列中下一個不爲空的節點線程
private void unparkSuccessor(Node node) {
// 嘗試將node的等待狀態置爲0,這樣的話,後繼爭用線程能夠有機會再嘗試獲取一次鎖。
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 喚醒不爲空的後繼節點,若是爲空,則總尾部開始向前遍歷到首部第一個不爲空的節點喚醒
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 若是存在未取消的等待節點則執行喚醒
if (s != null)
LockSupport.unpark(s.thread);
}
複製代碼
該方法主要用於將已獲取的鎖釋放掉,主要包含如下幾步:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 嘗試釋放鎖成功
if (tryRelease(arg)) {
// 獲取鎖等待對了head節點
Node h = head;
// head不爲空 且 waitStatus不爲0 表示有等待鎖的線程
if (h != null && h.waitStatus != 0)
// 喚醒需鎖的線程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 當前狀態-1,第一次獲取鎖後state爲1,後面每重入鎖一次就累加一次
int c = getState() - releases;
// 檢查當前線程是否是該鎖的獨佔線程,若不是拋出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 計算後的狀態值爲0,表示可用釋放鎖了
if (c == 0) {
// 釋放成功標識置爲true
free = true;
// 設置該鎖的獨佔線程爲null
setExclusiveOwnerThread(null);
}
// 更新鎖狀態
setState(c);
return free;
}
複製代碼
該方法主要用於嘗試獲取鎖,根據參數不一樣,有兩種類型:
不阻塞的方法比較簡單,內部只是調用了非公平鎖中的nonfairTryAcquire方法。
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
複製代碼
超時阻塞實際上是在非阻塞的基礎上加了相似acquireQueued()的自旋獲取鎖的過程,可是另外增長了超時時間的校驗。
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
// 超時時間設置爲0則與非阻塞tryLock同樣了
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
// 建立節點並添加到鎖等待隊列中
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
// 獲取前驅節點
final Node p = node.predecessor();
// 前驅節點爲head 且 嘗試獲取鎖成功
if (p == head && tryAcquire(arg)) {
// 設置當前節點爲head節點
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 計算阻塞時間是否超時 超時則直接返回
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
// 與acquireQueued()相似檢查是否符合阻塞休眠條件
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 檢查是否有線程中斷請求
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
上面介紹的都是基於非公平鎖的方式,而公平鎖其實大致上與非公平鎖的實現相同,不一樣的是:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 若是鎖空閒時,則先檢查是否有等待線程隊列,沒有才繼續參與競爭鎖,有的話則直接返回false,後續則acquireQueued方法進入等待隊列
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
……
}
return false;
}
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// h != t 表示等待隊列爲空,則直接返回false 在tryAcquire中會繼續競爭鎖
// (s = h.next) == null || s.thread != Thread.currentThread() 表示本身就是隊列的尾節點 或者 head的下一個節點是本身
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
複製代碼
理論上,在高併發場景中,公平鎖是保證獲取鎖順序按照他們請求鎖的順序來,等待的線程要阻塞休眠。每次鎖釋放後都要去喚醒阻塞的線程,由於阻塞和喚醒線程有個切換的過程,頻繁的操做有必定的性能消耗。
而非公平鎖是隻有剛請求鎖時,鎖被佔用的狀況下會加入等待隊列,後續操做也相似公平鎖。可是若是一個線程釋放鎖後,剛進來一個線程當即獲取到了鎖則不須要去喚醒阻塞的線程,也就避免了這部分的性能開銷。
因此非公平鎖的性能比公平鎖好,ReentrantLock也是默認非公平鎖的。
一句話說明ReentrantLock的原理:基於CAS原子方式競爭鎖,並在高併發獲取鎖時,爲避免一直死循環CAS而將等待線程放入到AQS同步隊列中,並經過LockSupport實現線程的阻塞和喚醒。