AbstractQueuedSynchronizer抽象隊列同步器簡稱AQS,它是實現同步器的基礎組件,juc下面Lock的實現以及一些併發工具類就是經過AQS來實現的,這裏咱們經過AQS的類圖先看一下大概,下面咱們總結一下AQS的實現原理。先看看AQS的類圖。java
(1)AQS是一個經過內置的FIFO雙向隊列來完成線程的排隊工做(內部經過結點head和tail記錄隊首和隊尾元素,元素的結點類型爲Node類型,後面咱們會看到Node的具體構造)。node
/*等待隊列的隊首結點(懶加載,這裏體現爲競爭失敗的狀況下,加入同步隊列的線程執行到enq方法的時候會創 建一個Head結點)。該結點只能被setHead方法修改。而且結點的waitStatus不能爲CANCELLED*/
private transient volatile Node head;
/**等待隊列的尾節點,也是懶加載的。(enq方法)。只在加入新的阻塞結點的狀況下修改*/
private transient volatile Node tail;
複製代碼
(2)其中Node中的thread用來存放進入AQS隊列中的線程引用,Node結點內部的SHARED表示標記線程是由於獲取共享資源失敗被阻塞添加到隊列中的;Node中的EXCLUSIVE表示線程由於獲取獨佔資源失敗被阻塞添加到隊列中的。waitStatus表示當前線程的等待狀態:安全
①CANCELLED=1:表示線程由於中斷或者等待超時,須要從等待隊列中取消等待;多線程
②SIGNAL=-1:當前線程thread1佔有鎖,隊列中的head(僅僅表明頭結點,裏面沒有存放線程引用)的後繼結點node1處於等待狀態,若是已佔有鎖的線程thread1釋放鎖或被CANCEL以後就會通知這個結點node1去獲取鎖執行。併發
③CONDITION=-2:表示結點在等待隊列中(這裏指的是等待在某個lock的condition上,關於Condition的原理下面會寫到),當持有鎖的線程調用了Condition的signal()方法以後,結點會從該condition的等待隊列轉移到該lock的同步隊列上,去競爭lock。(注意:這裏的同步隊列就是咱們說的AQS維護的FIFO隊列,等待隊列則是每一個condition關聯的隊列)函數
④PROPAGTE=-3:表示下一次共享狀態獲取將會傳遞給後繼結點獲取這個共享同步狀態。工具
**(3)**AQS中維持了一個單一的volatile修飾的狀態信息state(AQS經過Unsafe的相關方法,以原子性的方式由線程去獲取這個state)。AQS提供了getState()、setState()、compareAndSetState()函數修改值(實際上調用的是unsafe的compareAndSwapInt方法)。下面是AQS中的部分紅員變量以及更新state的方法ui
//這就是咱們剛剛說到的head結點,懶加載的(只有競爭失敗須要構建同步隊列的時候,纔會建立這個head),若是頭節點存在,它的waitStatus不能爲CANCELLED
private transient volatile Node head;
//當前同步隊列尾節點的引用,也是懶加載的,只有調用enq方法的時候會添加一個新的wait node
private transient volatile Node tail;
//AQS核心:同步狀態
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
複製代碼
(4)AQS的設計師基於模板方法模式的。使用時候須要繼承同步器並重寫指定的方法,而且一般將子類推薦爲定義同步組件的靜態內部類,子類重寫這些方法以後,AQS工做時使用的是提供的模板方法,在這些模板方法中調用子類重寫的方法。其中子類能夠重寫的方法:this
//獨佔式的獲取同步狀態,實現該方法須要查詢當前狀態並判斷同步狀態是否符合預期,而後再進行CAS設置同步狀態
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
//獨佔式的釋放同步狀態,等待獲取同步狀態的線程能夠有機會獲取同步狀態
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
//共享式的獲取同步狀態
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}
//嘗試將狀態設置爲以共享模式釋放同步狀態。 該方法老是由執行釋放的線程調用。
protected int tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
//當前同步器是否在獨佔模式下被線程佔用,通常該方法表示是否被當前線程所獨佔
protected int isHeldExclusively(int arg) { throw new UnsupportedOperationException();}
複製代碼
(5)AQS的內部類ConditionObject是經過結合鎖實現線程同步,ConditionObject能夠直接訪問AQS的變量(state、queue),ConditionObject是個條件變量 ,每一個ConditionObject對應一個隊列用來存放線程調用condition條件變量的await方法以後被阻塞的線程。spa
上面咱們簡單瞭解了一下AQS的基本組成,這裏經過ReentrantLock的非公平鎖實現來具體分析AQS的獨佔模式的加鎖和釋放鎖的過程。
簡單說來,AQS會把全部的請求線程構成一個CLH隊列,當一個線程執行完畢(lock.unlock())時會激活本身的後繼節點,但正在執行的線程並不在隊列中,而那些等待執行的線程所有處於阻塞狀態(park())。以下圖所示。
**(1)**假設這個時候在初始狀況下,尚未多任務來請求競爭這個state,這時候若是第一個線程thread1調用了lock方法請求得到鎖,首先會經過CAS的方式將state更新爲1,表示本身thread1得到了鎖,並將獨佔鎖的線程持有者設置爲thread1。
final void lock() {
if (compareAndSetState(0, 1))
//setExclusiveOwnerThread是AbstractOwnableSynchronizer的方法,AQS繼承了AbstractOwnableSynchronizer
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
複製代碼
**(2)**這個時候有另外一個線程thread2來嘗試或者鎖,一樣也調用lock方法,嘗試經過CAS的方式將state更新爲1,可是因爲以前已經有線程持有了state,因此thread2這一步CAS失敗(前面的thread1已經獲取state而且沒有釋放),就會調用acquire(1)方法(該方法是AQS提供的模板方法,它會調用子類的tryAcquire方法)。非公平鎖的實現中,AQS的模板方法acquire(1)就會調用NofairSync的tryAcquire方法,而tryAcquire方法又調用的Sync的nonfairTryAcquire方法,因此咱們看看nonfairTryAcquire的流程。
//NofairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//(1)獲取當前線程
final Thread current = Thread.currentThread();
//(2)得到當前同步狀態state
int c = getState();
//(3)若是state==0,表示沒有線程獲取
if (c == 0) {
//(3-1)那麼就嘗試以CAS的方式更新state的值
if (compareAndSetState(0, acquires)) {
//(3-2)若是更新成功,就設置當前獨佔模式下同步狀態的持有者爲當前線程
setExclusiveOwnerThread(current);
//(3-3)得到成功以後,返回true
return true;
}
}
//(4)這裏是重入鎖的邏輯
else if (current == getExclusiveOwnerThread()) {
//(4-1)判斷當前佔有state的線程就是當前來再次獲取state的線程以後,就計算重入後的state
int nextc = c + acquires;
//(4-2)這裏是風險處理
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//(4-3)經過setState無條件的設置state的值,(由於這裏也只有一個線程操做state的值,即
//已經獲取到的線程,因此沒有進行CAS操做)
setState(nextc);
return true;
}
//(5)沒有得到state,也不是重入,就返回false
return false;
}
複製代碼
總結來講就是:
一、獲取當前將要去獲取鎖的線程thread2。
二、獲取當前AQS的state的值。若是此時state的值是0,那麼咱們就經過CAS操做獲取鎖,而後設置AQS的線程佔有者爲thread2。很明顯,在當前的這個執行狀況下,state的值是1不是0,由於咱們的thread1尚未釋放鎖。因此CAS失敗,後面第3步的重入邏輯也不會進行
三、若是當前將要去獲取鎖的線程等於此時AQS的exclusiveOwnerThread的線程,則此時將state的值加1,這是重入鎖的實現方式。
四、最終thread2執行到這裏會返回false。
**(3)**上面的thread2加鎖失敗,返回false。那麼根據開始咱們講到的AQS概述就應該將thread2構造爲一個Node結點加入同步隊列中。由於NofairSync的tryAcquire方法是由AQS的模板方法acquire()來調用的,那麼咱們看看該方法的源碼以及執行流程。
//(1)tryAcquire,這裏thread2執行返回了false,那麼就會執行addWaiter將當前線程構造爲一個結點加入同步隊列中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
那麼咱們就看一下addWaiter方法的執行流程。
private Node addWaiter(Node mode) {
//(1)將當前線程以及阻塞緣由(是由於SHARED模式獲取state失敗仍是EXCLUSIVE獲取失敗)構造爲Node結點
Node node = new Node(Thread.currentThread(), mode);
//(2)這一步是快速將當前線程插入隊列尾部
Node pred = tail;
if (pred != null) {
//(2-1)將構造後的node結點的前驅結點設置爲tail
node.prev = pred;
//(2-2)以CAS的方式設置當前的node結點爲tail結點
if (compareAndSetTail(pred, node)) {
//(2-3)CAS設置成功,就將原來的tail的next結點設置爲當前的node結點。這樣這個雙向隊
//列就更新完成了
pred.next = node;
return node;
}
}
//(3)執行到這裏,說明要麼當前隊列爲null,要麼存在多個線程競爭失敗都去將本身設置爲tail結點,
//那麼就會有線程在上面(2-2)的CAS設置中失敗,就會到這裏調用enq方法
enq(node);
return node;
}
複製代碼
那麼總結一下add Waiter方法
一、將當前將要去獲取鎖的線程也就是thread2和獨佔模式封裝爲一個node對象。
二、嘗試快速的將當前線程構造的node結點添加做爲tail結點(這裏就是直接獲取當前tail,而後將node的前驅結點設置爲tail),而且以CAS的方式將node設置爲tail結點(CAS成功後將原tail的next設置爲node,而後這個隊列更新成功)。
三、若是2設置失敗,就進入enq方法。
在剛剛的thread1和thread2的環境下,開始時候線程阻塞隊列是空的(由於thread1獲取了鎖,thread2也是剛剛來請求鎖,因此線程阻塞隊列裏面是空的)。很明顯,這個時候隊列的尾部tail節點也是null,那麼將直接進入到enq方法。因此咱們看看enq方法的實現
private Node enq(final Node node) {
for (;;) {
//(4)仍是先獲取當前隊列的tail結點
Node t = tail;
//(5)若是tail爲null,表示當前同步隊列爲null,就必須初始化這個同步隊列的head和tail(建
//立一個哨兵結點)
if (t == null) {
//(5-1)初始狀況下,多個線程競爭失敗,在檢查的時候都發現沒有哨兵結點,因此須要CAS的
//設置哨兵結點
if (compareAndSetHead(new Node()))
tail = head;
}
//(6)tail不爲null
else {
//(6-1)直接將當前結點的前驅結點設置爲tail結點
node.prev = t;
//(6-2)前驅結點設置完畢以後,還須要以CAS的方式將本身設置爲tail結點,若是設置失敗,
//就會從新進入循環判斷一遍
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製代碼
enq方法內部是一個自旋循環,第一次循環默認狀況以下圖所示
一、首先代碼塊(4)處將t指向了tail,判斷獲得t==null,如圖(1)所示;
二、因而須要新建一個哨兵結點做爲整個同步隊列的頭節點(代碼塊5-1處執行)
三、完了以後如圖(2)所示。這樣第一次循環執行完畢。
第二次循環總體執行以下圖所示。
一、仍是先獲取當前tail結點而後將t指向tail結點。以下圖的(3)
二、而後判斷獲得當前t!=null,因此enq方法中進入代碼塊(6).
三、在(6-1)代碼塊中將node的前驅結點設置爲原來隊列的tail結點,以下圖的(4)所示。
四、設置完前驅結點以後,代碼塊(6-2)會以CAS的方式將當前的node結點設置爲tail結點,若是設置成功,就會是下圖(5)所示。更新完tail結點以後,須要保證雙向隊列的,因此將原來的指向哨兵結點的t的next結點指向node結點,以下圖(6)所示。最後返回。
總結來講,即便在多線程狀況下,enq方法仍是可以保證每一個線程結點會被安全的添加到同步隊列中,由於enq經過CAS方式將結點添加到同步隊列以後纔會返回,不然就會不斷嘗試添加(這樣實際上就是在併發狀況下,把向同步隊列添加Node變得串行化了)
**(4)**在上面AQS的模板方法中,acquire()方法還有一步acquireQueued,這個方法的主要做用就是在同步隊列中嗅探到本身的前驅結點,若是前驅結點是頭節點的話就會嘗試取獲取同步狀態,不然會先設置本身的waitStatus爲-1,而後調用LockSupport的方法park本身。具體的實現以下面代碼所示
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//在這樣一個循環中嘗試tryAcquire同步狀態
for (;;) {
//獲取前驅結點
final Node p = node.predecessor();
//(1)若是前驅結點是頭節點,就嘗試取獲取同步狀態,這裏的tryAcquire方法至關於仍是調
//用NofairSync的tryAcquire方法,在上面已經說過
if (p == head && tryAcquire(arg)) {
//若是前驅結點是頭節點而且tryAcquire返回true,那麼就從新設置頭節點爲node
setHead(node);
p.next = null; //將原來的頭節點的next設置爲null,交由GC去回收它
failed = false;
return interrupted;
}
//(2)若是不是頭節點,或者雖然前驅結點是頭節點可是嘗試獲取同步狀態失敗就會將node結點
//的waitStatus設置爲-1(SIGNAL),而且park本身,等待前驅結點的喚醒。至於喚醒的細節
//下面會說到
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
在上面的代碼中咱們能夠看出,這個方法也是一個自旋循環,繼續按照剛剛的thread1和thread2這個狀況分析。在enq方法執行完以後,同步隊列的狀況大概以下所示。
當前的node結點的前驅結點爲head,因此會調用tryAcquire()方法去得到同步狀態。可是因爲state被thread1佔有,因此tryAcquire失敗。這裏就是執行acquireQueued方法的代碼塊(2)了。代碼塊(2)中首先調用了shouldParkAfterFailedAcquire方法,該方法會將同步隊列中node結點的前驅結點的waitStatus爲CANCELLED的線程移除,並將當前調用該方法的線程所屬結點本身和他的前驅結點的waitStatus設置爲-1(SIGNAL),而後返回。具體方法實現以下所示。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//(1)獲取前驅結點的waitStatus
int ws = pred.waitStatus;
//(2)若是前驅結點的waitStatus爲SINGNAL,就直接返回true
if (ws == Node.SIGNAL)
//前驅結點的狀態爲SIGNAL,那麼該結點就可以安全的調用park方法阻塞本身了。
return true;
if (ws > 0) {
//(3)這裏就是將全部的前驅結點狀態爲CANCELLED的都移除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//CAS操做將這個前驅節點設置成SIGHNAL。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
複製代碼
因此shouldParkAfterFailedAcquire方法執行完畢,如今的同步隊列狀況大概就是這樣子,即哨兵結點的waitStatus值變爲-1。
上面的執行完畢返回到acquireQueued方法的時候,在acquireQueued方法中就會進行第二次循環了,可是仍是獲取state失敗,而當再次進入shouldParkAfterFailedAcquire方法的時候,當前結點node的前驅結點head的waitStatus已經爲-1(SIGNAL)了,就會返回true,而後acquireQueued方法中就會接着執行parkAndCheckInterrupt將本身park阻塞掛起。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
複製代碼
**(5)**咱們梳理一下整個方法調用的流程,假設如今又有一個thread3線程競爭這個state,那麼這個方法調用的流程是什麼樣的呢。
①首先確定是調用**ReentrantLock.lock()**方法去嘗試加鎖;
②由於是非公平鎖,因此就會轉到調用**NoFairSync.lock()**方法;
③在NoFairSync.lock()方法中,會首先嚐試設置state的值,由於已經被佔有那麼確定就是失敗的。這時候就會調用AQS的模板方法AQS.acquire(1)。
④在AQS的模板方法acquire(1)中,實際首先會調用的是子類的tryAcquire()方法,而在非公平鎖的實現中即**Sync.nofairTryAcquire()**方法。
⑤顯然tryAcquire()會返回false,因此acquire()繼續執行,即調用AQS.addWaiter(),就會將當前線程構造稱爲一個Node結點,初始情況下waitStatus爲0。
⑥在addWaiter方法中,會首先嚐試直接將構建的node結點以CAS的方式(存在多個線程嘗試將本身設置爲tail)設置爲tail結點,若是設置成功就直接返回,失敗的話就會進入一個自旋循環的過程。即調用**enq()**方法。最終保證本身成功被添加到同步隊列中。
⑦加入同步隊列以後,就須要將本身掛起或者嗅探本身的前驅結點是否爲頭結點以便嘗試獲取同步狀態。即調用**acquireQueued()**方法。
⑧在這裏thread3的前驅結點不是head結點,因此就直接調用**shouldParkAfterFailedAcquire()**方法,該方法首先會將剛剛的thread2線程結點中的waitStatue的值改變爲-1(初始的時候是沒有改變這個waitStatus的,每一個新節點的添加就會改變前驅結點的waitStatus值)。
⑨thread2所在結點的waitStatus改變後,shouldParkAfterFailedAcquire方法會返回false。因此以後還會在acquireQueued中進行第二次循環。並再次調用shouldParkAfterFailedAcquire方法,而後返回true。最終調用**parkAndCheckInterrupt()**將本身掛起。
每一個線程去競爭這個同步狀態失敗的話大概就會經歷上面的這些過程。假設如今thread3經歷上面這些過程以後也進入同步隊列,那麼整個同步隊列大概就是下面這樣了.
將上面的流程整理一下大概就是下面這個圖
上面說一ReentrantLock爲例到了怎樣去得到非公平鎖,那麼thread1獲取鎖,執行完釋放鎖的流程是怎樣的呢。首先確定是在finally中調用ReentrantLock.unlock()方法,因此咱們就從這個方法開始看起。
**(1)**從下面的unlock方法中咱們能夠看出,其實是調用AQS的release()方法,其中傳遞的參數爲1,表示每一次調用unlock方法都是釋放所得到的一次state。重入的狀況下會屢次調用unlock方法,也保證了lock和unlock是成對的。
public void unlock() {
sync.release(1); //這裏ReentrantLock的unlock方法調用了AQS的release方法
}
public final boolean release(int arg) {
//這裏調用了子類的tryRelease方法,即ReentrantLock的內部類Sync的tryRelease方法
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
**(2)**上面看到release方法首先會調用ReentrantLock的內部類Sync的tryRelease方法。而經過下面代碼的分析,大概知道tryRelease作了這些事情。
①獲取當前AQS的state,並減去1;
②判斷當前線程是否等於AQS的exclusiveOwnerThread,若是不是,就拋IllegalMonitorStateException異常,這就保證了加鎖和釋放鎖必須是同一個線程;
③若是(state-1)的結果不爲0,說明鎖被重入了,須要屢次unlock,這也是lock和unlock成對的緣由;
④若是(state-1)等於0,咱們就將AQS的ExclusiveOwnerThread設置爲null;
⑤若是上述操做成功了,也就是tryRelase方法返回了true;返回false表示須要屢次unlock。
protected final boolean tryRelease(int releases) {
//(1)獲取當前的state,而後減1,獲得要更新的state
int c = getState() - releases;
//(2)判斷當前調用的線程是否是持有鎖的線程,若是不是拋出IllegalMonitorStateException
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//(3)判斷更新後的state是否是0
if (c == 0) {
free = true;
//(3-1)將當前鎖持者設爲null
setExclusiveOwnerThread(null);
}
//(4)設置當前state=c=getState()-releases
setState(c);
//(5)只有state==0,纔會返回true
return free;
}
複製代碼
**(3)**那麼當tryRelease返回true以後,就會執行release方法中if語句塊中的內容。從上面咱們看到,
if (tryRelease(arg)) {
//(1)獲取當前隊列的頭節點head
Node h = head;
//(2)判斷頭節點不爲null,而且頭結點的waitStatus不爲0(CACCELLED)
if (h != null && h.waitStatus != 0)
//(3-1)調用下面的方法喚醒同步隊列head結點的後繼結點中的線程
unparkSuccessor(h);
return true;
}
複製代碼
**(4)**在獲取鎖的流程分析中,咱們知道當前同步隊列以下所示,因此判斷獲得head!=null而且head的waitStatus=-1。因此會執行unparkSuccessor方法,傳遞的參數爲指向head的一個引用h.那下面咱們就看看unparkSuccessor方法中處理了什麼事情。
private void unparkSuccessor(Node node) {
//(1)得到node的waitStatus
int ws = node.waitStatus;
//(2)判斷waitStatus是否小於0
if (ws < 0)
//(2-1)若是waitStatus小於0須要將其以CAS的方式設置爲0
compareAndSetWaitStatus(node, ws, 0);
//(2)得到s的後繼結點,這裏即head的後繼結點
Node s = node.next;
//(3)判斷後繼結點是否已經被移除,或者其waitStatus==CANCELLED
if (s == null || s.waitStatus > 0) {
//(3-1)若是s!=null,可是其waitStatus=CANCELLED須要將其設置爲null
s = null;
//(3-2)會從尾部結點開始尋找,找到離head最近的不爲null而且node.waitStatus的結點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//(4)node.next!=null或者找到的一個離head最近的結點不爲null
if (s != null)
//(4-1)喚醒這個結點中的線程
LockSupport.unpark(s.thread);
}
複製代碼
從上面的代碼實現中能夠總結,unparkSuccessor主要作了兩件事情:
①獲取head節點的waitStatus,若是小於0,就經過CAS操做將head節點的waitStatus修改成0
②尋找head節點的下一個節點,若是這個節點的waitStatus小於0,就喚醒這個節點,不然遍歷下去,找到第一個waitStatus<=0的節點,並喚醒。
**(5)**下面咱們應該分析的是釋放掉state以後,喚醒同步隊列中的結點以後程序又是是怎樣執行的。按照上面的同步隊列示意圖,那麼下面會執行這些
①thread1(獲取到鎖的線程)調用unlock方法以後,最終執行到unparkSuccessor方法會喚醒thread2結點。因此thread2被unpark。
②再回想一下,當時thread2是在調用acquireQueued方法以後的parkAndCheckInterrupt裏面被park阻塞掛起了,因此thread2被喚醒以後繼續執行acquireQueued方法中的for循環(到這裏能夠往前回憶看一下acquireQueued方法中的for循環作了哪些事情);
③for循環中作的第一件事情就是查看本身的前驅結點是否是頭結點(按照上面的同步隊列狀況是知足的);
④前驅結點是head結點,就會調用tryAcquire方法嘗試獲取state,由於thread1已經釋放了state,即state=0,因此thread2調用tryAcquire方法時候,以CAS的方式去將state從0更新爲1是成功的,因此這個時候thread2就獲取到了鎖
⑤thread2獲取state成功,就會從acquireQueued方法中退出。注意這時候的acquireQueued返回值爲false,因此在AQS的模板方法的acquire中會直接從if條件退出,最後執行本身鎖住的代碼塊中的程序。