咱們前面幾張提到過,JUC 這個包裏面的工具類的底層就是使用 CAS 和 volatile 來保證線程安全的,整個 JUC 包裏面的類都是基於它們構建的。今天咱們介紹一個很是重要的同步器,這個類是 JDK 在 CAS 和 volatile 的基礎上爲咱們提供的一個同步工具類。java
AbstractQueuedSynchronizer,JDK 1.5 引入了 JUC 包,這個包提供了一些列支持併發的組件,這些組件是一些列同步器,他們主要完成如下功能:node
AQS 是一個小框架,基於這個框架咱們能夠實現不少的同步器,ReentrantLock,CountDownLatch,Semaphore 等都是基於 AQS 實現的。算法
同步器的核心方法是 acquire 和 release 操做。安全
acquire數據結構
while(當前同步器的狀態不容許獲取操做){多線程
若是當前線程再也不隊列中,將其加入隊列併發
阻塞當前線程app
}框架
線程若是位於隊列中,將其移出隊列異步
release
更新同步器的狀態
if(新的狀態容許某個被阻塞的線程獲取成功)
解除隊列中一個或多個線程的阻塞狀態。
從上面的操做思想中咱們能夠提出三大關鍵操做:同步器狀態變動,線程阻塞和釋放,插入和移出隊列。由此能夠引伸出三個基本組件:
同步狀態
AQS 類使用 int 值來保存同步狀態,而且暴露出 getState,setState 和 compareAndSet 操做來讀取和更新這個同步狀態。線程經過修改(加/減指定的數量)碼是否成功來決定當前線程是否成功獲取到同步狀態。
State 被聲明成了 volatile,保證了可見性和有序性。又經過 CAS 指令來實現 compareAndSet ,使得當且僅當同步狀態擁有一個一致的指望值的時候,纔會被原子地設置成新值,這樣就保證了同步狀態的原子性。
阻塞
直到 JSR166,阻塞線程和解除線程阻塞都是基於 Java 的內置管程。
JUC 包使用 LockSupport 類來解決這個問題。LockSupport.park 阻塞當前線程直到有 LockSupport.unpark 方法被調用。
隊列
整個框架的核心就是如何管理線程阻塞隊列,該隊列是嚴格的 FIFO 隊列,所以不支持線程優先級的同步。同步隊列的最佳選擇是自身沒有使用底層鎖來構造的非阻塞數據結構。這裏採用了 CLH 鎖。
CLH隊列實際並不那麼像隊列,它的入隊和出隊與實際的業務密切相關。它是一個鏈表隊列。用過 AQS 的兩個字段 head(頭節點) 和 tail(尾節點)來存取,這兩個字段初始化的時候都指向了一個空節點。
入隊操做:
CLH 隊列是 FIFO 隊列,因此新的節點來到的時候,是要插入到當前隊列的尾節點以後。當一個線程獲取到同步狀態以後,其餘線程沒法獲取,轉而被構形成節點加入到同步隊列中,並且這個加入隊列的過程必需要保證線程安全,所以使用了 CAS方法,它須要傳遞當前線程認爲的尾節點和當前節點,只有設置成功後,當前節點才正式與以前的尾節點創建關聯。
出隊操做:
由於是 FIFO 隊列,因此能成功獲取到 AQS 同步狀態的一定是首節點,首節點的線程在釋放同步狀態時,會喚醒後續節點,然後續節點會在獲取 AQS 同步狀態成功的時候將本身設置爲首屆點。設置首節點是由獲取同步成功的線程來完成的,因此不須要像入隊這樣的 CAS 操做。
條件隊列
上一節是 AQS 的同步隊列,這一節是條件隊列。AQS 只有一個同步隊列,可是能夠有多個條件隊列。AQS 框架提供了一個 ConditionObject 類,給維護獨佔同步的類以及實現 Lock 接口的類使用。
ConditionObject 類 和 AQS 共用了內部節點,有本身單獨的條件隊列。Singal 操做是經過將節點從條件隊列轉移到同步隊列來實現的。
singal:
await:
方法結構
組件 | 數據結構 |
---|---|
同步狀態 | volatile int state |
阻塞 | LockSupport類 |
隊列 | Node節點 |
條件隊列 | ConditionObject |
咱們經過獨佔式同步狀態的釋放和獲取,以及共享式同步狀態的釋放和獲取來看看 AQS 是如何實現的。
獨佔式
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
上述代碼主要完成了同步狀態的獲取,節點構造,加入同步隊列以及在同步隊列中自旋等待等相關工做。
來看看節點構造和加入隊列的實現:
private Node addWaiter(Node mode) { // 當前線程構形成Node節點 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 嘗試快速在尾節點後新增節點 提高算法效率 先將尾節點指向pred Node pred = tail; if (pred != null) { //尾節點不爲空 當前線程節點的前驅節點指向尾節點 node.prev = pred; //併發處理 尾節點有可能已經不是以前的節點 因此須要CAS更新 if (compareAndSetTail(pred, node)) { //CAS更新成功 當前線程爲尾節點 原先尾節點的後續節點就是當前節點 pred.next = node; return node; } } //第一個入隊的節點或者是尾節點後續節點新增失敗時進入enq enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //尾節點爲空 第一次入隊 設置頭尾節點一致 同步隊列的初始化 if (compareAndSetHead(new Node())) tail = head; } else { //全部的線程節點在構造完成第一個節點後 依次加入到同步隊列中 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
節點進入同步隊列後,就進入了一個自旋的過程,每一個線程節點都在自旋地觀察,當條件知足,獲取到了同步狀態,就能夠從自旋過程當中退出,不然依舊自旋。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //獲取當前線程節點的前驅節點 final Node p = node.predecessor(); //前驅節點爲頭節點且成功獲取同步狀態 if (p == head && tryAcquire(arg)) { //設置當前節點爲頭節點 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //是否阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire 和 parkAndCheckInterrupt 阻塞線程的過程。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //前驅節點的狀態決定後續節點的行爲 int ws = pred.waitStatus; if (ws == Node.SIGNAL) /*前驅節點爲-1 後續節點能夠被阻塞 * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /*前驅節點是初始或者共享狀態就設置爲-1 使後續節點阻塞 * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { //阻塞線程 LockSupport.park(this); return Thread.interrupted(); }
當獲取同步狀態成功以後,對於鎖這種併發組件而言,就意味着當前線程獲取到了鎖。
再看 release 方法:
head節點表示獲取鎖成功的節點,當頭結點在釋放同步狀態時,會喚醒後繼節點,若是後繼節點得到鎖成功,會把本身設置爲頭結點,節點的變化過程以下。修改head節點指向下一個得到鎖的節點,新的得到鎖的節點,將prev的指針指向null。
public final boolean release(int arg) { if (tryRelease(arg)) {//同步狀態釋放成功 Node h = head; if (h != null && h.waitStatus != 0) //直接釋放頭節點 unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /*尋找符合條件的後續節點 * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //喚醒後續節點 LockSupport.unpark(s.thread); }
總結:在獲取同步狀態時,同步器維護一個同步隊列,獲取狀態失敗的線程都會被加入到隊列中進行自旋。移除的條件是前驅節點是頭節點而且成功獲取了同步狀態。釋放時,會喚醒頭節點的後繼節點。
ReentrantLock:ReentrantLock 類使用 AQS 同步狀態來保存鎖重複持有的次數。當鎖被一個線程獲取時,ReentrantLock 也會記錄下當前得到鎖的線程表示,以便檢查是否重複獲取。
ReentrantReadWriteLock:ReentrantReadWriteLock 使用 AQS 同步狀態中的 16 爲來保存寫鎖的持有次數,剩下的 16 爲來保存讀鎖的持有次數。WriteLock 的構建方式和 ReentrantLock 同樣。ReadLock 則經過使用 acquireShared 方法來支持同時容許多個讀線程。
Semaphore:信號量使用 AQS 同步狀態來保存信號量當前計數。它裏面定義的 acquireShared 方法會減小計數,當計數爲非正值時阻塞線程。tryRelease 會增長技術,在計數爲正值時還要解除線程的阻塞。
CountDownLatch:使用 AQS 同步狀態來表示計數。當該計數爲 0 時,全部的 acquire 方法才能經過。
FutureTask:使用 AQS 的同步狀態來表示某個異步計算任務的運行狀態(初始化,運行中,被取消和完成)。設置(FutureTask 的 set 方法)或取消(FutureTask 的 cancel 方法)一個 FutureTask 時會調用 AQS 的 release 操做。等待計算結果的線程阻塞解除是經過 AQS 的 acquire 實現的。
SynchronousQueues:SynchronousQueues類使用了內部的等待節點,這些節點能夠用於協調生產者和消費者。同時,它使用AQS同步狀態來控制當某個消費者消費當前一項時,容許一個生產者繼續生產,反之亦然。
多線程併發修改同步狀態,修改爲功的線程標記爲擁有同步狀態。
獲取失敗的線程,加入到同步隊列的隊尾;加入到隊列中後,若是當前節點的前驅節點爲頭節點再次嘗試獲取同步狀態(下文代碼:p == head && tryAcquire(arg))。
若是頭節點的下一個節點嘗試獲取同步狀態失敗後,會進入等待狀態;其餘節點則繼續自旋。
當線程執行完相應邏輯後,須要釋放同步狀態,使後繼節點有機會同步狀態(讓出資源,讓排隊的線程使用)。這時就須要調用release(int arg)方法。調用該方法後,會喚醒後繼節點。
後繼節點獲取同步狀態成功,頭節點出隊。須要注意的事,出隊操做是間接的,有節點獲取到同步狀態時,會將當前節點設置爲head,而本來的head設置爲null。
當同步隊列中頭節點喚醒後繼節點時,此時可能有其餘線程嘗試獲取同步狀態。
假設獲取成功,將會被設置爲頭節點。
頭節點後續節點獲取同步狀態失敗。
共享模式和獨佔模式最主要的區別是在支持同一時刻有多個線程同時獲取同步狀態。爲了不帶來額外的負擔,在上文中提到的同步隊列中都是用獨佔模式進行講述,其實同步隊列中的節點應該是獨佔和共享節點並存的。
共享節點嘗試獲取同步狀態。
當一個同享節點獲取到同步狀態,並喚醒後面等待的共享狀態的結果以下圖所示:
最後,獲取到同步狀態的線程執行完畢,同步隊列中只有一個獨佔節點: