引言java
AQS,即AbstractQueuedSynchronizer, 隊列同步器,它是Java併發用來構建鎖和其餘同步組件的基礎框架。大多數開發者可能都不會直接使用AQS,標準同步器類的集合可以知足絕大多數狀況的需求,但若是能瞭解標準同步器類的實現方式,那麼對於理解它們的工做原理是很是有幫助的。node
AQS是一個抽象類,通常是同步組件的靜態內部類,經過組合的方式使用。AQS自己沒有實現任何同步接口的,它僅僅只是定義了同步狀態的獲取和釋放的方法來供自定義的同步組件的使用。設計模式
AQS原理簡介安全
AQS維護一個共享資源state,經過內置的FIFO來完成獲取資源線程的排隊工做。(這個內置的同步隊列稱爲"CLH"隊列)。該隊列由一個一個的Node結點組成,每一個Node結點維護一個prev引用和next引用,分別指向本身的前驅和後繼結點。AQS維護兩個指針,分別指向隊列頭部head和尾部tail。多線程
從圖中能夠看出它是一個雙向鏈表。當線程獲取資源失敗(好比tryAcquire時試圖設置state狀態失敗),會被構形成一個結點加入CLH隊列中,同時當前線程會被阻塞在隊列中(經過LockSupport.park實現,實際上是等待態)。當持有同步狀態的線程釋放同步狀態時,會喚醒後繼結點,而後此結點線程繼續加入到對同步狀態的爭奪中。架構
首先來看AQS最主要的三個成員變量:併發
1 private transient volatile Node head;框架
2分佈式
3 private transient volatile Node tail;ide
4
5 private volatile int state;
int型的變量state用來表示同步狀態. head和tail分別是同步隊列的頭結點和尾結點。假設state=0表示同步狀態可用(若是用於鎖,則表示鎖可用),state=1表示同步狀態已被佔用(鎖被佔用),狀態信息經過procted類型的getState,setState,compareAndSetState進行操做.
AQS支持兩種同步方式:
獨佔式
共享式
這樣方便使用者實現不一樣類型的同步組件,獨佔式如ReentrantLock,共享式如Semaphore,CountDownLatch,組合式的如ReentrantReadWriteLock。總之,AQS爲使用提供了底層支撐,如何組裝實現,使用者能夠自由發揮。
同步器的設計是基於模板方法模式的,通常的使用方式是這樣:
使用者繼承AbstractQueuedSynchronizer並重寫指定的方法。
將AQS組合在自定義同步組件的實現中,並調用其模板方法,而這些模板方法會調用使用者重寫的方法。
AQS定義的如下可重寫的方法:
protected boolean tryAcquire(int arg)
獨佔式獲取同步狀態,試着獲取,成功返回true,反之爲false
protected boolean tryRelease(int arg)
獨佔式釋放同步狀態,等待中的其餘線程此時將有機會獲取到同步狀態;
protected int tryAcquireShared(int arg)
共享式獲取同步狀態,返回值大於等於0,表明獲取成功;反之獲取失敗;
protected boolean tryReleaseShared(int arg)
共享式釋放同步狀態,成功爲true,失敗爲false
protected boolean isHeldExclusively()
是否在獨佔模式下被線程佔用。
自定義同步器
那麼咱們如何使用AQS呢,首先,咱們須要去繼承AbstractQueuedSynchronizer這個類,而後咱們根據咱們的需求去重寫相應的方法,好比要實現一個獨佔鎖,那就去重寫tryAcquire,tryRelease方法,要實現共享鎖,就去重寫tryAcquireShared,tryReleaseShared;最後,在咱們的組件中調用AQS中的模板方法就能夠了,而這些模板方法是會調用到咱們以前重寫的那些方法的。也就是說,咱們只須要很小的工做量就能夠實現本身的同步組件,重寫的那些方法,僅僅是一些簡單的對於共享資源state的獲取和釋放操做,至於像是獲取資源失敗,線程須要阻塞之類的操做,天然是AQS幫咱們完成了。
對於使用者來說,咱們無需關心獲取資源失敗,線程排隊,線程阻塞/喚醒等一系列複雜的實現,這些都在AQS中爲咱們處理好了。咱們只須要負責好本身的那個環節就好,也就是獲取/釋放共享資源state的姿式T_T。很經典的模板方法設計模式的應用,AQS爲咱們定義好頂級邏輯的骨架,並提取出公用的線程入隊列/出隊列,阻塞/喚醒等一系列複雜邏輯的實現,將部分簡單的可由使用者決定的操做邏輯延遲到子類中去實現便可。
下面的例子是獨佔鎖的實現方式,看代碼:
1import java.util.concurrent.locks.AbstractQueuedSynchronizer;
2
3public class MyLock {
4
5
6 private final Sync sync = new Sync();
7
8 public void lock() {
9 sync.acquire(1);
10 }
11
12 public void unlock() {
13 sync.release(1);
14 }
15
16 public boolean isLocked() {
17 return sync.isHeldExclusively();
18 }
19
20
21 private static class Sync extends AbstractQueuedSynchronizer {
22 @Override
23 protected boolean tryAcquire(int arg) {
24
25 //首先判斷狀態是否等於=0,若是狀態==0,就將status設置爲1
26 if (compareAndSetState(0,1)) {
27 //將當前線程賦值給獨佔模式的onwer
28 setExclusiveOwnerThread(Thread.currentThread());
29 return true;
30 }
31
32 return false;
33
34 }
35
36 @Override
37 protected boolean tryRelease(int arg) {
38 if (getState() == 0) {
39 throw new IllegalMonitorStateException();
40 }
41 setExclusiveOwnerThread(null);
42 setState(0);
43 return true;
44 }
45
46 @Override
47 protected boolean isHeldExclusively() {
48 return getState() == 1;
49 }
50 }
51
52
53}
咱們測試下咱們寫的自定義同步器,咱們啓用30個線程,每一個線程對i自加10000次,同步正常的話,最終結果應爲300000;測試代碼以下:
1public class Demo {
2
3 private static CyclicBarrier barrier = new CyclicBarrier(31);
4
5 private static int count;
6
7 private static final MyLock myLock = new MyLock();
8
9
10 public static void main(String[] args) throws Exception{
11 //說明:咱們啓用30個線程,每一個線程對i自加10000次,同步正常的話,最終結果應爲300000;
12 //未加鎖前
13 for(int i=0;i<30;i++){
14 Thread t = new Thread(new Runnable() {
15 @Override
16 public void run() {
17 for(int i=0;i<10000;i++){
18 increment1();//沒有同步措施的a++;
19 }
20 try {
21 barrier.await();//等30個線程累加完畢
22 } catch (Exception e) {
23 e.printStackTrace();
24 }
25 }
26 });
27 t.start();
28 }
29 barrier.await();
30 System.out.println("沒有加鎖,count="+count);
31 //加鎖後
32 barrier.reset();//重置CyclicBarrier
33 count=0;
34 for(int i=0;i<30;i++){
35 new Thread(new Runnable() {
36 @Override
37 public void run() {
38 for(int i=0;i<10000;i++){
39 increment2();//a++採用Mutex進行同步處理
40 }
41 try {
42 barrier.await();//等30個線程累加完畢
43 } catch (Exception e) {
44 e.printStackTrace();
45 }
46 }
47 }).start();
48 }
49 barrier.await();
50 System.out.println("加鎖後,count="+count);
51 }
52
53
54
55 /**
56 * 沒有同步措施的a++
57 * @return
58 */
59 public static void increment1(){
60 count++;
61 }
62 /**
63 * 使用自定義的Mutex進行同步處理的a++
64 */
65 public static void increment2(){
66 myLock.lock();
67 count++;
68 myLock.unlock();
69 }
70}
運行結果:
1沒有加鎖,count=260399
2加鎖後,count=300000
從運行結果能夠看出,咱們寫的自定義同步器的鎖生效了。
AQS源碼分析
AQS的實現依賴內部的同步隊列(FIFO雙向隊列),若是當前線程獲取同步狀態失敗,AQS會將該線程以及等待狀態等信息構形成一個Node,將其加入同步隊列的尾部,同時阻塞當前線程,當同步狀態釋放時,喚醒隊列的頭節點。
下面舉例說下獲取和釋放同步狀態的過程:
獲取同步狀態
假設線程A要獲取同步狀態(這裏想象成鎖,方便理解),初始狀態下state=0,因此線程A能夠順利獲取鎖,A獲取鎖後將state置爲1。在A沒有釋放鎖期間,線程B也來獲取鎖,此時由於state=1,表示鎖被佔用,因此將B的線程信息和等待狀態等信息構成出一個Node節點對象,放入同步隊列,head和tail分別指向隊列的頭部和尾部(此時隊列中有一個空的Node節點做爲頭點,head指向這個空節點,空Node的後繼節點是B對應的Node節點,tail指向它),同時阻塞線程B(這裏的阻塞使用的是LockSupport.park()方法)。後續若是再有線程要獲取鎖,都會加入隊列尾部並阻塞。
釋放同步狀態
當線程A釋放鎖時,即將state置爲0,此時A會喚醒頭節點的後繼節點(所謂喚醒,實際上是調用LockSupport.unpark(B)方法),即B線程從LockSupport.park()方法返回,此時B發現state已經爲0,因此B線程能夠順利獲取鎖,B獲取鎖後B的Node節點隨之出隊。
Node 節點
Node結點是AbstractQueuedSynchronizer中的一個靜態內部類.
1static final class Node {
2 /** waitStatus值,表示線程已被取消(等待超時或者被中斷)*/
3 static final int CANCELLED = 1;
4 /** waitStatus值,表示後繼線程須要被喚醒(unpaking)*/
5 static final int SIGNAL = -1;
6 /**waitStatus值,表示結點線程等待在condition上,當被signal後,會從等待隊列轉移到同步到隊列中 */
7 /** waitStatus value to indicate thread is waiting on condition */
8 static final int CONDITION = -2;
9 /** waitStatus值,表示下一次共享式同步狀態會被無條件地傳播下去
10 static final int PROPAGATE = -3;
11 /** 等待狀態,初始爲0 */
12 volatile int waitStatus;
13 /**當前結點的前驅結點 */
14 volatile Node prev;
15 /** 當前結點的後繼結點 */
16 volatile Node next;
17 /** 與當前結點關聯的排隊中的線程 */
18 volatile Thread thread;
19 /** ...... */
20 }
獨佔式
咱們看下acquire方法,lock方法通常會直接代理到acquire上
1 public final void acquire(int arg) {
2 if (!tryAcquire(arg) &&
3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4 selfInterrupt();
5 }
這段代碼很短,看着不太容易理解,咱們稍微作下轉化:
1 public final void acquire(int arg) {
2 boolean hasAcquired = tryAcquire(arg);
3 if (! hasAcquired){
4 Node currentThreadNode=addWaiter(Node.EXCLUSIVE);
5 boolean interrupted = acquireQueued(currentThreadNode, arg);
6
7if (interrupted) {
8 selfInterrupt();
9 }
10
11 }
12 }
1.tryAcquire方法嘗試獲取鎖,若是成功就返回,若是不成功,走到2.
2.把當前線程和等待狀態信息構形成一個Node節點,並將結點放入同步隊列的尾部
3.該Node結點在隊列中嘗試獲取同步狀態,若獲取不到,則阻塞結點線程,直到被前驅結點喚醒或者被中斷.
addWaiter
addWaiter方式主要是將同步狀態失敗的線程,構形成一個Node結點,添加到同步隊列尾部
1 private Node addWaiter(Node mode) {
2 Node node = new Node(Thread.currentThread(), mode);
3 Node pred = tail;
4 if (pred != null) {
5 node.prev = pred;
6 if (compareAndSetTail(pred, node)) {
7 pred.next = node;
8 return node;
9 }
10 }
11 enq(node);
12 return node;
13 }
建立一個Node對象,Node中包含了當前線程和Node模式(這時是排他模式)。tail是AQS的中表示同步隊列隊尾的屬性,剛開始爲null,因此進行enq(node)方法,從字面能夠看出這是一個入隊操做,來看下具體入隊細節:
1private Node enq(final Node node) {
2 for (;;) {
3 Node t = tail;
4 if (t == null) { // Must initialize
5 if (compareAndSetHead(new Node()))
6 tail = head;
7 } else {
8 node.prev = t;
9 if (compareAndSetTail(t, node)) {
10 t.next = node;
11 return t;
12 }
13 }
14 }
15 }
enq方法是一個死循環,自己沒有鎖,能夠多個線程併發訪問,假如某個線程進入方法,此時head, tail都爲null, 進入if(t==null)區域,從方法名能夠看出這裏是用CAS的方式建立一個空的Node做爲頭結點,由於此時隊列中只一個頭結點,因此tail也指向它,第一次循環執行結束。注意這裏使用CAS是防止多個線程併發執行到這兒時,只有一個線程可以執行成功,防止建立多個同步隊列。
進行第二次循環時(或者是其餘線程enq時),tail不爲null,進入else區域。將當前線程的Node結點(簡稱CNode)的prev指向tail,而後使用CAS將tail指向當前節點。
經過上面分析可知,AQS的寫入是一種雙向鏈表的插入操做,至此addWaiter分析完畢。
acquireQueued
看下acquireQueued方法:
1final boolean acquireQueued(final Node node, int arg) {
2 boolean failed = true;
3 try {
4 boolean interrupted = false;
5 for (;;) {//死循環
6 final Node p = node.predecessor();//找到當前結點的前驅結點
7 if (p == head && tryAcquire(arg)) {//若是前驅結點是頭結點,才tryAcquire,其餘結點是沒有機會tryAcquire的。
8 setHead(node);//獲取同步狀態成功,將當前結點設置爲頭結點。
9 p.next = null; // 方便GC
10 failed = false;
11 return interrupted;
12 }
13 // 若是沒有獲取到同步狀態,經過shouldParkAfterFailedAcquire判斷是否應該阻塞,parkAndCheckInterrupt用來阻塞線程
14 if (shouldParkAfterFailedAcquire(p, node) &&
15 parkAndCheckInterrupt())
16 interrupted = true;
17 }
18 } finally {
19 if (failed)
20 cancelAcquire(node);
21 }
22 }
能夠看到,acquireQueued方法也是一個死循環,直到進入 if (p == head && tryAcquire(arg))條件方法塊。仍是接着剛纔的操做來分析。acquireQueued接收的參數是addWaiter方法的返回值。node.predecessor()返回當前節點的前置節點,在這裏也就是head節點,因此p==head成立,進而進行tryAcquire操做,即爭用鎖, 若是獲取成功,則進入if方法體,看下接下來的操做:
將當前節點設置爲頭節點。
將當前節點的前置節點設置的next設置爲null。
上面操做即完成了FIFO的出隊操做。
從上面的分析能夠看出,只有隊列的第二個節點能夠有機會爭用鎖,若是成功獲取鎖,則此節點晉升爲頭節點。對於第三個及之後的節點,if (p == head)條件不成立,首先進行shouldParkAfterFailedAcquire(p, node)操做(爭用鎖失敗的第二個節點也如此).
shouldParkAfterFailedAcquire
shouldParkAfterFailedAcquire方法是判斷一個爭用鎖的線程是否應該被阻塞
1private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
2 //獲取前驅結點的wait值
3 int ws = pred.waitStatus;
4 if (ws == Node.SIGNAL)//若前驅結點的狀態是SIGNAL,意味着當前結點能夠被安全地park
5 return true;
6 if (ws > 0) {
7 // ws>0,只有CANCEL狀態ws才大於0。若前驅結點處於CANCEL狀態,也就是此結點線程已經無效,從後往前遍歷,找到一個非CANCEL狀態的結點,將本身設置爲它的後繼結點
8 do {
9 node.prev = pred = pred.prev;
10 } while (pred.waitStatus > 0);
11 pred.next = node;
12 } else {
13 // 若前驅結點爲其餘狀態,將其設置爲SIGNAL狀態
14 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
15 }
16 return false;
17 }
shouldParkAfterFailedAcquire方法是判斷一個爭用鎖的線程是否應該被阻塞。它首先判斷一個節點的前置節點的狀態是否爲Node.SIGNAL,若是是,是說明此節點已經將狀態設置若是鎖釋放,則應當通知它,因此它能夠安全的阻塞了,返回true。
若是前節點的狀態大於0,即爲CANCELLED狀態時,則會從前節點開始逐步循環找到一個沒有被「CANCELLED」節點設置爲當前節點的前節點,返回false。在下次循環執行shouldParkAfterFailedAcquire時,返回true。這個操做實際是把隊列中CANCELLED的節點剔除掉。
前節點狀態小於0的狀況是對應ReentrantLock的Condition條件等待的,這裏不進行展開。
若是shouldParkAfterFailedAcquire返回了true,則會執行:「parkAndCheckInterrupt()」方法,它是經過LockSupport.park(this)將當前線程掛起到WATING狀態,它須要等待一箇中斷、unpark方法來喚醒它,經過這樣一種FIFO的機制的等待,來實現了Lock的操做。
release
當前線程執行完本身的邏輯以後,須要釋放同步狀態,來看看release方法的邏輯:
1public final boolean release(int arg) {
2 if (tryRelease(arg)) {//調用使用者重寫的tryRelease方法,若成功,喚醒其後繼結點,失敗則返回false
3 Node h = head;
4 if (h != null && h.waitStatus != 0)
5 unparkSuccessor(h);//喚醒後繼結點
6 return true;
7 }
8 return false;
9 }
在方法unparkSuccessor(Node)中,就意味着真正要釋放鎖了,它傳入的是head節點(head節點是佔用鎖的節點),看下源碼:
1private void unparkSuccessor(Node node) {
2 //獲取wait狀態
3 int ws = node.waitStatus;
4 if (ws < 0)
5 compareAndSetWaitStatus(node, ws, 0);// 將等待狀態waitStatus設置爲初始值0
6 Node s = node.next;//後繼結點
7 if (s == null || s.waitStatus > 0) {//若後繼結點爲空,或狀態爲CANCEL(已失效),則從後尾部往前遍歷找到一個處於正常阻塞狀態的結點 進行喚醒
8 s = null;
9 for (Node t = tail; t != null && t != node; t = t.prev)
10 if (t.waitStatus <= 0)
11 s = t;
12 }
13 if (s != null)
14 LockSupport.unpark(s.thread);//使用LockSupprot喚醒結點對應的線程
15 }
內部首先會發生的動做是獲取head節點的next節點,若是獲取到的節點不爲空,則直接經過:「LockSupport.unpark()」方法來釋放對應的被掛起的線程,這樣一來將會有一個節點喚醒後繼續進入循環進一步嘗試tryAcquire()方法來獲取鎖。
共享式源碼簡單分析
共享式:共享式地獲取同步狀態。對於獨佔式同步組件來說,同一時刻只有一個線程能獲取到同步狀態,其餘線程都得去排隊等待,其待重寫的嘗試獲取同步狀態的方法tryAcquire返回值爲boolean,這很容易理解;對於共享式同步組件來說,同一時刻能夠有多個線程同時獲取到同步狀態,這也是「共享」的意義所在。其待重寫的嘗試獲取同步狀態的方法tryAcquireShared返回值爲int。
1 protected int tryAcquireShared(int arg) {
2 throw new UnsupportedOperationException();
3 }
1.當返回值大於0時,表示獲取同步狀態成功,同時還有剩餘同步狀態可供其餘線程獲取;
2.當返回值等於0時,表示獲取同步狀態成功,但沒有可用同步狀態了;
3.當返回值小於0時,表示獲取同步狀態失敗。
acquireShared
1public final void acquireShared(int arg) {
2 if (tryAcquireShared(arg) < 0)//返回值小於0,獲取同步狀態失敗,排隊去;獲取同步狀態成功,直接返回去幹本身的事兒。
3 doAcquireShared(arg);
4 }
doAcquireShared
1private void doAcquireShared(int arg) {
2 final Node node = addWaiter(Node.SHARED);//構造一個共享結點,添加到同步隊列尾部。若隊列初始爲空,先添加一個無心義的傀儡結點,再將新節點添加到隊列尾部。
3 boolean failed = true;//是否獲取成功
4 try {
5 boolean interrupted = false;//線程parking過程當中是否被中斷過
6 for (;;) {//死循環
7 final Node p = node.predecessor();//找到前驅結點
8 if (p == head) {//頭結點持有同步狀態,只有前驅是頭結點,纔有機會嘗試獲取同步狀態
9 int r = tryAcquireShared(arg);//嘗試獲取同步裝填
10 if (r >= 0) {//r>=0,獲取成功
11 setHeadAndPropagate(node, r);//獲取成功就將當前結點設置爲頭結點,若還有可用資源,傳播下去,也就是繼續喚醒後繼結點
12 p.next = null; // 方便GC
13 if (interrupted)
14 selfInterrupt();
15 failed = false;
16 return;
17 }
18 }
19 if (shouldParkAfterFailedAcquire(p, node) &&//是否能安心進入parking狀態
20 parkAndCheckInterrupt())//阻塞線程
21 interrupted = true;
22 }
23 } finally {
24 if (failed)
25 cancelAcquire(node);
26 }
27 }
大致邏輯與獨佔式的acquireQueued差距不大,只不過因爲是共享式,會有多個線程同時獲取到線程,也可能同時釋放線程,空出不少同步狀態,因此當排隊中的老二獲取到同步狀態,若是還有可用資源,會繼續傳播下去。
1private void setHeadAndPropagate(Node node, int propagate) {
2 Node h = head; // Record old head for check below
3 setHead(node);
4 if (propagate > 0 || h == null || h.waitStatus < 0) {
5 Node s = node.next;
6 if (s == null || s.isShared())
7 doReleaseShared();
8 }
9 }
releaseShared
releaseShared爲釋放同步狀態
1public final boolean releaseShared(int arg) {
2 if (tryReleaseShared(arg)) {
3 doReleaseShared();//釋放同步狀態
4 return true;
5 }
6 return false;
7 }
doReleaseShared
1private void doReleaseShared() {
2 for (;;) {//死循環,共享模式,持有同步狀態的線程可能有多個,採用循環CAS保證線程安全
3 Node h = head;
4 if (h != null && h != tail) {
5 int ws = h.waitStatus;
6 if (ws == Node.SIGNAL) {
7 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
8 continue;
9 unparkSuccessor(h);//喚醒後繼結點
10 }
11 else if (ws == 0 &&
12 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
13 continue;
14 }
15 if (h == head)
16 break;
17 }
18 }
代碼邏輯比較容易理解,須要注意的是,共享模式,釋放同步狀態也是多線程的,此處採用了CAS自旋來保證。
總結
AQS是JUC中不少同步組件的構建基礎,簡單來說,它內部實現主要是狀態變量state和一個FIFO隊列來完成,同步隊列的頭結點是當前獲取到同步狀態的結點,獲取同步狀態state失敗的線程,會被構形成一個結點(或共享式或獨佔式)加入到同步隊列尾部(採用自旋CAS來保證此操做的線程安全),隨後線程會阻塞;釋放時喚醒頭結點的後繼結點,使其加入對同步狀態的爭奪中。
若是想學習Java工程化、高性能及分佈式、深刻淺出。性能調優、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級架構進階羣:180705916,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們