本博客系列是學習併發編程過程當中的記錄總結。因爲文章比較多,寫的時間也比較散,因此我整理了個目錄貼(傳送門),方便查閱。html
併發編程系列博客傳送門java
本文參考了[Java多線程進階(六)—— J.U.C之locks框架:AQS綜述(1)]和Java技術之AQS詳解兩篇文章。node
AbstractQueuedSynchronizer
(簡稱AQS)類是整個 JUC包的核心類。JUC 中的ReentrantLock
、ReentrantReadWriteLock
、 CountDownLatch
、Semaphore
和LimitLatch
等同步工具都是基於AQS實現的。算法
AQS 分離出了構建同步器時的通用關注點,這些關注點主要包括以下:編程
這些關注點都是圍繞着資源——同步狀態(synchronization state)來展開的,AQS將這些通用的關注點封裝成了一個個模板方法,讓子類能夠直接使用。segmentfault
AQS 留給用戶的只有兩個問題:安全
這樣一來,定義同步器的難度就大大下降了。用戶只要解決好上面兩個問題,就能構建出一個性能優秀的同步器。多線程
下面是幾個常見的同步器對資源的定義:併發
同步器 | 資源的定義 |
---|---|
ReentrantLock | 資源表示獨佔鎖。State爲0表示鎖可用;爲1表示被佔用;爲N表示重入的次數 |
ReentrantReadWriteLock | 資源表示共享的讀鎖和獨佔的寫鎖。state邏輯上被分紅兩個16位的unsigned short,分別記錄讀鎖被多少線程使用和寫鎖被重入的次數。 |
CountDownLatch | 資源表示倒數計數器。State爲0表示計數器歸零,全部線程均可以訪問資源;爲N表示計數器未歸零,全部線程都須要阻塞。 |
Semaphore | 資源表示信號量或者令牌。State≤0表示沒有令牌可用,全部線程都須要阻塞;大於0表示由令牌可用,線程每獲取一個令牌,State減1,線程沒釋放一個令牌,State加1。 |
上面一節中介紹到 AQS 抽象出了三個關注點,下面就具體看下 AQS 是若是解決這三個問題的。app
同步狀態,其實就是資源。AQS使用單個int(32位)來保存同步狀態,並暴露出getState、setState以及compareAndSetState操做來讀取和更新這個狀態。
private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
在JDK1.5以前,除了內置的監視器機制外,沒有其它方法能夠安全且便捷得阻塞和喚醒當前線程。
JDK1.5之後,java.util.concurrent.locks包提供了LockSupport類來做爲線程阻塞和喚醒的工具。
等待隊列,是AQS框架的核心,整個框架的關鍵其實就是如何在併發狀態下管理被阻塞的線程。
等待隊列是嚴格的FIFO隊列,是Craig,Landin和Hagersten鎖(CLH鎖)的一種變種,採用雙向循環鏈表實現,所以也叫CLH隊列。
1. 節點定義
CLH隊列中的結點是對線程的包裝,結點一共有兩種類型:獨佔(EXCLUSIVE)和共享(SHARED)。
每種類型的結點都有一些狀態,其中獨佔結點使用其中的CANCELLED(1)、SIGNAL(-1)、CONDITION(-2),共享結點使用其中的CANCELLED(1)、SIGNAL(-1)、PROPAGATE(-3)。
結點狀態 | 值 | 描述 |
---|---|---|
CANCELLED | 1 | 取消。表示後驅結點被中斷或超時,須要移出隊列 |
SIGNAL | -1 | 發信號。表示後驅結點被阻塞了(當前結點在入隊後、阻塞前,應確保將其prev結點類型改成SIGNAL,以便prev結點取消或釋放時將當前結點喚醒。) |
CONDITION | -2 | Condition專用。表示當前結點在Condition隊列中,由於等待某個條件而被阻塞了 |
PROPAGATE | -3 | 傳播。適用於共享模式(好比連續的讀操做結點能夠依次進入臨界區,設爲PROPAGATE有助於實現這種迭代操做。) |
INITIAL | 0 | 默認。新結點會處於這種狀態 |
AQS使用CLH隊列實現線程的結構管理,而CLH結構正是用前一結點某一屬性表示當前結點的狀態,之因此這種作是由於在雙向鏈表的結構下,這樣更容易實現取消和超時功能。
next指針:用於維護隊列順序,當臨界區的資源被釋放時,頭結點經過next指針找到隊首結點。
prev指針:用於在結點(線程)被取消時,讓當前結點的前驅直接指向當前結點的後驅完成出隊動做。
static final class Node { // 共享模式結點 static final Node SHARED = new Node(); // 獨佔模式結點 static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; /** * INITAL: 0 - 默認,新結點會處於這種狀態。 * CANCELLED: 1 - 取消,表示後續結點被中斷或超時,須要移出隊列; * SIGNAL: -1- 發信號,表示後續結點被阻塞了;(當前結點在入隊後、阻塞前,應確保將其prev結點類型改成SIGNAL,以便prev結點取消或釋放時將當前結點喚醒。) * CONDITION: -2- Condition專用,表示當前結點在Condition隊列中,由於等待某個條件而被阻塞了; * PROPAGATE: -3- 傳播,適用於共享模式。(好比連續的讀操做結點能夠依次進入臨界區,設爲PROPAGATE有助於實現這種迭代操做。) * * waitStatus表示的是後續結點狀態,這是由於AQS中使用CLH隊列實現線程的結構管理,而CLH結構正是用前一結點某一屬性表示當前結點的狀態,這樣更容易實現取消和超時功能。 */ volatile int waitStatus; // 前驅指針 volatile Node prev; // 後驅指針 volatile Node next; // 結點所包裝的線程 volatile Thread thread; // Condition隊列使用,存儲condition隊列中的後繼節點 Node nextWaiter; Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } }
2. 隊列定義
對於CLH隊列,當線程請求資源時,若是請求不到,會將線程包裝成結點,將其掛載在隊列尾部。
下面結合代碼一塊兒看下節點進入隊列的過程。
private Node enq(final Node node) { for (;;) { Node t = tail; // 1 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) // 2 tail = head; } else { node.prev = t; // 3 if (compareAndSetTail(t, node)) { // 4 t.next = node; return t; } } } }
如上代碼在第一次循環中,當要在AQS隊列尾部插入元素時,AQS隊列狀態以下圖中(default)所示。也就是隊列頭、尾節點都指向null;當執行代碼(1)後節點t指向了尾部節點,這時候隊列狀態如圖中(I)所示。
這時候t爲null,故執行代碼(2),使用CAS算法設置一個哨兵節點爲頭節點,若是CAS設置成功,則讓尾部節點也指向哨兵節點,這時候隊列狀態如圖中(II)所示。
到如今爲止只插入了一個哨兵節點,還須要插入node節點,因此在第二次循環後執行到代碼(1),這時候隊列狀態如圖中(III)所示;而後執行代碼(3)設置node的前驅節點爲尾部節點,這時候隊列狀態如圖中(IV)所示;
而後經過CAS算法設置node節點爲尾部節點,CAS成功後隊列狀態如圖中(V)所示;
CAS成功後再設置原來的尾部節點的後驅節點爲node,這時候就完成了雙向鏈表的插入,此時隊列狀態如圖中(VI)所示。
上面介紹到 AQS 已經幫用戶解決了同步器定義過程當中的大部分問題,只將下面兩個問題丟給用戶解決:
具體的,AQS 是經過暴露如下 API 來讓用戶解決上面的問題的。
鉤子方法 | 描述 |
---|---|
tryAcquire | 獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。 |
tryRelease | 獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。 |
tryAcquireShared | 共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。 |
tryReleaseShared | 共享方式。嘗試釋放資源,若是釋放後容許喚醒後續等待結點返回true,不然返回false。 |
isHeldExclusively | 該線程是否正在獨佔資源。只有用到condition才須要去實現它。 |
若是你須要實現一個本身的同步器,通常狀況下只要繼承 AQS ,並重寫 AQS 中的這個幾個方法就好了。至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。要不怎麼說Doug Lea貼心呢。
須要注意的是:若是你沒在子類中重寫這幾個方法就直接調用了,會直接拋出異常。因此,在你調用這些方法以前必須重寫他們。不使用的話能夠不重寫。
查看 AQS 的源碼咱們就能夠發現這個類提供了不少方法,看起來讓人「眼花繚亂」的。可是最主要的兩類方法就是獲取資源的方法和釋放資源的方法。所以咱們抓住主要矛盾就好了:
該方法以獨佔方式獲取資源,若是獲取到資源,線程繼續往下執行,不然進入等待隊列,直到獲取到資源爲止,且整個過程忽略中斷的影響。該方法是獨佔模式下線程獲取共享資源的頂層入口。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
下面分析下這個acquire
方法的具體執行流程:
step1:首先這個方法調用了用戶本身實現的方法
tryAcquire
方法嘗試獲取資源,若是這個方法返回true,也就是表示獲取資源成功,那麼整個acquire
方法就執行結束了,線程繼續往下執行;step2:若是
tryAcquir
方法返回false,也就表示嘗試獲取資源失敗。這時acquire
方法會先調用addWaiter
方法將當前線程封裝成Node類並加入一個FIFO的雙向隊列的尾部。step3:再看
acquireQueued
這個關鍵方法。首先要注意的是這個方法中哪一個無條件的for循環,這個for循環說明acquireQueued
方法一直在自旋嘗試獲取資源。進入for循環後,首先判斷了當前節點的前繼節點是否是頭節點,若是是的話就再次嘗試獲取資源,獲取資源成功的話就直接返回false(表示未被中斷過)假如仍是沒有獲取資源成功,判斷是否須要讓當前節點進入
waiting
狀態,通過shouldParkAfterFailedAcquire
這個方法判斷,若是須要讓線程進入waiting
狀態的話,就調用LockSupport的park方法讓線程進入waiting
狀態。進入waiting
狀態後,這線程等待被interupt
或者unpark
(在release操做中會進行這樣的操做,能夠參見後面的代碼)。這個線程被喚醒後繼續執行for循環來嘗試獲取資源。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //首先判斷了當前節點的前繼節點是否是頭節點,若是是的話就再次嘗試獲取資源, //獲取資源成功的話就直接返回false(表示未被中斷過) if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //判斷是否須要讓當前節點進入waiting狀態 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 若是在整個等待過程當中被中斷過,則返回true,不然返回false。 // 若是線程在等待過程當中被中斷過,它是不響應的。只是獲取資源後纔再進行自我中斷selfInterrupt(),將中斷補上。 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
以上就是acquire
方法的簡單分析。
單獨看這個方法的話可能會不太清晰,結合ReentrantLock
、ReentrantReadWriteLock
、 CountDownLatch
、Semaphore
和LimitLatch
等同步工具看這個代碼的話就會好理解不少。
release(int)
方法是獨佔模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,若是完全釋放了(即state=0),它會喚醒等待隊列裏的其餘線程來獲取資源。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } //上面已經講過了,須要用戶自定義實現 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
與acquire()方法中的tryAcquire()相似,tryRelease()方法也是須要獨佔模式的自定義同步器去實現的。正常來講,tryRelease()都會成功的,由於這是獨佔模式,該線程來釋放資源,那麼它確定已經拿到獨佔資源了,直接減掉相應量的資源便可(state-=arg),也不須要考慮線程安全的問題。
但要注意它的返回值,上面已經提到了,release()是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!因此自義定同步器在實現時,若是已經完全釋放資源(state=0),要返回true,不然返回false。
unparkSuccessor(Node)
方法用於喚醒等待隊列中下一個線程。這裏要注意的是,下一個線程並不必定是當前節點的next節點,而是下一個能夠用來喚醒的線程,若是這個節點存在,調用unpark()
方法喚醒。
總之,release()是獨佔模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,若是完全釋放了(即state=0),它會喚醒等待隊列裏的其餘線程來獲取資源。(須要注意的是隊列中被喚醒的線程不必定能立馬獲取資源,由於資源在釋放後可能立馬被其餘線程(不是在隊列中等待的線程)搶掉了)
acquireShared(int)
方法是共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源爲止,整個過程忽略中斷。
public final void acquireShared(int arg) { //tryAcquireShared須要用戶自定義實現 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
能夠發現,這個方法的關鍵實現實際上是獲取資源失敗後,怎麼管理線程。也就是doAcquireShared
的邏輯。
//不響應中斷 private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
能夠看出,doAcquireShared
的邏輯和acquireQueued
的邏輯差很少。將當前線程加入等待隊列尾部休息,直到其餘線程釋放資源喚醒本身,本身成功拿到相應量的資源後才返回。
簡單總結下acquireShared
的流程:
step1:tryAcquireShared()嘗試獲取資源,成功則直接返回;
step2:失敗則經過doAcquireShared()進入等待隊列park(),直到被unpark()/interrupt()併成功獲取到資源才返回。整個等待過程也是忽略中斷的。
releaseShared(int)
方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,若是成功釋放且容許喚醒等待線程,它會喚醒等待隊列裏的其餘線程來獲取資源。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
釋放掉資源後,喚醒後繼。跟獨佔模式下的release()類似,但有一點稍微須要注意:獨佔模式下的tryRelease()在徹底釋放掉資源(state=0)後,纔會返回true去喚醒其餘線程,這主要是基於獨佔下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制必定量的線程併發執行,那麼擁有資源的線程在釋放掉部分資源時就能夠喚醒後繼等待結點。