一、AQS框架有同步隊列和阻塞隊列
同步隊列前文已有敘述,阻塞隊列,本次將基於CyclicBarrier梳理講解,會比較易讀java
二、CyclicBarrier有四個關鍵操做lock.lock;trip.await;trip.signalAll;lock.unlock
據此條線看一下condition阻塞隊列是如何發生做用的
2-一、lock.lock條線
2-1-一、lock是直接使用的ReentrantLock,借其源碼一讀
public void lock() {
sync.lock();
}
2-1-二、sync是ReentrantLock的靜態內部類,屬裝飾模式下進行的包裝。
具體區分NonfairSync和FairSync公平和非公平鎖。
public ReentrantLock() {//從構造器來看,默認採用的是非公平鎖
sync = new NonfairSync();
}
2-1-三、抓住lock線索,進入nonfairSync看一下其實現
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))//第一次成功,後續走acquire排隊獲取
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);//會先行調用tryAcquire方法,其他邏輯同AQS自定義同步框架所述
}node
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);//實如今父類Sync中
}
}
2-1-四、Sync的nonfairTryAcquire方法,查看如何獲取資源
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();c#
/**Performs non-fair tryLock. tryAcquire isimplemented in subclasses*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();//持鎖線程
int c = getState();
if (c == 0) {//有資源
if (compareAndSetState(0, acquires)) {//更新state->排它鎖
setExclusiveOwnerThread(current);//設置本身爲持鎖線程
return true;
}
} else if (current == getExclusiveOwnerThread()) {
//容許重入鎖,如CyclicBarrier的isBroken()方法
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;//非首次非重入,將排隊等待獲取資源
}框架
//lock.unlock是將會進入此節點,標記爲:ysma-unlock
protected final boolean tryRelease(int releases) {
int c = getState() - releases;//累減,兼容可重入鎖狀況
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//徹底退出,釋放資源
free = true;
setExclusiveOwnerThread(null);
}
setState(c);//可重入鎖資源釋放
return free;
}ui
final ConditionObject newCondition() {//阻塞隊列需實現此方法
return new ConditionObject();
}
}
小結:CyclicBarrier中lock.lock()的實現是AQS同步sync隊列爭搶資源
那麼將會是第一個進入的線程Thread1搶到資源並持鎖進入,其他的線程ThreadX將會阻塞等待this
2-二、trip.await條線
2-2-一、如上lock方法僅Thread1持鎖進入,執行trip.await方法後阻塞,
ThreadX等線程將會阻塞在lock處,排隊等待獲取資源
由CyclicBarrier得知Condition trip = lock.newCondition();
Condition由AbstractQueuedSynchronizer#ConditionObject實現,部分相關源碼以下
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;spa
/**Adds a new waiter to wait queue. 注意:阻塞隊列沒有前驅節點的設置*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;//初始化-簡單粗暴
else
t.nextWaiter = node;//掛到lastWaiter後面
lastWaiter = node;
return node;//waitStatus爲CONDITION狀態的節點
}線程
/**Removes and transfers all nodes*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
//ysma-transferForSignal 標記此處:將節點從阻塞隊列移往sync隊列
first = next;
} while (first != null);
}orm
/**變換condition阻塞爲sync阻塞*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}blog
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
/**爲本次重點-這個要看的*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();//建立CONDITION塞節點
//savedState結合acquireQueued方法可解讀爲將要獲取的資源數
int savedState = fullyRelease(node);//標記ysma-fullyRelease
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//不在sync隊列中則阻塞,標記ysma-isOnSyncQueue
LockSupport.park(this);//park 阻塞到這裏了
/**標記ysma-checkInterruptWhileWaiting =>檢查等待過程當中是否發生了異常
* 發生了異常要轉移node到sync隊列
*/
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}//退出while循環時node已在sync隊列中了,詳見checkInterruptWhileWaiting和isOnSyncQueue
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
/**這裏sync隊列是忽略中斷的,因此上文被喚醒後執行checkInterruptWhileWaiting發現
*線程是已經被中斷了的,by:acquireQueued返回線程是否中斷
*可是中斷了也得給老子鎖上,
*已在sync隊列就是THROW_IE,還須要排隊等待獲取資源就是REINTERRUPT
*REINTERRUPT時是須要再次入sync隊列的,最後在加入sync隊列後
*在reportInterruptAfterWait(interruptMode)再次拋出一個異常
*來修改sync隊列中對應node的狀態
*comment1
*/
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);//標記ysma-reportInterruptAfterWait
}
}
2-2-二、await方法執行LockSupport.park(this)阻塞前先行執行了fullyRelease 和isOnSyncQueue
下面咱們進入fullyRelease一觀,看看阻塞前的操做流程
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();//state=>資源數
if (release(savedState)) {//釋放資源
failed = false;
return savedState;//返回savedState[釋放的資源數]
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)//呼應unlinkCancelledWaiters();
node.waitStatus = Node.CANCELLED;
}
}
緊接着看一下release釋放什麼狀態
public final boolean release(int arg) {
if (tryRelease(arg)) {//釋放資源,參見Sync源碼標記處:ysma-unlock
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//喚醒sync隊列頭節點的後繼節點
return true;
}
return false;
}
先不急,讓咱們在看一眼isOnSyncQueue作了什麼在作個小結
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;//CONDITION狀態的節點確定不在sync隊列中
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);//見下
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {//sync隊列中自尾到頭找詢node
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
小結:A:由上可見Thread1持鎖進入,ThreadX阻塞等待;
B:而後加入Thread1到阻塞隊列,釋放資源給ThreadX,使得ThreadX能夠得到資源,移除sync隊列;
C:Thread1直到再次從阻塞隊列進入sync同步隊列前持續阻塞。
C:ABC循環直到將全部線程從sync隊裏轉移到阻塞隊列。CyclicBarrier到達臨界點
2-三、trip.signalAll條線
當全部線程都從sync隊列轉移到condition阻塞隊列後,不能一直阻塞,這裏以signalAll爲例進行梳理
詳見2-2-一、ConditionObject#signalAll,進而由doSignalAll從firstWaiter到lastWaiter
依次把CONDITION節點從condition阻塞隊列移除並掛到sync隊列中
這裏咱們關注下標記:ysma-transferForSignal 看一下node是如何轉掛到sync隊列上的
AbstractQueuedSynchronizer#transferForSignal
final boolean transferForSignal(Node node) {
/*If cannot change waitStatus, the node has been cancelled.*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*入sync隊列,自旋到成功爲止詳見前文AQS自定義同步框架*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//入隊列後,線程取消或者更新狀態失敗,喚醒對應的線程,進行處理
LockSupport.unpark(node.thread);
return true;
}
小結:signal就是隻處理firstWaiter,signalAll就是從firstWaiter到lastWaiter挨個處理一遍
基本步驟就是從condition隊列中把節點移除,而後把節點運輸到sync隊列上
2-四、lock.unlock條線
sync同步隊列自行運轉,會依次釋放信號量喚醒線程,那麼就剩下最後的事情了,釋放鎖/資源
public void unlock() {
sync.release(1);//AbstractQueuedSynchronizer#release 以下
}
public final boolean release(int arg) {
if (tryRelease(arg)) {//詳見上文Sync#tryRelease
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
總結:從CyclicBarrier能夠看出,整個condition的運用可分爲三部分
一、利用sync隊列控制資源
二、將線程節點從sync隊列轉移到condition隊列,釋放資源可是仍阻塞持鎖人/線程
三、直到條件合適,將線程節點移回sync隊列,正常競爭資源直到退出
四、in sync=>out sync=>in condition=>out condition=>in sync=>out sync
PS:
標記備註1:ysma-checkInterruptWhileWaiting
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
/**線程在等待期間中斷了,transferAfterCancelledWait
*成功就是THROW_IE不然就是REINTERRUPT 做用見:comment1*/
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
/**修改waitStatus變動condition節點爲sync節點*/
enq(node);//入sync隊列
return true;
}
/*If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just spin.
* 若是CAS更新失敗,那麼必定有別個已經在操做了,此時只須要耐心自旋,等待node掛到sync隊列上便可
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
標記備註2:ysma-reportInterruptAfterWait
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
/**condition隊列裏面的中斷致使其提早進入sync隊列, *須要再次發送中斷信號,以觸發sync同步流程的中斷處理機制 */ }