學習自http://www.importnew.com/21889.html
https://blog.csdn.net/yanyan19880509/article/details/52349056
https://www.cnblogs.com/waterystone/p/4920797.html
1
2
3
|
public
void
await()
throws
InterruptedException { };
//調用await()方法的線程會被掛起,它會等待直到count值爲0才繼續執行
public
boolean
await(
long
timeout, TimeUnit unit)
throws
InterruptedException { };
//和await()類似,只不過等待一定的時間後count值還沒變爲0的話就會繼續執行
public
void
countDown() { };
//將count值減1
|
下面看一個例子大家就清楚CountDownLatch的用法了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
public
class
Test {
public
static
void
main(String[] args) {
final
CountDownLatch latch =
new
CountDownLatch(
2
);
new
Thread(){
public
void
run() {
try
{
System.out.println(
"子線程"
+Thread.currentThread().getName()+
"正在執行"
);
Thread.sleep(
3000
);
System.out.println(
"子線程"
+Thread.currentThread().getName()+
"執行完畢"
);
latch.countDown();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new
Thread(){
public
void
run() {
try
{
System.out.println(
"子線程"
+Thread.currentThread().getName()+
"正在執行"
);
Thread.sleep(
3000
);
System.out.println(
"子線程"
+Thread.currentThread().getName()+
"執行完畢"
);
latch.countDown();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
};
}.start();
try
{
System.out.println(
"等待2個子線程執行完畢..."
);
latch.await();
System.out.println(
"2個子線程已經執行完畢"
);
System.out.println(
"繼續執行主線程"
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
|
執行結果:
1
2
3
4
5
6
7
|
線程Thread-
0
正在執行
線程Thread-
1
正在執行
等待
2
個子線程執行完畢...
線程Thread-
0
執行完畢
線程Thread-
1
執行完畢
2
個子線程已經執行完畢
繼續執行主線程
|
在new CountDownLatch(2)的時候
對於 CountDownLatch 來說,state=2表示所有調用await方法的線程都應該阻塞,等到同一個latch被調用兩次countDown後才能喚醒沉睡的線程
爲了充分了解AQS裏的鏈表,這裏假設上面掛起等待的線程數爲2個
當latch被成功減到0後,AQS的state就成了0。那個成功減到0的那個線程。然後節點3被喚醒了。當節點3醒來後,發現自己是通知狀態,然後刪除自己,喚醒節點4。
上面的流程,如果落實到代碼,把 state置爲0的那個線程,會判斷head指向節點的狀態,如果爲通知狀態,則喚醒後續節點,即線程3節點,然後head指向線程3節點,head指向的舊節點會被刪除掉。當線程3恢復執行後,發現自身爲通知狀態,又會把head指向線程4節點,然後刪除自身節點,並喚醒線程4。
線程節點的狀態是什麼時候設置上去的?其實,一個線程在阻塞之前,就會把它前面的節點設置爲通知狀態,這樣便可以實現鏈式喚醒機制了。
AQS是一些同步的抽象,簡單介紹一下
兩大元素
1.volatile int state->共享資源
2.FIFO線程等待隊列,多線程爭用資源被阻塞時會進入此隊列
AQS定義兩種資源佔用方式:
1.Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)
2.Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)
自定義同步器在實現時只需要實現state的增減即可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在底層實現好了
自定義同步器的時候,主要實現以下幾種方法:
1.isHeldExclusively(),該線程是否正在獨佔資源,只有用到condition的時候才需要去實現這個方法
2.tryAcquire(int),獨佔。嘗試獲取資源,成功返回true,失敗返回fasle
3.tryRelease(int)
4.tryAcquireShared(int),共享。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
5.tryReleaseShared(int)
以ReentrantLock爲例
state初始化爲0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖。當然,釋放鎖之前,A線程自己是可以重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。
以CountDownLatch以例
任務分爲N個子線程去執行,state也初始化爲N(注意N要與線程個數一致)。這N個子線程是並行執行的,每個子線程執行完後countDown()一次,state會CAS減1。等到所有子線程都執行完後(即state=0),會unpark()主調用線程,然後主調用線程就會從await()函數返回,繼續後餘動作。
深入看一下CountDownLatch
首先是他的內部類
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync;
tryAcquireShared不知道有啥用,理論上應該是來獲取資源的,但是對於CountDownLatch不需要了,因爲他只需要一開始設定state即可。不過CountDownLatch爲什麼是共享鎖我理解了,他允許多個線程同時使用CPU資源。
Node.waitStatus
-CANCELLED:值爲1,在同步隊列中等待的線程等待超時或被中斷,需要從同步隊列中取消該Node的結點,其結點的waitStatus爲CANCELLED,即結束狀態,進入該狀態後的結點將不會再變化。
-SIGNAL:值爲-1,被標識爲該等待喚醒狀態的後繼結點,當其前繼結點的線程釋放了同步鎖或被取消,將會通知該後繼結點的線程執行。說白了,就是處於喚醒狀態,只要前繼結點釋放鎖,就會通知標識爲SIGNAL狀態的後繼結點的線程執行。
-CONDITION:值爲-2,與Condition相關,該標識的結點處於等待隊列中,結點的線程等待在Condition上,當其他線程調用了Condition的signal()方法後,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。
-PROPAGATE:值爲-3,與共享模式相關,在共享模式中,該狀態標識結點的線程處於可運行狀態。
0狀態:值爲0,代表初始化狀態。
AQS在判斷狀態時,通過用waitStatus>0表示取消狀態,而waitStatus<0表示有效狀態。
CountDownLatch的-1方法
public void countDown() { sync.releaseShared(1); }
AQS中
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
前面講到:releaseShared就是讓state-1,當你減到0的時候,減到0的那個線程開始喚醒線程。
這裏和代碼邏輯對應一下。
tryReleaseShared就是讓state-1。如果減到0,就返回true了,開始執行doReleaseShared,喚醒線程。
private void doReleaseShared() { for (;;) { Node h = head;//頭結點 if (h != null && h != tail) {//如果不爲空,且不是尾節點 int ws = h.waitStatus; if (ws == Node.SIGNAL) {//如果是待喚醒,waitStatus改爲初始化狀態 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h);//找到他後面的第一個節點,也就是允許使用cpu,同時解鎖了 } else if (ws == 0 &&//如果是初始化狀態 希望把他變成可運行狀態 !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head)//如果head變了,就繼續loop // loop if head changed break; } }
所以doReleaseShared的整體意思是:喚醒下一個線程
unparkSuccessor
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) node.compareAndSetWaitStatus(ws, 0);//如果是可運行、可喚醒,那就置爲初始化狀態 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null;//如果爲空或者cancel for (Node p = tail; p != node && p != null; p = p.prev)//從後往前找 if (p.waitStatus <= 0)//目的是爲了找到一個可運行、可喚醒的 s = p; } if (s != null)//如果找到了 LockSupport.unpark(s.thread);//解鎖這個線程 }
整體的意思就是,找到他後面的第一個節點,也就是允許使用cpu,同時解鎖了
那麼被喚醒的線程是在哪裏喚醒下一個線程的呢?
需要從await入手
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0)//如果state!=0,之前初始化已經讓state=2了 doAcquireSharedInterruptibly(arg); }
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//尾部加一個node try { for (;;) { final Node p = node.predecessor();//前一個node if (p == head) {//是head int r = tryAcquireShared(arg); if (r >= 0) {//state==0 setHeadAndPropagate(node, r);//那就開始,並喚醒下一個線程 p.next = null; // help GC return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())//鎖住咯,並讓上一個線程置爲通知狀態 throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }
CountDownLatch就分析的差不多了
爲何CountDownLatch被稱爲共享鎖?
因爲CountDownLatch latch = new CountDownLatch(2),他允許了多個線程同時使用CPU資源,而ReentrantLock是單行道。在這個實例化中,鎖可以認爲有2道(state),每個線程執行完後,通過cas方式正確地減了一道。
再講一下CyclicBarrier、 Semaphore,不過就不扯源碼了
CyclicBarrier
可以實現讓一組線程等待至某個狀態之後再全部同時執行
例子
續
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public
class
Test {
public
static
void
main(String[] args) {
int
N =
4
;
CyclicBarrier barrier =
new
CyclicBarrier(N);
for
(
int
i=
0
;i<N;i++)
new
Writer(barrier).start();
}
static
class
Writer
extends
Thread{
private
CyclicBarrier cyclicBarrier;
public
Writer(CyclicBarrier cyclicBarrier) {
this
.cyclicBarrier = cyclicBarrier;
}
@Override
public
void
run() {
System.out.println(
"線程"
+Thread.currentThread().getName()+
"正在寫入數據..."
);
try
{
Thread.sleep(
5000
);
//以睡眠來模擬寫入數據操作
System.out.println(
"線程"
+Thread.currentThread().getName()+
"寫入數據完畢,等待其他線程寫入完畢"
);
cyclicBarrier.await();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
catch
(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println(
"所有線程寫入完畢,繼續處理其他任務..."
);
}
}
}
|
執行結果:
1
2
3
4
5
6
7
8
9
10
11
12
|
線程Thread-
0
正在寫入數據...
線程Thread-
3
正在寫入數據...
線程Thread-
2
正在寫入數據...
線程Thread-
1
正在寫入數據...
線程Thread-
2
寫入數據完畢,等待其他線程寫入完畢
線程Thread-
0
寫入數據完畢,等待其他線程寫入完畢
線程Thread-
3
寫入數據完畢,等待其他線程寫入完畢
線程Thread-
1
寫入數據完畢,等待其他線程寫入完畢
所有線程寫入完畢,繼續處理其他任務...
所有線程寫入完畢,繼續處理其他任務...
所有線程寫入完畢,繼續處理其他任務...
所有線程寫入完畢,繼續處理其他任務...
|
總結CountDownLatch、CyclicBarrie的區別
前者是手動-1,另外線程先阻塞着。state減到0後,另外線程可以跑了
後者是若干線程先阻塞着,等到阻塞到若干數量後,所有線程就可以跑了
Semaphore
若一個工廠有5臺機器,但是有8個工人,一臺機器同時只能被一個工人使用,只有使用完了,其他工人才能繼續使用。那麼我們就可以通過Semaphore來實現:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public
class
Test {
public
static
void
main(String[] args) {
int
N =
8
;
//工人數
Semaphore semaphore =
new
Semaphore(
5
);
//機器數目
for
(
int
i=
0
;i<N;i++)
new
Worker(i,semaphore).start();
}
static
class
Worker
extends
Thread{
private
int
num;
private
Semaphore semaphore;
public
Worker(
int
num,Semaphore semaphore){
this
.num = num;
this
.semaphore = semaphore;
}
@Override
public
void
run() {
try
{
semaphore.acquire();
System.out.println(
"工人"
+
this
.num+
"佔用一個機器在生產..."
);
Thread.sleep(
2000
);
System.out.println(
"工人"
+
this
.num+
"釋放出機器"
);
semaphore.release();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
}
|
執行結果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
工人
0
佔用一個機器在生產...
工人
1
佔用一個機器在生產...
工人
2
佔用一個機器在生產...
工人
8
9
10
11
12
13
14
15
16
|
工人
0
佔用一個機器在生產...
工人
1
佔用一個機器在生產...
工人
2
佔用一個機器在生產...
工人
4
佔用一個機器在生產...
工人
5
佔用一個機器在生產...
工人
0
釋放出機器
8
9
10
11
12
13
14
15
16
|
工人
0
佔用一個機器在生產...
工人
1
佔用一個機器在生產...
工人
2
佔用一個機器在生產...
工人
4
佔用一個機器在生產...
工人
5
佔用一個機器在生產...
工人
0
釋放出機器
工人
2
釋放出機器
工人
3
佔用一個機器在生產...
|