願我所遇之人,所歷之事,哪怕由於我有一點點變好,我就心滿意足了。html
AQS:AbstractQueuedSynchronizerjava
貫穿全文的圖(核心):node
模板方法設計模式:定義一個操做中算法的骨架,而將一些步驟的實現延遲到子類中。git
等待隊列是CLH(Craig, Landin, and Hagersten)鎖隊列。github
經過節點中的「狀態」字段來判斷一個線程是否應該阻塞。當該節點的前一個節點釋放鎖的時候,該節點會被喚醒。算法
private transient volatile Node head;
private transient volatile Node tail;
//The synchronization state.
//在互斥鎖中它表示着線程是否已經獲取了鎖,0未獲取,1已經獲取了,大於1表示重入數。
private volatile int state;
複製代碼
AQS維護了一個volatile int state(表明共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。segmentfault
state的訪問方式有三種:設計模式
AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。安全
不一樣的自定義同步器爭用共享資源的方式也不一樣。自定義同步器在實現時只須要實現共享資源state的獲取與釋放方式便可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。bash
自定義同步器實現時主要實現如下幾種方法:
以ReentrantLock爲例,state初始化爲0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其餘線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖。固然,釋放鎖以前,A線程本身是能夠重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。
再以CountDownLatch以例,任務分爲N個子線程去執行,state也初始化爲N(注意N要與線程個數一致)。這N個子線程是並行執行的,每一個子線程執行完後countDown()一次,state會CAS減1。等到全部子線程都執行完後(即state=0),會unpark()主調用線程,而後主調用線程就會從await()函數返回,繼續後續動做。
通常來講,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現: tryAcquire-tryRelease tryAcquireShared-tryReleaseShared 中的一種便可。
固然AQS也支持自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock。
如下部分來自源碼註釋:
每次進入CLH隊列時,須要對尾節點進入隊列過程,是一個原子性操做。在出隊列時,咱們只須要更新head節點便可。在節點肯定它的後繼節點時, 須要花一些功夫,用於處理那些,因爲等待超時時間結束或中斷等緣由, 而取消等待鎖的線程。
節點的前驅指針,主要用於處理,取消等待鎖的線程。若是一個節點取消等待鎖,則此節點的前驅節點的後繼指針,要指向,此節點後繼節點中,非取消等待鎖的線程(有效等待鎖的線程節點)。
咱們用next指針鏈接實現阻塞機制。每一個節點均持有本身線程,節點經過節點的後繼鏈接喚醒其後繼節點。
CLH隊列須要一個傀儡結點做爲開始節點。咱們不會再構造函數中建立它,由於若是沒有線程競爭鎖,那麼,努力就白費了。取而代之的方案是,當有第一個競爭者時,咱們才構造頭指針和尾指針。
線程經過同一節點等待條件,可是用另一個鏈接。條件只須要放在一個非併發的鏈接隊列與節點關聯,由於只有當線程獨佔持有鎖的時候,纔會去訪問條件。當一個線程等待條件的時候,節點將會插入到條件隊列中。當條件觸發時,節點將會轉移到主隊列中。用一個狀態值,描述節點在哪個隊列上。
static final class Node {
//該等待節點處於共享模式
static final Node SHARED = new Node();
//該等待節點處於獨佔模式
static final Node EXCLUSIVE = null;
//表示節點的線程是已被取消的
static final int CANCELLED = 1;
//表示當前節點的後繼節點的線程須要被喚醒
static final int SIGNAL = -1;
//表示線程正在等待某個條件
static final int CONDITION = -2;
//表示下一個共享模式的節點應該無條件的傳播下去
static final int PROPAGATE = -3;
//狀態位 ,分別可使CANCELLED、SINGNAL、CONDITION、PROPAGATE、0
volatile int waitStatus;
volatile Node prev;//前驅節點
volatile Node next;//後繼節點
volatile Thread thread;//等待鎖的線程
//ConditionObject鏈表的後繼節點或者表明共享模式的節點。
//由於Condition隊列只能在獨佔模式下被能被訪問,咱們只須要簡單的使用鏈表隊列來連接正在等待條件的節點。
//而後它們會被轉移到同步隊列(AQS隊列)再次從新獲取。
//因爲條件隊列只能在獨佔模式下使用,因此咱們要表示共享模式的節點的話只要使用特殊值SHARED來標明便可。
Node nextWaiter;
//Returns true if node is waiting in shared mode
final boolean isShared() {
return nextWaiter == SHARED;
}
.......
}
複製代碼
waitStatus不一樣值含義:
該狀態值爲了簡便使用,因此使用了數值類型。非負數值意味着該節點不須要被喚醒。因此,大多數代碼中不須要檢查該狀態值的肯定值。
一個正常的Node,它的waitStatus初始化值是0。若是想要修改這個值,可使用AQS提供CAS進行修改。
在鎖的獲取時,並不必定只有一個線程才能持有這個鎖(或者稱爲同步狀態),因此此時有了獨佔模式和共享模式的區別,也就是在Node節點中由nextWaiter來標識。好比ReentrantLock就是一個獨佔鎖,只能有一個線程得到鎖,而WriteAndReadLock的讀鎖則能由多個線程同時獲取,但它的寫鎖則只能由一個線程持有。
//忽略中斷的(即不手動拋出InterruptedException異常)獨佔模式下的獲取方法。
//該方法在成功返回前至少會調用一次tryAcquire()方法(該方法是子類重寫的方法,若是返回true則表明能成功獲取).
//不然當前線程會進入隊列排隊,重複的阻塞和喚醒等待再次成功獲取後返回,
//該方法能夠用來實現Lock.lock
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
該方法首先嚐試獲取鎖( tryAcquire(arg)的具體實現定義在了子類中),若是獲取到,則執行完畢,不然經過addWaiter(Node.EXCLUSIVE), arg)方法把當前節點添加到等待隊列末尾,並設置爲獨佔模式。
private Node addWaiter(Node mode) {
//把當前線程包裝爲node,設爲獨佔模式
Node node = new Node(Thread.currentThread(), mode);
// 嘗試快速入隊,即無競爭條件下確定成功。若是失敗,則進入enq自旋重試入隊
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS替換當前尾部。成功則返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//插入節點到隊列中,若是隊列未初始化則初始化,而後再插入。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製代碼
若是tail節點爲空,執行enq(node);從新嘗試,最終把node插入.在把node插入隊列末尾後,它並不當即掛起該節點中線程,由於在插入它的過程當中,前面的線程可能已經執行完成,因此它會先進行自旋操做acquireQueued(node, arg),嘗試讓該線程從新獲取鎖!當條件知足獲取到了鎖則能夠從自旋過程當中退出,不然繼續。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//若是它的前繼節點爲頭結點,嘗試獲取鎖,獲取成功則返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//判斷當前節點的線程是否應該被掛起,若是應該被掛起則掛起。
//等待release喚醒釋放
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//在隊列中取消當前節點
cancelAcquire(node);
}
}
複製代碼
若是沒獲取到鎖,則判斷是否應該掛起,而這個判斷則得經過它的前驅節點的waitStatus來肯定:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//該節點若是狀態若是爲SIGNAL。則返回true,而後park掛起線程
if (ws == Node.SIGNAL)
return true;
//代表該節點已經被取消,向前循環從新調整鏈表節點
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//執行到這裏表明節點是0或者PROPAGATE,而後標記他們爲SIGNAL,可是
//還不能park掛起線程。須要重試是否能獲取,若是不能,則掛起。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//掛起當前線程,且返回線程的中斷狀態
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
複製代碼
最後,咱們對獲取獨佔式鎖過程對作個總結:
AQS的模板方法acquire經過調用子類自定義實現的tryAcquire獲取同步狀態失敗後->將線程構形成Node節點(addWaiter)->將Node節點添加到同步隊列對尾(addWaiter)->節點以自旋的方法獲取同步狀態(acquirQueued)。在節點自旋獲取同步狀態時,只有其前驅節點是頭節點的時候纔會嘗試獲取同步狀態,若是該節點的前驅不是頭節點或者該節點的前驅節點是頭節點單獲取同步狀態失敗,則判斷當前線程須要阻塞,若是須要阻塞則須要被喚醒事後才返回。
獲取鎖的過程:
既然是釋放,那確定是持有鎖的該線程執行釋放操做,即head節點中的線程釋放鎖.
AQS中的release釋放同步狀態和acquire獲取同步狀態同樣,都是模板方法,tryRelease釋放的具體操做都有子類去實現,父類AQS只提供一個算法骨架。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//若是node的後繼節點不爲空且不是做廢狀態,則喚醒這個後繼節點,
//不然從末尾開始尋找合適的節點,若是找到,則喚醒
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
複製代碼
過程:首先調用子類的tryRelease()方法釋放鎖,而後喚醒後繼節點,在喚醒的過程當中,須要判斷後繼節點是否知足狀況,若是後繼節點不爲空且不是做廢狀態,則喚醒這個後繼節點,不然從tail節點向前尋找合適的節點,若是找到,則喚醒。
釋放鎖過程:
java.util.concurrent中的不少可阻塞類(好比ReentrantLock)都是基於AQS來實現的。AQS是一個同步框架,它提供通用機制來原子性管理同步狀態、阻塞和喚醒線程,以及維護被阻塞線程的隊列。
JDK中AQS被普遍使用,基於AQS實現的同步器包括:
每個基於AQS實現的同步器都會包含兩種類型的操做,以下:
基於「複合優先於繼承」的原則,基於AQS實現的同步器通常都是:聲明一個內部私有的繼承於AQS的子類Sync,對同步器全部公有方法的調用都會委託給這個內部子類。
後面會推出如下有關AQS的文章,已加深對於AQS的理解
本文不少內容整理自網絡,參考文獻: segmentfault.com/a/119000001… segmentfault.com/a/119000001… zhuanlan.zhihu.com/p/27134110 blog.csdn.net/wojiaolinaa… www.cnblogs.com/waterystone…
FIFO隊列:www.cnblogs.com/waterystone…
我的微信公衆號: