上文咱們講解了AbstractQueuedSynchronizer獨佔模式的acquire實現流程,本文趁熱打鐵繼續看一下AbstractQueuedSynchronizer共享模式acquire的實現流程。連續兩篇文章的學習,也能夠對比獨佔模式acquire和共享模式acquire的區別,加深對於AbstractQueuedSynchronizer的理解。html
先看一下共享模式acquire的實現,方法爲acquireShared和acquireSharedInterruptibly,二者差異不大,區別就在於後者有中斷處理,以acquireShared爲例:java
1
2
3
4
|
public
final
void
acquireShared(
int
arg) {
if
(tryAcquireShared(arg) <
0
)
doAcquireShared(arg);
}
|
這裏就能看出第一個差異來了:獨佔模式acquire的時候子類重寫的方法tryAcquire返回的是boolean,便是否tryAcquire成功;共享模式acquire的時候,返回的是一個int型變量,判斷是否<0。doAcquireShared方法的實現爲:node
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
private
void
doAcquireShared(
int
arg) {
final
Node node = addWaiter(Node.SHARED);
boolean
failed =
true
;
try
{
boolean
interrupted =
false
;
for
(;;) {
final
Node p = node.predecessor();
if
(p == head) {
int
r = tryAcquireShared(arg);
if
(r >=
0
) {
setHeadAndPropagate(node, r);
p.next =
null
;
// help GC
if
(interrupted)
selfInterrupt();
failed =
false
;
return
;
}
}
if
(shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted =
true
;
}
}
finally
{
if
(failed)
cancelAcquire(node);
}
}
|
咱們來分析一下這段代碼作了什麼:算法
確實,共享模式下的acquire和獨佔模式下的acquire大部分邏輯差很少,最大的差異在於tryAcquireShared成功以後,獨佔模式的acquire是直接將當前節點設置爲head節點便可,共享模式會執行setHeadAndPropagate方法,顧名思義,即在設置head以後多執行了一步propagate操做。setHeadAndPropagate方法源碼爲:編程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
private
void
setHeadAndPropagate(Node node,
int
propagate) {
Node h = head;
// Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if
(propagate >
0
|| h ==
null
|| h.waitStatus <
0
) {
Node s = node.next;
if
(s ==
null
|| s.isShared())
doReleaseShared();
}
}
|
第3行的代碼設置重設head,第2行的代碼因爲第3行的代碼要重設head,所以先定義一個Node型變量h得到原head的地址,這兩行代碼很簡單。數據結構
第19行~第23行的代碼是獨佔鎖和共享鎖最不同的一個地方,咱們再看獨佔鎖acquireQueued的代碼:app
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
final
boolean
acquireQueued(
final
Node node,
int
arg) {
boolean
failed =
true
;
try
{
boolean
interrupted =
false
;
for
(;;) {
final
Node p = node.predecessor();
if
(p == head && tryAcquire(arg)) {
setHead(node);
p.next =
null
;
// help GC
failed =
false
;
return
interrupted;
}
if
(shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted =
true
;
}
}
finally
{
if
(failed)
cancelAcquire(node);
}
}
|
這意味着獨佔鎖某個節點被喚醒以後,它只須要將這個節點設置成head就完事了,而共享鎖不同,某個節點被設置爲head以後,若是它的後繼節點是SHARED狀態的,那麼將繼續經過doReleaseShared方法嘗試日後喚醒節點,實現了共享狀態的向後傳播。less
上面講了共享模式下acquire是如何實現的,下面再看一下release的實現流程,方法爲releaseShared:ide
1
2
3
4
5
6
7
|
public
final
boolean
releaseShared(
int
arg) {
if
(tryReleaseShared(arg)) {
doReleaseShared();
return
true
;
}
return
false
;
}
|
tryReleaseShared方法是子類實現的,若是tryReleaseShared成功,那麼執行doReleaseShared()方法:oop
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
private
void
doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for
(;;) {
Node h = head;
if
(h !=
null
&& h != tail) {
int
ws = h.waitStatus;
if
(ws == Node.SIGNAL) {
if
(!compareAndSetWaitStatus(h, Node.SIGNAL,
0
))
continue
;
// loop to recheck cases
unparkSuccessor(h);
}
else
if
(ws ==
0
&&
!compareAndSetWaitStatus(h,
0
, Node.PROPAGATE))
continue
;
// loop on failed CAS
}
if
(h == head)
// loop if head changed
break
;
}
}
|
主要是兩層邏輯:
咱們知道,Condition是用於實現通知/等待機制的,和Object的wait()/notify()同樣,因爲本文以前描述AbstractQueuedSynchronizer的共享模式的篇幅不是很長,加之Condition也是AbstractQueuedSynchronizer的一部分,所以將Condition也放在這裏寫了。
Condition分爲await()和signal()兩部分,前者用於等待、後者用於喚醒,首先看一下await()是如何實現的。Condition自己是一個接口,其在AbstractQueuedSynchronizer中的實現爲ConditionObject:
1
2
3
4
5
6
7
8
9
|
public
class
ConditionObject
implements
Condition, java.io.Serializable {
private
static
final
long
serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private
transient
Node firstWaiter;
/** Last node of condition queue. */
private
transient
Node lastWaiter;
...
}
|
這裏貼了一些字段定義,後面都是方法就不貼了,會對重點方法進行分析的。從字段定義咱們能夠看到,ConditionObject全局性地記錄了第一個等待的節點與最後一個等待的節點。
像ReentrantLock每次要使用ConditionObject,直接new一個ConditionObject出來便可。咱們關注一下await()方法的實現:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public
final
void
await()
throws
InterruptedException {
if
(Thread.interrupted())
throw
new
InterruptedException();
Node node = addConditionWaiter();
int
savedState = fullyRelease(node);
int
interruptMode =
0
;
while
(!isOnSyncQueue(node)) {
LockSupport.park(
this
);
if
((interruptMode = checkInterruptWhileWaiting(node)) !=
0
)
break
;
}
if
(acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if
(node.nextWaiter !=
null
)
// clean up if cancelled
unlinkCancelledWaiters();
if
(interruptMode !=
0
)
reportInterruptAfterWait(interruptMode);
}
|
第2行~第3行的代碼用於處理中斷,第4行代碼比較關鍵,添加Condition的等待者,看一下實現:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
private
Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if
(t !=
null
&& t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node =
new
Node(Thread.currentThread(), Node.CONDITION);
if
(t ==
null
)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return
node;
}
|
首先拿到隊列(注意數據結構,Condition構建出來的也是一個隊列)中最後一個等待者,緊接着第4行的的判斷,判斷最後一個等待者的waitStatus不是CONDITION的話,執行第5行的代碼,解綁取消的等待者,由於經過第8行的代碼,咱們看到,new出來的Node的狀態都是CONDITION的。
那麼unlinkCancelledWaiters作了什麼?裏面的流程就不看了,就是一些指針遍歷並判斷狀態的操做,總結一下就是:從頭至尾遍歷每個Node,遇到Node的waitStatus不是CONDITION的就從隊列中踢掉,該節點的先後節點相連。
接着第8行的代碼前面說過了,new出來了一個Node,存儲了當前線程,waitStatus是CONDITION,接着第9行~第13行的操做很好理解:
用一張圖表示一下構建的數據結構就是:
對比學習,咱們總結一下Condition構建出來的隊列和AbstractQueuedSynchronizer構建出來的隊列的差異,主要體如今2點上:
整個過程當中,咱們看到沒有使用任何CAS操做,firstWaiter和lastWaiter也沒有用volatile修飾,其實緣由很簡單:要await()必然要先lock(),既然lock()了就表示沒有競爭,沒有競爭天然也不必使用volatile+CAS的機制去保證什麼。
前面咱們看了Condition構建等待隊列的過程,接下來咱們看一下等待的過程,await()方法的代碼比較短,再貼一下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public
final
void
await()
throws
InterruptedException {
if
(Thread.interrupted())
throw
new
InterruptedException();
Node node = addConditionWaiter();
int
savedState = fullyRelease(node);
int
interruptMode =
0
;
while
(!isOnSyncQueue(node)) {
LockSupport.park(
this
);
if
((interruptMode = checkInterruptWhileWaiting(node)) !=
0
)
break
;
}
if
(acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if
(node.nextWaiter !=
null
)
// clean up if cancelled
unlinkCancelledWaiters();
if
(interruptMode !=
0
)
reportInterruptAfterWait(interruptMode);
}
|
構建完畢隊列以後,執行第5行的fullyRelease方法,顧名思義:fullyRelease方法的做用是徹底釋放Node的狀態。方法實現爲:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
final
int
fullyRelease(Node node) {
boolean
failed =
true
;
try
{
int
savedState = getState();
if
(release(savedState)) {
failed =
false
;
return
savedState;
}
else
{
throw
new
IllegalMonitorStateException();
}
}
finally
{
if
(failed)
node.waitStatus = Node.CANCELLED;
}
}
|
這裏第4行獲取state,第5行release的時候將整個state傳過去,理由是某線程可能屢次調用了lock()方法,好比調用了10次lock,那麼此線程就將state加到了10,因此這裏要將10傳過去,將狀態所有釋放,這樣後面的線程才能從新從state=0開始競爭鎖,這也是方法被命名爲fullyRelease的緣由,由於要徹底釋放鎖,釋放鎖以後,若是有競爭鎖的線程,那麼就喚醒第一個,這都是release方法的邏輯了,前面的文章詳細講解過。
接着看await()方法的第7行判斷」while(!isOnSyncQueue(node))」:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
final
boolean
isOnSyncQueue(Node node) {
if
(node.waitStatus == Node.CONDITION || node.prev ==
null
)
return
false
;
if
(node.next !=
null
)
// If has successor, it must be on queue
return
true
;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return
findNodeFromTail(node);
}
|
注意這裏的判斷是Node是否在AbstractQueuedSynchronizer構建的隊列中而不是Node是否在Condition構建的隊列中,若是Node不在AbstractQueuedSynchronizer構建的隊列中,那麼調用LockSupport的park方法阻塞。
至此調用await()方法的線程構建Condition等待隊列–釋放鎖–等待的過程已經所有分析完畢。
上面的代碼分析了構建Condition等待隊列–釋放鎖–等待的過程,接着看一下signal()方法通知是如何實現的:
1
2
3
4
5
6
7
|
public
final
void
signal() {
if
(!isHeldExclusively())
throw
new
IllegalMonitorStateException();
Node first = firstWaiter;
if
(first !=
null
)
doSignal(first);
}
|
首先從第2行的代碼咱們看到,要能signal(),當前線程必須持有獨佔鎖,不然拋出異常IllegalMonitorStateException。
那麼真正操做的時候,獲取第一個waiter,若是有waiter,調用doSignal方法:
1
2
3
4
5
6
7
8
|
private
void
doSignal(Node first) {
do
{
if
( (firstWaiter = first.nextWaiter) ==
null
)
lastWaiter =
null
;
first.nextWaiter =
null
;
}
while
(!transferForSignal(first) &&
(first = firstWaiter) !=
null
);
}
|
第3行~第5行的代碼很好理解:
接着執行第6行和第7行的代碼,這裏重點就是第6行的transferForSignal方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
final
boolean
transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int
ws = p.waitStatus;
if
(ws >
0
|| !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return
true
;
}
|
方法本意是將一個節點從Condition隊列轉換爲AbstractQueuedSynchronizer隊列,總結一下方法的實現:
最後上面的步驟所有成功,返回true,返回true喚醒等待節點成功。從喚醒的代碼咱們能夠得出一個重要結論:某個await()的節點被喚醒以後並不意味着它後面的代碼會當即執行,它會被加入到AbstractQueuedSynchronizer隊列的尾部,只有前面等待的節點獲取鎖所有完畢才能輪到它。
代碼分析到這裏,我想相似的signalAll方法也沒有必要再分析了,顯然signalAll方法的做用就是將全部Condition隊列中等待的節點逐一隊列中從移除,由CONDITION狀態變爲SIGNAL狀態並加入AbstractQueuedSynchronizer隊列的尾部。
可能你們看了我分析半天代碼會有點迷糊,這裏最後我貼一段我用於驗證上面Condition結論的示例代碼,首先創建一個Thread,我將之命名爲ConditionThread:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
/**
* @author 五月的倉頡http://www.cnblogs.com/xrq730/p/7067904.html
*/
public
class
ConditionThread
implements
Runnable {
private
Lock lock;
private
Condition condition;
public
ConditionThread(Lock lock, Condition condition) {
this
.lock = lock;
this
.condition = condition;
}
@Override
public
void
run() {
if
(
"線程0"
.equals(JdkUtil.getThreadName())) {
thread0Process();
}
else
if
(
"線程1"
.equals(JdkUtil.getThreadName())) {
thread1Process();
}
else
if
(
"線程2"
.equals(JdkUtil.getThreadName())) {
thread2Process();
}
}
private
void
thread0Process() {
try
{
lock.lock();
System.out.println(
"線程0休息5秒"
);
JdkUtil.sleep(
5000
);
condition.signal();
System.out.println(
"線程0喚醒等待線程"
);
}
finally
{
lock.unlock();
}
}
private
void
thread1Process() {
try
{
lock.lock();
System.out.println(
"線程1阻塞"
);
condition.await();
System.out.println(
"線程1被喚醒"
);
}
catch
(InterruptedException e) {
}
finally
{
lock.unlock();
}
}
private
void
thread2Process() {
try
{
System.out.println(
"線程2想要獲取鎖"
);
lock.lock();
System.out.println(
"線程2獲取鎖成功"
);
}
finally
{
lock.unlock();
}
}
}
|
這個類裏面的方法就不解釋了,反正就三個方法片斷,根據線程名判斷,每一個線層執行的是其中的一個代碼片斷。寫一段測試代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
/**
* @author 五月的倉頡http://www.cnblogs.com/xrq730/p/7067904.html
*/
@Test
public
void
testCondition()
throws
Exception {
Lock lock =
new
ReentrantLock();
Condition condition = lock.newCondition();
// 線程0的做用是signal
Runnable runnable0 =
new
ConditionThread(lock, condition);
Thread thread0 =
new
Thread(runnable0);
thread0.setName(
"線程0"
);
// 線程1的做用是await
Runnable runnable1 =
new
ConditionThread(lock, condition);
Thread thread1 =
new
Thread(runnable1);
thread1.setName(
"線程1"
);
// 線程2的做用是lock
Runnable runnable2 =
new
ConditionThread(lock, condition);
Thread thread2 =
new
Thread(runnable2);
thread2.setName(
"線程2"
);
thread1.start();
Thread.sleep(
1000
);
thread0.start();
Thread.sleep(
1000
);
thread2.start();
thread1.join();
}
|
測試代碼的意思是:
代碼執行結果爲:
1
2
3
4
5
6
|
1
線程
1
阻塞
2
線程
0
休息
5
秒
3
線程
2
想要獲取鎖
4
線程
0
喚醒等待線程
5
線程
2
獲取鎖成功
6
線程
1
被喚醒
|
符合咱們的結論:signal()並不意味着被喚醒的線程當即執行。因爲線程2先於線程0排隊,所以看到第5行打印的內容,線程2先獲取鎖。