上一篇咱們解讀了LockSupport的源碼,其中有提到JUC的一塊重要基石是AbstractQueuedSynchronizer類,簡稱AQS,那麼這一篇就正式學習這個類。因爲我也是以學代練,確定有不少地方理解的不夠到位,歡迎你們留言討論哈!仍是友情提示,本文的分析的JDK版本是8。node
爲何要在開篇就介紹AQS
的工做原理呢?由於先對一些知識點有個大概瞭解,能夠幫咱們在看源碼時更容易理解一些,作到有的放矢,事半功倍。安全
這裏我總結了三個比較關鍵的點,須要咱們知道的。bash
AQS
的實現思路仍是很清晰的,使用一個state來維護競爭狀態,使用CAS來安全的更新state,獲取鎖失敗的線程進入等待隊列unpark,鎖被釋放後,從隊列中喚醒一個線程來繼續嘗試獲取鎖。併發
AQS支持獨佔和共享二種模式,獨佔模式相對容易理解一些,光說不練假把式,咱們先利用AQS實現一個獨佔鎖SmartLock來加深理解。ide
public class SmartLock {
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (getExclusiveOwnerThread() == Thread.currentThread()) return true;
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
} else {
return false;
}
}
@Override
protected boolean tryRelease(int arg) {
setState(0);
setExclusiveOwnerThread(null);
return true;
}
}
private Sync mSync;
public SmartLock() {
mSync = new Sync();
}
public void lock() {
mSync.acquire(1);
}
public void unLock() {
mSync.release(1);
}
}
複製代碼
咱們新建一個內部類Sync繼承AQS,重寫它的tryAcquire和tryRelease方法,能夠理解爲它們分別對應獨佔模式下的嘗試獲取鎖和嘗試釋放鎖,返回true表示成功,false表示失敗。oop
這裏咱們能夠停下來想一下,既然AQS內部有一個state能夠利用,那咱們能夠這樣設定遊戲規則,state=1時表示鎖被佔用,state=0表示鎖沒有被某個線程持有。學習
protected boolean tryAcquire(int arg) {
// 先判斷當前持有鎖的線程是否是本線程,若是是,直接返回true,因此咱們這個鎖是支持可衝入的
if (getExclusiveOwnerThread() == Thread.currentThread()) return true;
// CAS的方式更新state,只有當state=0時會成功更新爲1
if (compareAndSetState(0, 1)) {
// 當前線程已經獲取了鎖,設置爲Owner thread
setExclusiveOwnerThread(Thread.currentThread());
return true;
} else {
// 返回true,當前線程會被加入等待隊列中
return false;
}
}
protected boolean tryRelease(int arg) {
// 狀態更新爲0,
setState(0);
// Owner thread設置爲null
setExclusiveOwnerThread(null);
return true;
}
複製代碼
咱們在SmartLock類中定義二個方法lock和unLock,分別調用acquire和release便可,這裏的參數沒有用到,傳1便可。測試
public void lock() {
mSync.acquire(1);
}
public void unLock() {
mSync.release(1);
}
複製代碼
咱們用SmartLock來實現一個線程安全的累加器,邏輯很簡單就是提供一個increase方法,對counter進行++操做,咱們知道++操做不是原子的,因此咱們用SmartLock來保證原子性。ui
public class SmartAdder {
private volatile int counter;
private SmartLock lock;
public SmartAdder() {
lock = new SmartLock();
}
public void increase() {
lock.lock();
try {
counter++;
} finally {
lock.unLock();
}
}
public int getCounter() {
return this.counter;
}
}
複製代碼
咱們寫一段測試的case來驗證一下,新建了一個固定有20個核心線程的線程池,而後提交了40個累加任務,每一個任務循環100000次,這樣獲得的正確結果應該是4000000。this
public static void main(String[] args) {
int threadCount = 20;
int addCount = 100000;
SmartAdder smartAdder = new SmartAdder();
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount * 2; i++) {
executorService.submit(() -> {
for (int j = 0; j < addCount; j++) {
smartAdder.increase();
}
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("count:" + smartAdder.getCounter());
executorService.shutdown();
}
// 打印結果
count:4000000
複製代碼
打印就結果驗證了咱們的SmartLock確實可以正常工做,這樣一個簡單的互斥鎖就完成了,其實也並不複雜嘛!其中CountDonwLatch也是JUC提供的一個併發同步類,關於它的用法後面會詳解,這裏你們只須要知道await可讓當前線程等待線程池中的任務執行完成便可。
有了前面的鋪墊,咱們如今先看下AQS中獨佔模式的acquire和release二個方法的具體實現。
先看acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
複製代碼
能夠看到acquire是一個final方法,咱們無法重寫它,可是有預留一個tryAcquire方法讓咱們重寫,咱們在上面的SmartLock類中也是重寫了tryAcquire該方法,若是tryAcquire返回false,會調用acquireQueued方法,它的參數是addWaiter(Node.EXCLUSIVE)的結果,咱們先來具體跟進看一下addWaiter的實現。
private Node addWaiter(Node mode) {
// 新建一個Node
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 若是隊列的尾標兵tail不爲空,將新加入的node插入到隊尾,並更新tail
if (pred != null) {
node.prev = pred;
// 若是CAS設置tail成功,直接返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 若是tail爲空,或者CAS設置tail失敗
enq(node);
return node;
}
複製代碼
這裏的思路就是將新建的node插入到隊尾,可是因爲到考慮到線程安全的問題,採用了CAS更新,若是更新失敗,調用enq方法,繼續跟進看一下實現。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 若是檢查到隊列沒有初始化,先執行初始化,注意head對頭是標兵
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 在循環中執行CAS插入尾部操做
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製代碼
因此看下來,addWaiter邏輯也很清晰,就是要將當前線程,封裝爲node插入到隊列尾部。再看下acquireQueued的實現
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 返回node前驅
final Node p = node.predecessor();
// 若是前驅是head,說明當前隊列中該線程排在第一順位,再次調用tryAcquire
// 由於後面調用的parkAndCheckInterrupt會讓線程等待,當鎖被release時,線程會被unpark
// 因此從新tryAcquire來獲取鎖,若是獲取成功,會將當前node設爲頭結點,至關於將當前
// node從隊列中刪除了,由於頭結點只是一個標兵,
if (p == head && tryAcquire(arg)) {
setHead(node);
// 這裏之因此能夠直接將.next置爲null,而沒有考慮node的next,由於是剛加入的node
// 它在隊尾,而又是head的next,說明隊列中就它一個,直接將head.next = null就能夠了
p.next = null; // help GC
failed = false;
return interrupted;
}
// 先對head設置waitStatus標示位,而後park線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
複製代碼
shouldParkAfterFailedAcquire這個方法頗有意思,它是將head的waitStatus設爲SINGLE,用來標識有任務須要被喚醒,在後面unpark的時候會檢查該標識位。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取node的pre的waitStatus,
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 若是已是Node.SIGNAL,能夠安全的park,直接返回
return true;
if (ws > 0) {
// 說明pred被取消了,併發時會出現這種狀況,由於咱們沒有加鎖
// 繼續向前查找waitStatus <= 0的node,
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 將pred的waitStatus設爲SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
複製代碼
再看下parkAndCheckInterrupt這個方法的實現
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
複製代碼
比較簡單,直接調用了LockSupport.park,因此AQS中讓線程等待的方式就是park,這也就是爲何咱們前一篇文章要分析LockSupport源碼的緣由。
那麼線程park等待了,那固然就要有喚醒,咱們看下AQS中release的實現。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
一樣的AQS中release是一個final方法,不能被重寫,咱們能夠重寫tryRelease方法。當head不爲空,切waitStatus不爲0時,調用unparkSuccessor方法,跟進去看下實現
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 先將waitStatus設爲0
compareAndSetWaitStatus(node, ws, 0);
// 通常須要被喚醒的是node.next,可是若是next的node被取消了,或者waitStatus>0,這時候這裏的
// 策略是從尾部開始從新選擇一個node來unpark
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// unpark喚醒線程
LockSupport.unpark(s.thread);
}
複製代碼
release的實現相對簡單一些,前面介紹tryAcquire失敗後,會將當前線程插入到等待隊列中時,而後將head的waitStatus置爲SINGAL,那麼在release時,會先檢查這個標識,而後unpark,這裏有個小細節,若是head.next被取消了或者waitStatus>0,會從隊列的尾部開始往前查找到第一個符合條件的node來unpark。
這裏有個細節你們要注意,release只是將隊列中第一個知足條件等待的線程喚醒,因此接下來的邏輯仍是在acquireQueued方法中,繼續嘗試調用tryAcquire,若是成功,則會被出隊列(當前節點設爲頭結點),線程繼續執行,不然繼續等待。
介紹完了獨佔模式,再來看下共享模式。與獨佔模式相似,AQS也對共享模式提供了模板方法。分別是acquireShared和releaseShared,它們也都是final的,咱們可以重寫的方法是tryAcquireShared和tryReleaseShared。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
複製代碼
acquireShared先調用了tryAcquireShared方法,若是返回值小於0, doAcquireShared一樣構建SHARED類型的Node加入等待隊列。這裏要提一下,tryAcquireShared方法使咱們須要重寫的,注意它的返回值是int類型的,而上面咱們分析獨佔模式tryAcquire的返回值是boolean,由於在共享模式下這個返回值須要有三種狀態,因此須要是int類型。
好,咱們繼續看下doAcquireShared的實現
// 添加當前節點到隊列,與獨佔模式相似,再也不贅述
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 獲取node的前驅
final Node p = node.predecessor();
if (p == head) {
// 再次嘗試獲取共享鎖
int r = tryAcquireShared(arg);
// 若是獲取成功
if (r >= 0) {
// 檢查隊列中後續的節點,是否須要被喚醒
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// park讓線程等待,與獨佔模式相似,再也不贅述
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
// propagate > 0, 或者 當前頭結點或者當前節點node的waitStatus < 0時,調用doReleaseShared
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
複製代碼
大部分的邏輯跟獨佔模式相似,可是多了一個檢查後續節點是否須要被喚醒的邏輯。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
複製代碼
能夠看到doReleaseShared是在一個循環中,若是在調用中,head發生了變化,繼續循環,不然挑出循環,而在都獨佔模式下,沒有這樣的併發問題,因此獨佔模式下不須要循環,另外幹活的就是unparkSuccessor方法,它來喚醒等待的線程,上面在分析獨佔模式時已經分析過了,這裏再也不贅述。
文章先是概述了一下AQS的基本實現原理,而後利用AQS實現了一個簡單的互斥鎖,最後詳細分析了AQS中獨佔和共享二種模式的關鍵方法的實現。以上。