JDK1.5以後引入了併發包java.util.concurrent,大大提升了Java程序的併發性能。關於java.util.concurrent包我總結以下:html
能夠說AbstractQueuedSynchronizer是併發類的重中之重。其實以前在ReentrantLock實現原理深刻探究一文中已經有結合ReentrantLock詳細解讀過AbstractQueuedSynchronizer,但限於當時水平緣由,回看一年半前的此文,感受對於AbstractQueuedSynchronizer的解讀理解還不夠深,所以這裏更新一篇文章,再次解讀AbstractQueuedSynchronizer的數據結構即相關源碼實現,本文基於JDK1.7版本。 java
AbstractQueuedSynchronizer的基本數據結構爲Node,關於Node,JDK做者寫了詳細的註釋,這裏我大體總結幾點:node
下面我用一張表格總結一下Node中持有哪些變量且每一個變量的含義:算法
關於SIGNAL、CANCELLED、CONDITION、PROPAGATE四個狀態,JDK源碼的註釋中一樣有了詳細的解讀,再用一張表格總結一下:數據結構
AbstractQueuedSynchzonizer是基於模板模式的實現,不過它的模板模式寫法有點特別,整個類中沒有任何一個abstract的抽象方法,取而代之的是,須要子類去實現的那些方法經過一個方法體拋出UnsupportedOperationException異常來讓子類知道。併發
AbstractQueuedSynchronizer類中一共有五處方法供子類實現,用表格總結一下:app
這裏的acquire很差翻譯,因此就直接原詞放上來了,由於acquire是一個動詞,後面並無帶賓語,所以不知道具體acquire的是什麼。按照我我的理解,acquire的意思應當是根據狀態字段state去獲取一個執行當前動做的資格。ide
好比ReentrantLock的lock()方法最終會調用acquire方法,那麼:性能
這種理解我認爲應當是比較準確的。ui
有了上面的這些基礎,咱們看一下獨佔式acquire的實現流程,主要是在線程acquire失敗後,是如何構建數據結構的,先看理論,以後再用一個例子畫圖說明。
看一下AbstractQuueuedSynchronizer的acquire方法實現流程,acquire方法是用於獨佔模式下進行操做的:
1
2
3
4
5
|
public
final
void
acquire(
int
arg) {
if
(!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
|
tryAcquire方法前面說過了,是子類實現的一個方法,若是tryAcquire返回的是true(成功),即代表當前線程得到了一個執行當前動做的資格,天然也就不須要構建數據結構進行阻塞等待。
若是tryAcquire方法返回的是false,那麼當前線程沒有得到執行當前動做的資格,接着執行」acquireQueued(addWaiter(Node.EXCLUSIVE), arg))」這句代碼,這句話很明顯,它是由兩步構成的:
分別看一下每一步作了什麼。
先看第一步,addWaiter作了什麼,從傳入的參數Node.EXCLUSIVE咱們知道這是獨佔模式的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
private
Node addWaiter(Node mode) {
Node node =
new
Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node prev = tail;
if
(prev !=
null
) {
node.prev = prev;
if
(compareAndSetTail(prev, node)) {
prev.next = node;
return
node;
}
}
enq(node);
return
node;
}
|
首先看第4行~第11行的代碼,得到當前數據結構中的尾節點,若是有尾節點,那麼先獲取這個節點認爲它是前驅節點prev,而後:
所以在數據結構中有節點的狀況下,全部新增節點都是做爲尾節點插入數據結構。從註釋上來看,這段邏輯的存在的意義是以最短路徑O(1)的效果完成快速入隊,以最大化減少開銷。
假如當前節點沒有被設置爲尾節點,那麼執行enq方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
private
Node enq(
final
Node node) {
for
(;;) {
Node t = tail;
if
(t ==
null
) {
// Must initialize
if
(compareAndSetHead(
new
Node()))
tail = head;
}
else
{
node.prev = t;
if
(compareAndSetTail(t, node)) {
t.next = node;
return
t;
}
}
}
}
|
這段代碼的邏輯爲:
看完了代碼,用一張圖表示一下AbstractQueuedSynchronizer的總體數據結構(比較簡單,就不本身畫了,網上隨便找了一張圖):
隊列構建好了,下一步就是在必要的時候從隊列裏面拿出一個Node了,這就是acquireQueued方法,顧名思義,從隊列裏面acquire。看下acquireQueued方法的實現:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
final
boolean
acquireQueued(
final
Node node,
int
arg) {
boolean
failed =
true
;
try
{
boolean
interrupted =
false
;
for
(;;) {
final
Node p = node.prevecessor();
if
(p == head && tryAcquire(arg)) {
setHead(node);
p.next =
null
;
// help GC
failed =
false
;
return
interrupted;
}
if
(shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted =
true
;
}
}
finally
{
if
(failed)
cancelAcquire(node);
}
}
|
這段代碼描述了幾件事:
看一下第一步shouldParkAfterFailedAcquire代碼作了什麼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
private
static
boolean
shouldParkAfterFailedAcquire(Node prev, Node node) {
int
ws = prev.waitStatus;
if
(ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* prevecessor was cancelled. Skip over prevecessors and
* indicate retry.
*/
do {
node.prev = prev = prev.prev;
} while (prev.waitStatus > 0);
prev.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(prev, ws, Node.SIGNAL);
}
return
false
;
}
|
這裏每一個節點判斷它前驅節點的狀態,若是:
若是判斷判斷應當park,那麼parkAndCheckInterrupt方法:
1
2
3
4
|
private
final
boolean
parkAndCheckInterrupt() {
LockSupport.park(
this
);
return
Thread.interrupted();
}
|
利用LockSupport的park方法讓當前線程阻塞。
上面整理了獨佔模式的acquire流程,看到了等待的Node是如何構建成一個數據結構的,下面看一下釋放的時候作了什麼,release方法的實現爲:
1
2
3
4
5
6
7
8
9
|
public
final
boolean
release(
int
arg) {
if
(tryRelease(arg)) {
Node h = head;
if
(h !=
null
&& h.waitStatus !=
0
)
unparkSuccessor(h);
return
true
;
}
return
false
;
}
|
tryRelease一樣是子類去實現的,表示當前動做我執行完了,要釋放我執行當前動做的資格,講這個資格讓給其它線程,而後tryRelease釋放成功,獲取到head節點,若是head節點的waitStatus不爲0的話,執行unparkSuccessor方法,顧名思義unparkSuccessor意爲unpark頭結點的繼承者,方法實現爲:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
private
void
unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if
(s ==
null
|| s.waitStatus >
0
) {
s =
null
;
for
(Node t = tail; t !=
null
&& t != node; t = t.prev)
if
(t.waitStatus <=
0
)
s = t;
}
if
(s !=
null
)
LockSupport.unpark(s.thread);
}
|
這段代碼比較好理解,整理一下流程:
最後,若是拿到了一個不等於null的節點s,就利用LockSupport的unpark方法讓它取消阻塞。
上面的例子講解地過於理論,下面利用ReentrantLock舉個例子,可是這裏不講ReentrantLock實現原理,只是利用ReentrantLock研究AbstractQueuedSynchronizer的acquire和release。示例代碼爲:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
/**
* @author 五月的倉頡http://www.cnblogs.com/xrq730/p/7056614.html
*/
public
class
AbstractQueuedSynchronizerTest {
@Test
public
void
testAbstractQueuedSynchronizer() {
Lock lock =
new
ReentrantLock();
Runnable runnable0 =
new
ReentrantLockThread(lock);
Thread thread0 =
new
Thread(runnable0);
thread0.setName(
"線程0"
);
Runnable runnable1 =
new
ReentrantLockThread(lock);
Thread thread1 =
new
Thread(runnable1);
thread1.setName(
"線程1"
);
Runnable runnable2 =
new
ReentrantLockThread(lock);
Thread thread2 =
new
Thread(runnable2);
thread2.setName(
"線程2"
);
thread0.start();
thread1.start();
thread2.start();
for
(;;);
}
private
class
ReentrantLockThread
implements
Runnable {
private
Lock lock;
public
ReentrantLockThread(Lock lock) {
this
.lock = lock;
}
@Override
public
void
run() {
try
{
lock.lock();
for
(;;);
}
finally
{
lock.unlock();
}
}
}
}
|
所有是死循環,至關於第一條線程(線程0)acquire成功以後,後兩條線程(線程一、線程2)阻塞,下面的代碼就不考慮後兩條線程誰先誰後的問題,就一條線程(線程1)流程執行到底、另外一條線程(線程2)流程執行到底這麼分析了。
這裏再把addWaiter和enq兩個方法源碼貼一下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
private
Node addWaiter(Node mode) {
Node node =
new
Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node prev = tail;
if
(prev !=
null
) {
node.prev = prev;
if
(compareAndSetTail(prev, node)) {
prev.next = node;
return
node;
}
}
enq(node);
return
node;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
private
Node enq(
final
Node node) {
for
(;;) {
Node t = tail;
if
(t ==
null
) {
// Must initialize
if
(compareAndSetHead(
new
Node()))
tail = head;
}
else
{
node.prev = t;
if
(compareAndSetTail(t, node)) {
t.next = node;
return
t;
}
}
}
}
|
首先第一個acquire失敗的線程1,因爲此時整個數據結構中麼沒有任何數據,所以addWaiter方法第4行中拿到的prev=tail爲空,執行enq方法,首先第3行獲取tail,第4行判斷到tail是null,所以頭結點new一個Node出來經過CAS算法設置爲數據結構的head,tail一樣也是這個Node,此時數據結構爲:
爲了方便描述,prev和next,我給每一個Node隨便加了一個地址。接着繼續enq,由於enq內是一個死循環,因此繼續第3行獲取tail,new了一個空的Node以後tail就有了,執行else判斷,經過第8行~第10行代碼將當前線程對應的Node追加到數據結構尾部,那麼當前構建的數據結構爲:
這樣,線程1對應的Node被加入數據結構,成爲數據結構的tail,而數據結構的head是一個什麼都沒有的空Node。
接着線程2也acquire失敗了,線程2既然acquire失敗,那也要準備被加入數據結構中,繼續先執行addWaiter方法,因爲此時已經有了tail,所以不須要執行enq方法,能夠直接將當前Node添加到數據結構尾部,那麼當前構建的數據結構爲:
至此,兩個阻塞的線程構建的三個Node已經所有歸位。
上述流程只是描述了構建數據結構的過程,並無描述線程一、線程2阻塞的流程,所以接着繼續用實際例子看一下線程一、線程2如何阻塞。貼一下acquireQueued、shouldParkAfterFailedAcquire兩個方法源碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
final
boolean
acquireQueued(
final
Node node,
int
arg) {
boolean
failed =
true
;
try
{
boolean
interrupted =
false
;
for
(;;) {
final
Node p = node.prevecessor();
if
(p == head && tryAcquire(arg)) {
setHead(node);
p.next =
null
;
// help GC
failed =
false
;
return
interrupted;
}
if
(shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted =
true
;
}
}
finally
{
if
(failed)
cancelAcquire(node);
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
private
static
boolean
shouldParkAfterFailedAcquire(Node prev, Node node) {
int
ws = prev.waitStatus;
if
(ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* prevecessor was cancelled. Skip over prevecessors and
* indicate retry.
*/
do {
node.prev = prev = prev.prev;
} while (prev.waitStatus > 0);
prev.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(prev, ws, Node.SIGNAL);
}
return
false
;
}
|
首先是線程1,它的前驅節點是head節點,在它tryAcquire成功的狀況下,執行第8行~第11行的代碼。作幾件事情:
所以,若是線程1執行tryAcquire成功,那麼數據結構將變爲:
從上述流程能夠總結到:只有前驅節點爲head的節點會嘗試tryAcquire,其他都不會,結合後面的release選繼承者的方式,保證了先acquire失敗的線程會優先從阻塞狀態中解除去從新acquire。這是一種公平的acquire方式,由於它遵循」先到先得」原則,可是咱們能夠動動手腳讓這種公平變爲非公平,好比ReentrantLock默認的非公平模式,這個留在後面說。
那若是線程1執行tryAcquire失敗,那麼要執行shouldParkAfterFailedAcquire方法了,shouldParkAfterFailedAcquire拿線程1的前驅節點也就是head節點的waitStatus作了一個判斷,由於waitStatus=0,所以執行第18行~第20行的邏輯,將head的waitStatus設置爲SIGNAL即-1,而後方法返回false,數據結構變爲:
看到這裏就一個變化:head的waitStatus從0變成了-1。既然shouldParkAfterFailedAcquire返回false,acquireQueued的第13行~第14行的判斷天然不經過,繼續走for(;;)循環,若是tryAcquire失敗顯然又來到了shouldParkAfterFailedAcquire方法,此時線程1對應的Node的前驅節點head節點的waitStatus已經變爲了SIGNAL即-1,所以執行第4行~第8行的代碼,直接返回true出去。
shouldParkAfterFailedAcquire返回true,parkAndCheckInterrupt直接調用LockSupport的park方法:
1
2
3
4
|
private
final
boolean
parkAndCheckInterrupt() {
LockSupport.park(
this
);
return
Thread.interrupted();
}
|
至此線程1阻塞,線程2阻塞的流程與線程1阻塞的流程相同,能夠本身分析一下。
另外再提一個問題,不知道你們會不會想:
我認爲這是AbstractQueuedSynchronizer開發人員作了相似自旋的操做。由於不少時候獲取acquire進行操做的時間很短,阻塞會引發上下文的切換,而很短期就從阻塞狀態解除,這樣相對會比較耗費性能。
所以咱們看到線程1自構建完畢Node加入數據結構到阻塞,一共嘗試了兩次tryAcquire,若是其中有一次成功,那麼線程1就沒有必要被阻塞,提高了性能。