今天咱們來研究學習一下AbstractQueuedSynchronizer
類的相關原理,java.util.concurrent
包中不少類都依賴於這個類所提供隊列式同步器,好比說經常使用的ReentranLock
,Semaphore
和CountDownLatch
等。前端
爲了方便理解,咱們以一段使用ReentranLock
的代碼爲例,講解ReentranLock
每一個方法中有關AQS
的使用。java
咱們都知道ReentranLock
的加鎖行爲和Synchronized
相似,都是可重入的鎖,可是兩者的實現方式確實徹底不一樣的,咱們以後也會講解Synchronized
的原理。除此以外,Synchronized的阻塞沒法被中斷,而ReentrantLock則提供了可中斷的阻塞。下面的代碼是ReentranLock
的函數,咱們就以此爲順序,依次講解這些函數背後的實現原理。node
ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock();
複製代碼
ReentranLock
分爲公平鎖和非公平鎖,兩者的區別就在獲取鎖機會是否和排隊順序相關。咱們都知道,若是鎖被另外一個線程持有,那麼申請鎖的其餘線程會被掛起等待,加入等待隊列。理論上,先調用lock
函數被掛起等待的線程應該排在等待隊列的前端,後調用的就排在後邊。若是此時,鎖被釋放,須要通知等待線程再次嘗試獲取鎖,公平鎖會讓最早進入隊列的線程得到鎖。而非公平鎖則會喚醒全部線程,讓它們再次嘗試獲取鎖,因此可能會致使後來的線程先得到了鎖,則就是非公平。算法
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
複製代碼
咱們會發現FairSync
和NonfairSync
都繼承了Sync
類,而Sync
的父類就是AbstractQueuedSynchronizer
(後續簡稱AQS
)。可是AQS
的構造函數是空的,並無任何操做。bash
以後的源碼分析,若是沒有特別說明,就是指公平鎖。多線程
ReentranLock
的lock
函數以下所示,直接調用了sync
的lock
函數。也就是調用了FairSync
的lock
函數。函數
//ReentranLock
public void lock() {
sync.lock();
}
//FairSync
final void lock() {
//調用了AQS的acquire函數,這是關鍵函數之一
acquire(1);
}
複製代碼
咱們接下來就正式開始AQS
相關的源碼分析了,acquire
函數的做用是獲取同一時間段內只能被一個線程獲取的量,這個量就是抽象化的鎖概念。咱們先分析代碼,你慢慢就會明白其中的含義。工具
public final void acquire(int arg) {
// tryAcquire先嚐試獲取"鎖",獲取了就不進入後續流程
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//addWaiter是給當前線程建立一個節點,並將其加入等待隊列
//acquireQueued是當線程已經加入等待隊列以後繼續嘗試獲取鎖.
selfInterrupt();
}
複製代碼
tryAcquire
,addWaiter
和acquireQueued
都是十分重要的函數,咱們接下來依次學習一下這些函數,理解它們的做用。源碼分析
//AQS類中的變量.
private volatile int state;
//這是FairSync的實現,AQS中未實現,子類按照本身的須要實現該函數
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//獲取AQS中的state變量,表明抽象概念的鎖.
int c = getState();
if (c == 0) { //值爲0,那麼當前獨佔性變量還未被線程佔有
//若是當前阻塞隊列上沒有先來的線程在等待,UnfairSync這裏的實現就不一致
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//成功cas,那麼表明當前線程得到該變量的全部權,也就是說成功得到鎖
setExclusiveOwnerThread(current);
// setExclusiveOwnerThread將本線程設置爲獨佔性變量全部者線程
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
//若是該線程已經獲取了獨佔性變量的全部權,那麼根據重入性
//原理,將state值進行加1,表示屢次lock
//因爲已經得到鎖,該段代碼只會被一個線程同時執行,因此不須要
//進行任何並行處理
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//上述狀況都不符合,說明獲取鎖失敗
return false;
}
複製代碼
由上述代碼咱們能夠發現,tryAcquire
就是嘗試獲取那個線程獨佔的變量state
。state的值表示其狀態:若是是0,那麼當前尚未線程獨佔此變量;否在就是已經有線程獨佔了這個變量,也就是表明已經有線程得到了鎖。可是這個時候要再進行一次判斷,看是不是當前線程本身得到的這個鎖,若是是,就增長state的值。學習
這裏有幾點須要說明一下,首先是compareAndSetState
函數,這是使用CAS操做來設置state
的值,並且state值設置了volatile
修飾符,經過這兩點來確保修改state的值不會出現多線程問題。而後是公平鎖和非公平鎖的區別問題,在UnfairSync
的nonfairTryAcquire
函數中不會在相同的位置上調用hasQueuedPredecessors
來判斷當前是否已經有線程在排隊等待得到鎖。
若是tryAcquire
返回true
,那麼就是獲取鎖成功;若是返回false,那麼就是未得到鎖,須要加入阻塞等待隊列。咱們下面就來看一下addWaiter
的相關操做。
將保存當前線程信息的節點加入到等待隊列的相關函數中涉及到了無鎖隊列的相關算法,因爲在AQS
中只是將節點添加到隊尾,使用到的無鎖算法也相對簡單。真正的無鎖隊列的算法咱們等到分析ConcurrentSkippedListMap
時在進行講解。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
//先使用快速入列法來嘗試一下,若是失敗,則進行更加完備的入列算法.
//只有在必要的狀況下才會使用更加複雜耗時的算法,也就是樂觀的態度
Node pred = tail; //列尾指針
if (pred != null) {
node.prev = pred; //步驟1:該節點的前趨指針指向tail
if (compareAndSetTail(pred, node)){ //步驟二:cas將尾指針指向該節點
pred.next = node;//步驟三:若是成果,讓舊列尾節點的next指針指向該節點.
return node;
}
}
//cas失敗,或在pred == null時調用enq
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) { //cas無鎖算法的標準for循環,不停的嘗試
Node t = tail;
if (t == null) { //初始化
if (compareAndSetHead(new Node()))
//須要注意的是head是一個哨兵的做用,並不表明某個要獲取鎖的線程節點
tail = head;
} else {
//和addWaiter中一致,不過有了外側的無限循環,不停的嘗試,自旋鎖
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製代碼
經過調用addWaiter
函數,AQS
將當前線程加入到了等待隊列,可是尚未阻塞當前線程的執行,接下來咱們就來分析一下acquireQueued
函數。
因爲進入阻塞狀態的操做會下降執行效率,因此,AQS
會盡力避免試圖獲取獨佔性變量的線程進入阻塞狀態。因此,當線程加入等待隊列以後,acquireQueued
會執行一個for循環,每次都判斷當前節點是否應該得到這個變量(在隊首了)。若是不該該獲取或在再次嘗試獲取失敗,那麼就調用shouldParkAfterFailedAcquire
判斷是否應該進入阻塞狀態。若是當前節點以前的節點已經進入阻塞狀態了,那麼就能夠斷定當前節點不可能獲取到鎖,爲了防止CPU不停的執行for循環,消耗CPU資源,調用parkAndCheckInterrupt
函數來進入阻塞狀態。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //一直執行,直到獲取鎖,返回.
final Node p = node.predecessor();
//node的前驅是head,就說明,node是將要獲取鎖的下一個節點.
if (p == head && tryAcquire(arg)) { //因此再次嘗試獲取獨佔性變量
setHead(node); //若是成果,那麼就將本身設置爲head
p.next = null; // help GC
failed = false;
return interrupted;
//此時,尚未進入阻塞狀態,因此直接返回false,表示不須要中斷調用selfInterrupt函數
}
//判斷是否要進入阻塞狀態.若是`shouldParkAfterFailedAcquire`
//返回true,表示須要進入阻塞
//調用parkAndCheckInterrupt;不然表示還能夠再次嘗試獲取鎖,繼續進行for循環
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//調用parkAndCheckInterrupt進行阻塞,而後返回是否爲中斷狀態
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //前一個節點在等待獨佔性變量釋放的通知,因此,當前節點能夠阻塞
return true;
if (ws > 0) { //前一個節點處於取消獲取獨佔性變量的狀態,因此,能夠跳過去
//返回false
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//將上一個節點的狀態設置爲signal,返回false,
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //將AQS對象本身傳入
return Thread.interrupted();
}
複製代碼
由上述分析,咱們知道了AQS
經過調用LockSupport
的park
方法來執行阻塞當前進程的操做。其實,這裏的阻塞就是線程再也不執行的含義,經過調用這個函數,線程進入阻塞狀態,上述的lock
操做也就阻塞了,等待中斷或在獨佔性變量被釋放。
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);//設置阻塞對象,用來記錄線程被誰阻塞的,用於線程監控和分析工具來定位
UNSAFE.park(false, 0L);//讓當前線程再也不被線程調度,就是當前線程再也不執行.
setBlocker(t, null);
}
複製代碼
關於中斷的相關知識,咱們之後再說,就繼續沿着AQS
的主線,看一下釋放獨佔性變量的相關操做吧。
與lock
操做相似,unlock
操做調用了AQS
的relase
方法,參數和調用acquire
時同樣,都是1。
public final boolean release(int arg) {
if (tryRelease(arg)) {
//釋放獨佔性變量,起始就是將status的值減1,由於acquire時是加1
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//喚醒head的後繼節點
return true;
}
return false;
}
複製代碼
由上述代碼可知,release就是先調用tryRelease
來釋放獨佔性變量。若是成功,那麼就看一下是否有等待鎖的阻塞線程,若是有,就調用unparkSuccessor
來喚醒他們。
protected final boolean tryRelease(int releases) {
//因爲只有一個線程能夠得到獨佔先變量,因此,全部操做不須要考慮多線程
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //若是等於0,那麼說明鎖應該被釋放了,不然表示當前線程有屢次lock操做.
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
複製代碼
咱們能夠看到tryRelease
中的邏輯也體現了可重入鎖的概念,只有等到state
的值爲0時,才表明鎖真正被釋放了。因此獨佔性變量state
的值就表明鎖的有無。當state=0
時,表示鎖未被佔有,否在表示當前鎖已經被佔有。
private void unparkSuccessor(Node node) {
.....
//通常來講,須要喚醒的線程就是head的下一個節點,可是若是它獲取鎖的操做被取消,或在節點爲null時
//就直接繼續日後遍歷,找到第一個未取消的後繼節點.
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
複製代碼
調用了unpark
方法後,進行lock
操做被阻塞的線程就恢復到運行狀態,就會再次執行acquireQueued
中的無限for循環中的操做,再次嘗試獲取鎖。
有關AQS
和ReentrantLock
的分析就差很少結束了。不得不說,我第一次看到AQS的實現時真是震驚,之前都認爲Synchronized
和ReentrantLock
的實現原理是一致的,都是依靠java虛擬機的功能實現的。沒有想到還有AQS
這樣一個背後大Boss在提供幫助啊。學習了這個類的原理,咱們對JUC的不少類的分析就簡單了不少。此外,AQS
涉及的CAS
操做和無鎖隊列的算法也爲咱們學習其餘無鎖算法提供了基礎。知識的海洋是無限的啊!