JDK 1.5 的 java.util.concurrent.locks 包中都是鎖,其中有一個抽象類 AbstractQueuedSynchronizer (抽象隊列同步器),也就是 AQS, 咱們今天就來看看該類。java
咱們看看該類的結構,該類被 CountDown,ThreadPoolExecutor,ReentrantLock,ReentrantReadWriteLock,Semaphore 的內部類所繼承,而這些內部類都是這些鎖的真正實現,不管是公平鎖仍是非公平鎖。node
也就是說,這些鎖的真正實現都是該類來實現的。那麼,咱們就從這些鎖開始看看是如何實現從鎖到解鎖的。less
咱們先看看重入鎖 ReentranLock 的 lock 方法。工具
public void lock() {
sync.lock();
}
複製代碼
該方法調用了內部類的 sync 抽象類的 lock 方法,該方法的實現有公平鎖和非公平鎖。咱們看看公平鎖是如何實現的:ui
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
複製代碼
調用了 acquire 方法,該方法就是 AQS 的的方法,由於 sync 繼承了 AQS,而公平鎖繼承了 Sync,等於間接繼承了 AQS,咱們看看該方法。this
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
該方法JDK註釋 :spa
以獨佔模式獲取對象,若是被中斷則停止。經過先檢查中斷狀態,而後至少調用一次 tryAcquire(int) 來實現此方法,並在成功時返回。不然在成功以前,或者線程被中斷以前,一直調用 tryAcquire(int) 將線程加入隊列,線程可能重複被阻塞或不被阻塞。可使用此方法來實現 Lock.lockInterruptibly() 方法。操作系統
樓主來簡單說一下該方法的做用:該方法會試圖獲取鎖,若是獲取不到,就會被加入等待隊列等待被喚醒,這個其實和咱們以前分析的 synchronized 是差很少的。線程
咱們仔細看看該方法,首先是 tryAcquire 方法,也就是嘗試獲取鎖,該方法是須要被寫的,父類默認的方法是拋出異常。如何重寫呢?抽象類定義一個標準:若是返回 true,表示獲取鎖成功,反之失敗。設計
咱們回到 acquire 方法,若是獲取鎖成功,就直接返回了,若是失敗了,則繼續後面的操做,也就是將線程放入等待隊列中:
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
咱們先看看 addWaiter(Node.EXCLUSIVE) 方法:
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
複製代碼
該方法註釋:將當前線程放入到隊列節點。參數呢?參數有2種,Node.EXCLUSIVE 是獨佔鎖,Node.SHARED 是分享鎖。
在 Node 類種定義了這兩個常量:
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
複製代碼
獨佔鎖是null,共享鎖是空對象。
咱們看看該方法的步驟:
咱們看看 enq 方法的實現:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製代碼
該方法步驟以下:
該方法主要就是初始化頭節點和末端節點,並將新的節點追加到末端節點並更新末端節點。
咱們會到 addWaiter 方法中,該方法主要做用就是根據當前線程建立一個 node 對象,並追加到隊列的末端。
咱們再回到 acquire 方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
addWaiter 方法會返回剛剛建立的node 對象,而後調用 acquireQueued 方法,咱們進入該方法查看:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
該方法步驟以下:
咱們看看 shouldParkAfterFailedAcquire 方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
複製代碼
該方法步驟以下:
再看 parkAndCheckInterrupt 方法(掛起並檢查是否中斷):
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
複製代碼
該方法很是的簡單,就是將當前線程掛起,等到有別的線程喚醒(一般是 head 節點中線程),而後返回當前線程是不是被中斷了,注意,該方法會清除中斷狀態。
回到 acquireQueued 方法,總結一下該方法,該方法就是將剛剛建立的線程節點掛起,而後等待喚醒,若是被喚醒了,則將本身設置爲 head 節點。最後,返回是否被中斷。
再回到 acquire 方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
在該方法中,若是獲取鎖失敗並被喚醒,且被中斷了,那麼就執行 selfInterrupt 方法:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
複製代碼
將當前線程設置中斷狀態位。
好了,到這裏,整個lock 方法,咱們基本就分析完了,能夠說,整個方法就是將線程放入到等待隊列並掛起而後等待 head 節點喚醒。其中,tryAcquire 方法高頻出現,該方法具體實現由子類實現,好比 重入鎖,讀寫鎖,線程池的 worker,其中 CountDown 和 Semaphore 實現的是共享模式的 tryAcquire 方法,但原理相同。AQS 如何定義的?就是返回 true 表示拿到鎖了,返回 false 表示拿鎖失敗,具體如何實現AQS管不了。但他們都依賴一個極其重要的字段 ------- state。
樓主有必要說說這個字段,該字段定義了當前同步器的狀態,若是你們知道 pv 原語的話,應該很好理解這個字段,該字段在 AQS 中是如何定義的:
/** * The synchronization state. */
private volatile int state;
複製代碼
volatile。該字段可能會被多個線程修改,所以,須要設置爲 volatile ,保證變量的可見性。
咱們能夠看看 重入鎖中的公平鎖是如何使用該字段的。
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
複製代碼
該方法重寫了 tryAcquire 方法,步驟以下:
從這裏,咱們能夠看到, statei 字段很是的重要,判斷鎖是否被持有徹底根據這個字段來的。這點必定要注意,而這個設計和操做系統的 pv 由殊途同歸之妙。
那麼看完了拿鎖,再看看解鎖,咱們能夠先猜測一下如何設計,首先確定是要將 state 字段設置爲 0,才能讓下個線程拿鎖,而後呢?喚醒等待隊列中的下個線程。讓他嘗試拿鎖。那到底 doug lea 是否是這麼設計的呢?咱們來看看。
該方法調用了AQS 的 release 方法:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
首先嚐試釋放,若是成功,則喚醒下一個線程。
咱們先看看 tryRelease 方法 (須要重寫):
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
複製代碼
該方法步驟以下:
能夠看到,若是 state 不是 0 的話,就會返回 false ,後面的步驟就沒有了,也就是說,重入鎖解鎖的時候不會喚醒下一個線程。
若是解鎖成功,執行下面的步驟,若是 head 頭節點不是 null 而且他的狀態不是0,說明有線程能夠喚醒,執行 unparkSuccessor 方法。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
複製代碼
該方法步驟以下:
這個時候,等待在 acquireQueued 方法中,準確的說是 parkAndCheckInterrupt 方法中的 線程被喚醒,開始繼續循環,嘗試拿鎖(須要修改 state 變量),並設置本身爲 head。
這裏還有一個漏掉的地方,就是 waitStatus 變量,何時會大於等於0? 該變量默認是 0,大於 0 的狀態是被取消的狀態。何時會被取消呢? 在acquireQueued 方法中,若是方法沒有正常結束,則會執行 finally 中的 cancelAcquire 方法,該方法會將狀態變成 1,也就是取消狀態。
此次咱們分析 AQS,也就是鎖的的真正實現,只分析了 lock 方法和 unlock 方法,這兩個方法是重入鎖的基礎。CountDown 和 Semaphore 是共享鎖,可是基本原理相同,只是將 state 的數字加大即可以實現。而和重入鎖等鎖相關聯的 Condition 則是經過 LockSupport 工具類直接掛起當前線程,並將當前線程添加到等待隊列中,當調用 Condition 的 signal 方法時,則喚醒隊列中的第一個線程。具體源碼咱們有機會再分析。
總之,java 重入鎖的實現基於 AQS,而 AQS 主要基於 state 變量和隊列來實現。實現原理和 pv原語 相似。
good luck!!!!!