工做中每每會遇到異步去執行某段邏輯, 而後先處理其餘事情, 處理完後再把那段邏輯的處理結果進行彙總的產景, 這時候就須要使用線程了.
一個線程啓動以後, 是異步的去執行須要執行的內容的, 不會影響主線程的流程, 每每須要讓主線程指定後, 等待子線程的完成. 這裏有幾種方式.
站在 主線程的角度, 咱們能夠分爲主動式和被動式.
主動式指主線主動去檢測某個標誌位, 判斷子線程是否已經完成. 被動式指主線程被動的等待子線程的結束, 很明顯, 比較符合人們的胃口. 就是你事情作完了, 你告訴我, 我彙總一下, 哈哈.
那麼主線程如何等待子線程工做完成呢. 很簡單, Thread 類給咱們提供了join 系列的方法, 這些方法的目的就是等待當前線程的die. 舉個例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
</div>
public class Threads {
public static void main(String[] args) {
SubThread thread = new SubThread();
thread.start();
//主線程處理其餘工做,讓子線程異步去執行.
mainThreadOtherWork();
System.out.println( "now waiting sub thread done." );
//主線程其餘工做完畢,等待子線程的結束, 調用join系列的方法便可(能夠設置超時時間)
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println( "now all done." );
}
private static void mainThreadOtherWork() {
System.out.println( "main thread work start" );
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println( "main thread work done." );
}
public static class SubThread extends Thread{
@Override
public void run() {
working();
}
private void working() {
System.out.println( "sub thread start working." );
busy();
System.out.println( "sub thread stop working." );
}
private void busy() {
try {
sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
|
本程序的數據有多是以下:
- main thread work start
- sub thread start working.
- main thread work done.
- now waiting sub thread done.
- sub thread stop working.
- now all done.
忽略標號, 固然輸出也有多是1和2調換位置了. 這個咱們是沒法控制的. 咱們看下線程的join操做, 究竟幹了什麼.
1
2
3
|
public final void join() throws InterruptedException {
join( 0 );
}
|
這裏是調用了
public final synchronized void join(long millis)
方法, 參數爲0, 表示沒有超時時間, 等到線程結束爲止. join(millis)方法裏面有這麼一段代碼:
1
2
3
|
while (isAlive()) {
wait( 0 );
}
|
說明, 當線程處於活躍狀態的時候, 會一直等待, 直到這裏的isAlive方法返回false, 纔會結束.isAlive方法是一個本地方法, 他的做用是判斷線程是否已經執行結束. 註釋是這麼寫的:
Tests if this thread is alive. A thread is alive if it has been started and has not yet died.java
可見, join系列方法能夠幫助咱們等待一個子線程的結束.
那麼要問, 有沒有另一種方法能夠等待子線程結束? 固然有的, 咱們可使用併發包下面的Future模式.
Future是一個任務執行的結果, 他是一個未來時, 即一個任務執行, 當即異步返回一個Future對象, 等到任務結束的時候, 會把值返回給這個future對象裏面. 咱們可使用ExecutorService接口來提交一個線程.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
public class Threads {
static ExecutorService executorService = Executors.newFixedThreadPool( 1 );
@SuppressWarnings ( "rawtypes" )
public static void main(String[] args) throws InterruptedException, ExecutionException {
SubThread thread = new SubThread();
// thread.start();
Future future = executorService.submit(thread);
mainThreadOtherWork();
System.out.println( "now waiting sub thread done." );
future.get();
// try {
// thread.join();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println( "now all done." );
executorService.shutdown();
}
private static void mainThreadOtherWork() {
System.out.println( "main thread work start" );
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println( "main thread work done." );
}
public static class SubThread extends Thread{
@Override
public void run() {
working();
}
private void working() {
System.out.println( "sub thread start working." );
busy();
System.out.println( "sub thread stop working." );
}
private void busy() {
try {
sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
|
這 裏, ThreadPoolExecutor 是實現了 ExecutorService的方法, sumbit的過程就是把一個Runnable接口對象包裝成一個 Callable接口對象, 而後放到 workQueue裏等待調度執行. 固然, 執行的啓動也是調用了thread的start來作到的, 只不過這裏被包裝掉了. 另外, 這裏的thread是會被重複利用的, 因此這裏要退出主線程, 須要執行如下shutdown方法以示退出使用線程池. 扯遠了.
這 種方法是得益於Callable接口和Future模式, 調用future接口的get方法, 會同步等待該future執行結束, 而後獲取到結果. Callbale接口的接口方法是 V call(); 是能夠有返回結果的, 而Runnable的 void run(), 是沒有返回結果的. 因此, 這裏即便被包裝成Callbale接口, future.get返回的結果也是null的.若是須要獲得返回結果, 建議使用Callable接口.
經過隊列來控制線程的進度, 是很好的一個理念. 咱們徹底能夠本身搞個隊列, 本身控制. 這樣也能夠實現. 不信看代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
public class Threads {
// static ExecutorService executorService = Executors.newFixedThreadPool(1);
static final BlockingQueue queue = new ArrayBlockingQueue( 1 );
public static void main(String[] args) throws InterruptedException, ExecutionException {
SubThread thread = new SubThread(queue);
thread.start();
// Future future = executorService.submit(thread);
mainThreadOtherWork();
System.out.println( "now waiting sub thread done." );
// future.get();
queue.take();
// try {
// thread.join();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println( "now all done." );
// executorService.shutdown();
}
private static void mainThreadOtherWork() {
System.out.println( "main thread work start" );
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println( "main thread work done." );
}
public static class SubThread extends Thread{
private BlockingQueue queue;
/**
* @param queue
*/
public SubThread(BlockingQueue queue) {
this .queue = queue;
}
@Override
public void run() {
try {
working();
} finally {
try {
queue.put( 1 );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void working() {
System.out.println( "sub thread start working." );
busy();
System.out.println( "sub thread stop working." );
}
private void busy() {
try {
sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
|
這 裏是得益於咱們用了一個阻塞隊列, 他的put操做和take操做都會阻塞(同步), 在知足條件的狀況下.當咱們調用take()方法時, 因爲子線程還沒結束, 隊列是空的, 因此這裏的take操做會阻塞, 直到子線程結束的時候, 往隊列裏面put了個元素, 代表本身結束了. 這時候主線程的take()就會返回他拿到的數據. 固然, 他拿到什麼咱們是沒必要去關心的.
以上幾種狀況都是針對子線程只有1個的時候. 當子線程有多個的時候, 狀況就不妙了.
第一種方法, 你要調用不少個線程的join, 特別是當你的線程不是for循環建立的, 而是一個一個建立的時候.
第二種方法, 要調用不少的future的get方法, 同第一種方法.
第三種方法, 比較方便一些, 只須要每一個線程都在queue裏面 put一個元素就行了.可是, 第三種方法, 這個隊列裏的對象, 對咱們是毫無用處, 咱們爲了使用隊列, 而要不明不白浪費一些內存,
那有沒有更好的辦法呢?有的, concurrency包裏面提供了好多有用的東東, 其中, CountDownLanch就是咱們要用的.
CountDownLanch 是一個倒數計數器, 給一個初始值(>=0), 而後每countDown一次就會減1, 這很符合等待多個子線程結束的場景: 一個線程結束的時候, countDown一次, 直到全部都countDown了 , 那麼全部子線程就都結束了.
先看看CountDownLanch有哪些方法:
await: 會阻塞等待計數器減小到0位置. 帶參數的await是多了等待時間.
countDown: 將當前的技術減1
getCount(): 返回當前的計數
顯而易見, 咱們只須要在子線程執行以前, 賦予初始化countDownLanch, 並賦予線程數量爲初始值.
每一個線程執行完畢的時候, 就countDown一下.主線程只須要調用await方法, 能夠等待全部子線程執行結束, 看代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
public class Threads {
// static ExecutorService executorService = Executors.newFixedThreadPool(1);
static final BlockingQueue queue = new ArrayBlockingQueue( 1 );
public static void main(String[] args) throws InterruptedException, ExecutionException {
int threads = 5 ;
CountDownLatch countDownLatch = new CountDownLatch(threads);
for ( int i= 0 ;i<threads;i++){
SubThread thread = new SubThread( 2000 *(i+ 1 ), countDownLatch);
thread.start();
}
// Future future = executorService.submit(thread);
mainThreadOtherWork();
System.out.println( "now waiting sub thread done." );
// future.get();
// queue.take();
countDownLatch.await();
// try {
// thread.join();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println( "now all done." );
// executorService.shutdown();
}
private static void mainThreadOtherWork() {
System.out.println( "main thread work start" );
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println( "main thread work done." );
}
public static class SubThread extends Thread{
// private BlockingQueue queue;
private CountDownLatch countDownLatch;
private long work;
/**
* @param queue
*/
// public SubThread(BlockingQueue queue) {
// this.queue = queue;
// this.work = 5000L;
// }
public SubThread( long work, CountDownLatch countDownLatch) {
// this.queue = queue;
this .countDownLatch = countDownLatch;
this .work = work;
}
@Override
public void run() {
try {
working();
} finally {
// try {
// queue.put(1);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
countDownLatch.countDown();
}
}
private void working() {
System.out.println(getName()+ " sub thread start working." );
busy();
System.out.println(getName()+ " sub thread stop working." );
}
private void busy() {
try {
sleep(work);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
|
此種方法也適用於使用 ExecutorService summit 的任務的執行.
另外還有一個併發包的類CyclicBarrier, 這個是(子)線程之間的互相等待的利器. 柵欄, 就是把你們都在一個地方堵住, 就像水閘, 等你們都完成了以前的操做, 在一塊兒繼續下面的操做.
不過就再也不本篇的討論範圍內了.
EO