前面已經講解了AQS源碼的獨享模式,今天來講一下AQS的共享模式
下面以CountDownLatch去講解AQS的共享模式
首先講下什麼是CountDownLatch,CountDownLatch所描述的是」在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待「。在API中是這麼說的:
用給定的計數 初始化 CountDownLatch。由於調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之後,會釋放所有等待的線程,await 的所有後續調用都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。
先看CountDownLatch的例子
public
static
void
main(String[] args) {
final
CountDownLatch latch =
new
CountDownLatch(
2
);
new
Thread(){
public
void
run() {
try
{
System.out.println(
"線程1執行"
);
Thread.sleep(
5000
);
latch.countDown();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new
Thread(){
public
void
run() {
try
{
System.out.println(
"線程2執行"
);
Thread.sleep(
3000
);
latch.countDown();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new
Thread(){
public
void
run() {
try
{
System.out.println(
"線程3阻塞"
);
latch.await();
System.out.println(
"線程3繼續執行"
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
};
}.start();
try
{
Thread.sleep(
1000
);
System.out.println(
"主線程線程阻塞"
);
latch.await();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
System.out.println(
"主線程繼續執行"
);
}
|
線程3 和主線程會加入到隊列中
node1會判斷前序節點是否是頭結點,如果是前序節點是頭節點 但是計數器不爲0 則阻塞自己 並將waitstatus狀態改爲-1 即SIGNAL
node2 會判斷當前節點是否爲頭結點,前序節點不是頭結點 直接阻塞自己 並將waitstatus狀態改爲-1
如果計數器爲零,就會把node1給喚醒,喚醒後 node1將自己的節點設置爲頭結點 並將節點waitstatus狀態設置爲 -3 PROPAGATE
然後繼續執行for循環 這時候node2的前序節點是頭結點,然後繼續將節點node2設置爲頭結點,並將節點waitstatus狀態設置爲-3 即PROPAGATE
接着看CountDownLatch的源碼
public
class
CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private
static
final
class
Sync
extends
AbstractQueuedSynchronizer {
private
static
final
long
serialVersionUID = 4982264981922014374L;
Sync(
int
count) {
setState(count);
}
int
getCount() {
return
getState();
}
protected
int
tryAcquireShared(
int
acquires) {
return
(getState() ==
0
) ?
1
: -
1
;
}
protected
boolean
tryReleaseShared(
int
releases) {
// Decrement count; signal when transition to zero
for
(;;) {
int
c = getState();
if
(c ==
0
)
return
false
;
int
nextc = c-
1
;
if
(compareAndSetState(c, nextc))
return
nextc ==
0
;
}
}
}
private
final
Sync sync;
//構造一個用給定計數初始化的 CountDownLatch
public
CountDownLatch(
int
count) {
if
(count <
0
)
throw
new
IllegalArgumentException(
"count < 0"
);
this
.sync =
new
Sync(count);
}
}
public
void
await()
throws
InterruptedException {
sync.acquireSharedInterruptibly(
1
);
}
public
boolean
await(
long
timeout, TimeUnit unit)
throws
InterruptedException {
return
sync.tryAcquireSharedNanos(
1
, unit.toNanos(timeout));
}
public
void
countDown() {
sync.releaseShared(
1
);
}
}
|
可以看出CountDownLatch內部依賴Sync實現,
Sync繼承AQS。CountDownLatch僅提供了一個構造方法:
CountDownLatch(int count) : 構造一個用給定計數初始化的 CountDownLatch 設置count
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } }
Sync(int count) {
setState(count);
}
設置state是count
看countDown方法
public
void
countDown() {
sync.releaseShared(
1
);
}
public
final
boolean
releaseShared(
int
arg) {
if
(tryReleaseShared(arg)) {
//如果此線程是被等待線程裏最後一個被釋放的線程 就去通知同步等待隊列裏的節點
doReleaseShared();
return
true
;
}
return
false
;
}
|
再看tryReleaseShared方法
protected
boolean
tryReleaseShared(
int
releases) {
// Decrement count; signal when transition to zero
for
(;;) {
int
c = getState();
//獲取計數器的值
if
(c ==
0
)
return
false
;
int
nextc = c-
1
;
//每個被等待的線程執行完計數器減1
if
(compareAndSetState(c, nextc))
//設置計數器的新值
return
nextc ==
0
;
//如果計數器爲0 返回true
}
}
}
|
再看doReleaseShared方法
private
void
doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for
(;;) {
Node h = head;
if
(h !=
null
&& h != tail) {
int
ws = h.waitStatus;
//如果頭結點是-1 (可以看下面wait方法有講解,已經把頭結點設置爲-1了 所以會走
//f (ws == Node.SIGNAL) 這一步
if
(ws == Node.SIGNAL) {
if
(!compareAndSetWaitStatus(h, Node.SIGNAL,
0
))
//把頭結點再設置爲0 不成功自旋操作,直到設置成功
continue
; i
// loop to recheck cases
unparkSuccessor(h);
//喚醒節點
}
else
if
(ws ==
0
&&
!compareAndSetWaitStatus(h,
0
, Node.PROPAGATE))
continue
;
// loop on failed CAS
}
if
(h == head)
// loop if head changed
break
;
}
}
|
再看unparkSuccessor方法
private
void
unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int
ws = node.waitStatus;
//因爲頭結點已經設置爲0了,所以ws<0不滿足
if
(ws <
0
)
compareAndSetWaitStatus(node, ws,
0
);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if
(s ==
null
|| s.waitStatus >
0
) {
//這一步也不滿足,可以看下面wait方法裏有講解 頭結點的後續節點的status都是-1
//所以這一步不滿足 直接走LockSupport.unpark(s.thread);喚醒頭結點的下一個節點
s =
null
;
//如果waitstatus>0說明 節點取消了 就找下一個waitstatus是-1的節點 並喚醒
for
(Node t = tail; t !=
null
&& t != node; t = t.prev)
if
(t.waitStatus <=
0
)
s = t;
}
if
(s !=
null
)
LockSupport.unpark(s.thread);
}
|
再看wait方法
public
final
void
acquireSharedInterruptibly(
int
arg)
throws
InterruptedException {
if
(Thread.interrupted())
throw
new
InterruptedException();
if
(tryAcquireShared(arg) <
0
)
//嘗試獲取鎖,獲取失敗就執行下面的方法
doAcquireSharedInterruptibly(arg);
}
|
看tryAcquireShared方法
protected
int
tryAcquireShared(
int
acquires) {
return
(getState() ==
0
) ?
1
: -
1
;
}
|
如果state是0,說明被等待的線程全都執行完了 。return -1說明沒有執行完
再看doAcquireSharedInterruptibly方法
private
void
doAcquireSharedInterruptibly(
int
arg)
throws
InterruptedException {
final
Node node = addWaiter(Node.SHARED);
//如果隊列是空的,就新建一個頭節點,頭節點指向尾節點,
//然後再新建一個節點放在頭節點後面 如果隊列不爲空,就在尾節點後面新建一個節點。節點是shared類型的
//隊列節點的waitStatus默認是0 因爲上篇AQS源碼一種有講解,就不講那麼多了
boolean
failed =
true
;
try
{
for
(;;) {
//開啓自旋
final
Node p = node.predecessor();
if
(p == head) {
//如果新建節點的前序節點是頭節點,而且state的值爲0 就走到setHeadAndPropagate方法
int
r = tryAcquireShared(arg);
if
(r >=
0
) {
//如果被等待的線程執行完了
setHeadAndPropagate(node, r);
//把當前節點設置爲頭節點,而且喚醒後續掛起的節點
p.next =
null
;
// help GC
failed =
false
;
return
p.next =
null
;
// help GC
failed =
false
;
return
;
}
}
if
(shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//如果當前節點的前序節點不是頭節點或者計數器不等於0,就阻塞當前節點
throw
throw
new
InterruptedException();
}
}
finally
{
if
(failed)
cancelAcquire(node);
}
}
|
再看shouldParkAfterFailedAcquire方法
}
finally
|