譯序html
本指南根據 Jakob Jenkov 最新博客翻譯,請隨時關注博客更新:http://tutorials.jenkov.com/java-util-concurrent/index.html。java
本指南已作成中英文對照閱讀版的 pdf 文檔,有興趣的朋友能夠去 Java併發工具包java.util.concurrent用戶指南中英文對照閱讀版.pdf[帶書籤] 進行下載。算法
Java 5 添加了一個新的包到 Java 平臺,java.util.concurrent 包。這個包包含有一系列可以讓 Java 的併發編程變得更加簡單輕鬆的類。在這個包被添加之前,你須要本身去動手實現本身的相關工具類。編程
本文我將帶你一一認識 java.util.concurrent 包裏的這些類,而後你能夠嘗試着如何在項目中使用它們。本文中我將使用 Java 6 版本,我不肯定這和 Java 5 版本里的是否有一些差別。我不會去解釋關於 Java 併發的核心問題 – 其背後的原理,也就是說,若是你對那些東西感興趣,參考《Java 併發指南》。數組
半成品數據結構
本文很大程度上仍是個 「半成品」,因此當你發現一些被漏掉的類或接口時,請耐心等待。在我空閒的時候會把它們加進來的。併發
java.util.concurrent 包裏的 BlockingQueue 接口表示一個線程安放入和提取實例的隊列。本小節我將給你演示如何使用這個 BlockingQueue。本節不會討論如何在 Java 中實現一個你本身的 BlockingQueue。若是你對那個感興趣,參考《Java 併發指南》異步
BlockingQueue 一般用於一個線程生產對象,而另一個線程消費這些對象的場景。下圖是對這個原理的闡述:ide
一個線程往裏邊放,另一個線程從裏邊取的一個 BlockingQueue。工具
一個線程將會持續生產新對象並將其插入到隊列之中,直到隊列達到它所能容納的臨界點。也就是說,它是有限的。若是該阻塞隊列到達了其臨界點,負責生產的線程將會在往裏邊插入新對象時發生阻塞。它會一直處於阻塞之中,直到負責消費的線程從隊列中拿走一個對象。負責消費的線程將會一直從該阻塞隊列中拿出對象。若是消費線程嘗試去從一個空的隊列中提取對象的話,這個消費線程將會處於阻塞之中,直到一個生產線程把一個對象丟進隊列。
BlockingQueue 具備 4 組不一樣的方法用於插入、移除以及對隊列中的元素進行檢查。若是請求的操做不能獲得當即執行的話,每一個方法的表現也不一樣。這些方法以下:
四組不一樣的行爲方式解釋:
拋異常:若是試圖的操做沒法當即執行,拋一個異常。
特定值:若是試圖的操做沒法當即執行,返回一個特定的值(經常是 true / false)。
阻塞:若是試圖的操做沒法當即執行,該方法調用將會發生阻塞,直到可以執行。
超時:若是試圖的操做沒法當即執行,該方法調用將會發生阻塞,直到可以執行,但等待時間不會超過給定值。返回一個特定值以告知該操做是否成功(典型的是 true / false)。
沒法向一個 BlockingQueue 中插入 null。若是你試圖插入 null,BlockingQueue 將會拋出一個 NullPointerException。
能夠訪問到 BlockingQueue 中的全部元素,而不只僅是開始和結束的元素。好比說,你將一個對象放入隊列之中以等待處理,但你的應用想要將其取消掉。那麼你能夠調用諸如 remove(o) 方法來將隊列之中的特定對象進行移除。可是這麼幹效率並不高(譯者注:基於隊列的數據結構,獲取除開始或結束位置的其餘對象的效率不會過高),所以你儘可能不要用這一類的方法,除非你確實不得不那麼作。
BlockingQueue 是個接口,你須要使用它的實現之一來使用 BlockingQueue。java.util.concurrent 具備如下 BlockingQueue 接口的實現(Java 6):
這裏是一個 Java 中使用 BlockingQueue 的示例。本示例使用的是 BlockingQueue 接口的 ArrayBlockingQueue 實現。
首先,BlockingQueueExample 類分別在兩個獨立的線程中啓動了一個 Producer 和 一個 Consumer。
Producer 向一個共享的 BlockingQueue 中注入字符串,而 Consumer 則會從中把它們拿出來。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public
class
BlockingQueueExample {
public
static
void
main(String[] args)
throws
Exception {
BlockingQueue queue =
new
ArrayBlockingQueue(
1024
);
Producer producer =
new
Producer(queue);
Consumer consumer =
new
Consumer(queue);
new
Thread(producer).start();
new
Thread(consumer).start();
Thread.sleep(
4000
);
}
}
|
如下是 Producer 類。注意它在每次 put() 調用時是如何休眠一秒鐘的。這將致使 Consumer 在等待隊列中對象的時候發生阻塞。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public
class
Producer
implements
Runnable{
protected
BlockingQueue queue =
null
;
public
Producer(BlockingQueue queue) {
this
.queue = queue;
}
public
void
run() {
try
{
queue.put(
"1"
);
Thread.sleep(
1000
);
queue.put(
"2"
);
Thread.sleep(
1000
);
queue.put(
"3"
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
|
如下是 Consumer 類。它只是把對象從隊列中抽取出來,而後將它們打印到 System.out。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public
class
Consumer
implements
Runnable{
protected
BlockingQueue queue =
null
;
public
Consumer(BlockingQueue queue) {
this
.queue = queue;
}
public
void
run() {
try
{
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
|
ArrayBlockingQueue 類實現了 BlockingQueue 接口。
ArrayBlockingQueue 是一個有界的阻塞隊列,其內部實現是將對象放到一個數組裏。有界也就意味着,它不可以存儲無限多數量的元素。它有一個同一時間可以存儲元素數量的上限。你能夠在對其初始化的時候設定這個上限,但以後就沒法對這個上限進行修改了(譯者注:由於它是基於數組實現的,也就具備數組的特性:一旦初始化,大小就沒法修改)。
‘ArrayBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在全部元素之中是放入時間最久的那個,而尾元素則是最短的那個。
如下是在使用 ArrayBlockingQueue 的時候對其初始化的一個示例:
1
2
3
4
5
|
BlockingQueue queue =
new
ArrayBlockingQueue(
1024
);
queue.put(
"1"
);
Object object = queue.take();
|
如下是使用了 Java 泛型的一個 BlockingQueue 示例。注意其中是如何對 String 元素放入和提取的:
1
2
3
4
5
|
BlockingQueue<String> queue =
new
ArrayBlockingQueue<String>(
1024
);
queue.put(
"1"
);
String string = queue.take();
|
DelayQueue 實現了 BlockingQueue 接口。DelayQueue 對元素進行持有直到一個特定的延遲到期。注入其中的元素必須實現 java.util.concurrent.Delayed 接口,該接口定義:
1
2
3
4
5
|
public
interface
Delayed
extends
Comparable<Delayed< {
public
long
getDelay(TimeUnit timeUnit);
}
|
DelayQueue 將會在每一個元素的 getDelay() 方法返回的值的時間段以後才釋放掉該元素。若是返回的是 0 或者負值,延遲將被認爲過時,該元素將會在 DelayQueue 的下一次 take 被調用的時候被釋放掉。傳遞給 getDelay 方法的 getDelay 實例是一個枚舉類型,它代表了將要延遲的時間段。
TimeUnit 枚舉將會取如下值:
1
2
3
4
5
6
7
8
|
DAYS
HOURS
MINUTES
SECONDS
MILLISECONDS
MICROSECONDS
NANOSECONDS
`
|
正如你所看到的,Delayed 接口也繼承了 java.lang.Comparable 接口,這也就意味着 Delayed 對象之間能夠進行對比。這個可能在對 DelayQueue 隊列中的元素進行排序時有用,所以它們能夠根據過時時間進行有序釋放。如下是使用 DelayQueue 的例子:
1
2
3
4
5
6
7
8
9
10
11
12
|
public
class
DelayQueueExample {
public
static
void
main(String[] args) {
DelayQueue queue =
new
DelayQueue();
Delayed element1 =
new
DelayedElement();
queue.put(element1);
Delayed element2 = queue.take();
}
}
|
DelayedElement 是我所建立的一個 DelayedElement 接口的實現類,它不在 Java.util.concurrent 包裏。你須要自行建立你本身的 Delayed 接口的實現以使用 DelayQueue 類。
LinkedBlockingQueue 類實現了 BlockingQueue 接口。
LinkedBlockingQueue 內部以一個鏈式結構(連接節點)對其元素進行存儲。若是須要的話,這一鏈式結構能夠選擇一個上限。若是沒有定義上限,將使用 Integer.MAX_VALUE 做爲上限。
LinkedBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在全部元素之中是放入時間最久的那個,而尾元素則是最短的那個。
如下是 LinkedBlockingQueue 的初始化和使用示例代碼:
1
2
3
4
5
6
|
BlockingQueue<String> unbounded =
new
LinkedBlockingQueue<String>();
BlockingQueue<String> bounded =
new
LinkedBlockingQueue<String>(
1024
);
bounded.put(
"Value"
);
String value = bounded.take();
|
PriorityBlockingQueue 類實現了 BlockingQueue 接口。
PriorityBlockingQueue 是一個無界的併發隊列。它使用了和類 java.util.PriorityQueue 同樣的排序規則。你沒法向這個隊列中插入 null 值。全部插入到 PriorityBlockingQueue 的元素必須實現 java.lang.Comparable 接口。所以該隊列中元素的排序就取決於你本身的 Comparable 實現。注意 PriorityBlockingQueue 對於具備相等優先級(compare() == 0)的元素並不強制任何特定行爲。
同時注意,若是你從一個 PriorityBlockingQueue 得到一個 Iterator 的話,該 Iterator 並不能保證它對元素的遍歷是以優先級爲序的。
如下是使用 PriorityBlockingQueue 的示例:
1
2
3
4
5
6
|
BlockingQueue queue =
new
PriorityBlockingQueue();
//String implements java.lang.Comparable
queue.put(
"Value"
);
String value = queue.take();
|
SynchronousQueue 類實現了 BlockingQueue 接口。
SynchronousQueue 是一個特殊的隊列,它的內部同時只可以容納單個元素。若是該隊列已有一元素的話,試圖向隊列中插入一個新元素的線程將會阻塞,直到另外一個線程將該元素從隊列中抽走。一樣,若是該隊列爲空,試圖向隊列中抽取一個元素的線程將會阻塞,直到另外一個線程向隊列中插入了一條新的元素。
據此,把這個類稱做一個隊列顯然是誇大其詞了。它更多像是一個匯合點。
java.util.concurrent 包裏的 BlockingDeque 接口表示一個線程安放入和提取實例的雙端隊列。本小節我將給你演示如何使用 BlockingDeque。BlockingDeque 類是一個雙端隊列,在不可以插入元素時,它將阻塞住試圖插入元素的線程;在不可以抽取元素時,它將阻塞住試圖抽取的線程。deque(雙端隊列) 是 「Double Ended Queue」 的縮寫。所以,雙端隊列是一個你能夠從任意一端插入或者抽取元素的隊列。
在線程既是一個隊列的生產者又是這個隊列的消費者的時候可使用到 BlockingDeque。若是生產者線程須要在隊列的兩端均可以插入數據,消費者線程須要在隊列的兩端均可以移除數據,這個時候也可使用 BlockingDeque。
一個 BlockingDeque – 線程在雙端隊列的兩端均可以插入和提取元素。
一個線程生產元素,並把它們插入到隊列的任意一端。若是雙端隊列已滿,插入線程將被阻塞,直到一個移除線程從該隊列中移出了一個元素。若是雙端隊列爲空,移除線程將被阻塞,直到一個插入線程向該隊列插入了一個新元素。
BlockingDeque 具備 4 組不一樣的方法用於插入、移除以及對雙端隊列中的元素進行檢查。若是請求的操做不能獲得當即執行的話,每一個方法的表現也不一樣。這些方法以下:
四組不一樣的行爲方式解釋:
拋異常:若是試圖的操做沒法當即執行,拋一個異常。
特定值:若是試圖的操做沒法當即執行,返回一個特定的值(經常是 true / false)。
阻塞:若是試圖的操做沒法當即執行,該方法調用將會發生阻塞,直到可以執行。
超時:若是試圖的操做沒法當即執行,該方法調用將會發生阻塞,直到可以執行,但等待時間不會超過給定值。返回一個特定值以告知該操做是否成功(典型的是 true / false)。
BlockingDeque 接口繼承自 BlockingQueue 接口。
這就意味着你能夠像使用一個 BlockingQueue 那樣使用 BlockingDeque。若是你這麼幹的話,各類插入方法將會把新元素添加到雙端隊列的尾端,而移除方法將會把雙端隊列的首端的元素移除。正如 BlockingQueue 接口的插入和移除方法同樣。
如下是 BlockingDeque 對 BlockingQueue 接口的方法的具體內部實現:
既然 BlockingDeque 是一個接口,那麼你想要使用它的話就得使用它的衆多的實現類的其中一個。java.util.concurrent 包提供瞭如下 BlockingDeque 接口的實現類:
如下是如何使用 BlockingDeque 方法的一個簡短代碼示例:
1
2
3
4
5
6
7
|
BlockingDeque<String> deque =
new
LinkedBlockingDeque<String>();
deque.addFirst(
"1"
);
deque.addLast(
"2"
);
String two = deque.takeLast();
String one = deque.takeFirst();
|
LinkedBlockingDeque 類實現了 BlockingDeque 接口。
deque(雙端隊列) 是 「Double Ended Queue」 的縮寫。所以,雙端隊列是一個你能夠從任意一端插入或者抽取元素的隊列。(譯者注:唐僧啊,受不了。)LinkedBlockingDeque 是一個雙端隊列,在它爲空的時候,一個試圖從中抽取數據的線程將會阻塞,不管該線程是試圖從哪一端抽取數據。如下是 LinkedBlockingDeque 實例化以及使用的示例:
1
2
3
4
5
6
7
|
BlockingDeque<String> deque =
new
LinkedBlockingDeque<String>();
deque.addFirst(
"1"
);
deque.addLast(
"2"
);
String two = deque.takeLast();
String one = deque.takeFirst();
|
java.util.concurrent.ConcurrentMap 接口表示了一個可以對別人的訪問(插入和提取)進行併發處理的 java.util.Map。ConcurrentMap 除了從其父接口 java.util.Map 繼承來的方法以外還有一些額外的原子性方法。
既然 ConcurrentMap 是個接口,你想要使用它的話就得使用它的實現類之一。java.util.concurrent 包具有 ConcurrentMap 接口的如下實現類:
ConcurrentHashMap 和 java.util.HashTable 類很類似,但 ConcurrentHashMap 可以提供比 HashTable 更好的併發性能。在你從中讀取對象的時候 ConcurrentHashMap 並不會把整個 Map 鎖住。
此外,在你向其中寫入對象的時候,ConcurrentHashMap 也不會鎖住整個 Map。它的內部只是把 Map 中正在被寫入的部分進行鎖定。
另一個不一樣點是,在被遍歷的時候,即便是 ConcurrentHashMap 被改動,它也不會拋 ConcurrentModificationException。儘管 Iterator 的設計不是爲多個線程的同時使用。更多關於 ConcurrentMap 和 ConcurrentHashMap 的細節請參考官方文檔。
如下是如何使用 ConcurrentMap 接口的一個例子。
本示例使用了 ConcurrentHashMap 實現類:
1
2
3
4
5
|
ConcurrentMap concurrentMap =
new
ConcurrentHashMap();
concurrentMap.put(
"key"
,
"value"
);
Object value = concurrentMap.get(
"key"
);
|
java.util.concurrent.ConcurrentNavigableMap 是一個支持併發訪問的 java.util.NavigableMap,它還能讓它的子 map 具有併發訪問的能力。所謂的 「子 map」 指的是諸如 headMap(),subMap(),tailMap() 之類的方法返回的 map。
NavigableMap 中的方法再也不贅述,本小節咱們來看一下 ConcurrentNavigableMap 添加的方法。
headMap(T toKey) 方法返回一個包含了小於給定 toKey 的 key 的子 map。若是你對原始 map 裏的元素作了改動,這些改動將影響到子 map 中的元素(譯者注:map 集合持有的其實只是對象的引用)。如下示例演示了對 headMap() 方法的使用:
1
2
3
4
5
6
7
|
ConcurrentNavigableMap map =
new
ConcurrentSkipListMap();
map.put(
"1"
,
"one"
);
map.put(
"2"
,
"two"
);
map.put(
"3"
,
"three"
);
ConcurrentNavigableMap headMap = map.headMap(
"2"
);
|
headMap 將指向一個只含有鍵 「1″ 的 ConcurrentNavigableMap,由於只有這一個鍵小於 「2″。關於這個方法及其重載版本具體是怎麼工做的細節請參考 Java 文檔。
tailMap(T fromKey) 方法返回一個包含了不小於給定 fromKey 的 key 的子 map。
若是你對原始 map 裏的元素作了改動,這些改動將影響到子 map 中的元素(譯者注:map 集合持有的其實只是對象的引用)。
如下示例演示了對 tailMap() 方法的使用:
1
2
3
4
5
6
7
|
ConcurrentNavigableMap map =
new
ConcurrentSkipListMap();
map.put(
"1"
,
"one"
);
map.put(
"2"
,
"two"
);
map.put(
"3"
,
"three"
);
ConcurrentNavigableMap tailMap = map.tailMap(
"2"
);
|
tailMap 將擁有鍵 「2″ 和 「3″,由於它們不小於給定鍵 「2″。關於這個方法及其重載版本具體是怎麼工做的細節請參考 Java 文檔。
subMap() 方法返回原始 map 中,鍵介於 from(包含) 和 to (不包含) 之間的子 map。
示例以下:
1
2
3
4
5
6
7
|
ConcurrentNavigableMap map =
new
ConcurrentSkipListMap();
map.put(
"1"
,
"one"
);
map.put(
"2"
,
"two"
);
map.put(
"3"
,
"three"
);
ConcurrentNavigableMap subMap = map.subMap(
"2"
,
"3"
);
|
返回的 submap 只包含鍵 「2″,由於只有它知足不小於 「2″,比 「3″ 小。
ConcurrentNavigableMap 接口還有其餘一些方法可供使用,
好比:
關於這些方法更多信息參考官方 Java 文檔。
java.util.concurrent.CountDownLatch 是一個併發構造,它容許一個或多個線程等待一系列指定操做的完成。
CountDownLatch 以一個給定的數量初始化。countDown() 每被調用一次,這一數量就減一。經過調用 await() 方法之一,線程能夠阻塞等待這一數量到達零。如下是一個簡單示例。
Decrementer 三次調用 countDown() 以後,等待中的 Waiter 纔會從 await() 調用中釋放出來。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
CountDownLatch latch =
new
CountDownLatch(
3
);
Waiter waiter =
new
Waiter(latch);
Decrementer decrementer =
new
Decrementer(latch);
new
Thread(waiter) .start();
new
Thread(decrementer).start();
Thread.sleep(
4000
);
public
class
Waiter
implements
Runnable{
CountDownLatch latch =
null
;
public
Waiter(CountDownLatch latch) {
this
.latch = latch;
}
public
void
run() {
try
{
latch.await();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
System.out.println(
"Waiter Released"
);
}
}
public
class
Decrementer
implements
Runnable {
CountDownLatch latch =
null
;
public
Decrementer(CountDownLatch latch) {
this
.latch = latch;
}
public
void
run() {
try
{
Thread.sleep(
1000
);
this
.latch.countDown();
Thread.sleep(
1000
);
this
.latch.countDown();
Thread.sleep(
1000
);
this
.latch.countDown();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
|
java.util.concurrent.CyclicBarrier 類是一種同步機制,它可以對處理一些算法的線程實現同步。換句話講,它就是一個全部線程必須等待的一個柵欄,直到全部線程都到達這裏,而後全部線程才能夠繼續作其餘事情。
圖示以下:
兩個線程在柵欄旁等待對方。
經過調用 CyclicBarrier 對象的 await() 方法,兩個線程能夠實現互相等待。一旦 N 個線程在等待 CyclicBarrier 達成,全部線程將被釋放掉去繼續運行。
在建立一個 CyclicBarrier 的時候你須要定義有多少線程在被釋放以前等待柵欄。
建立 CyclicBarrier 示例:
1
|
CyclicBarrier barrier =
new
CyclicBarrier(
2
);
|
如下演示瞭如何讓一個線程等待一個 CyclicBarrier:
1
|
barrier.await();
|
固然,你也能夠爲等待線程設定一個超時時間。等待超過了超時時間以後,即使尚未達成 N 個線程等待 CyclicBarrier 的條件,該線程也會被釋放出來。如下是定義超時時間示例:
1
|
barrier.await(
10
, TimeUnit.SECONDS);
|
知足如下任何條件均可以讓等待 CyclicBarrier 的線程釋放:
CyclicBarrier 支持一個柵欄行動,柵欄行動是一個 Runnable 實例,一旦最後等待柵欄的線程抵達,該實例將被執行。你能夠在 CyclicBarrier 的構造方法中將 Runnable 柵欄行動傳給它:
1
2
|
Runnable barrierAction = ... ;
CyclicBarrier barrier =
new
CyclicBarrier(
2
, barrierAction);
|
如下代碼演示瞭如何使用 CyclicBarrier:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
Runnable barrier1Action =
new
Runnable() {
public
void
run() {
System.out.println(
"BarrierAction 1 executed "
);
}
};
Runnable barrier2Action =
new
Runnable() {
public
void
run() {
System.out.println(
"BarrierAction 2 executed "
);
}
};
CyclicBarrier barrier1 =
new
CyclicBarrier(
2
, barrier1Action);
CyclicBarrier barrier2 =
new
CyclicBarrier(
2
, barrier2Action);
CyclicBarrierRunnable barrierRunnable1 =
new
CyclicBarrierRunnable(barrier1, barrier2);
CyclicBarrierRunnable barrierRunnable2 =
new
CyclicBarrierRunnable(barrier1, barrier2);
new
Thread(barrierRunnable1).start();
new
Thread(barrierRunnable2).start();
|
CyclicBarrierRunnable 類:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
public
class
CyclicBarrierRunnable
implements
Runnable{
CyclicBarrier barrier1 =
null
;
CyclicBarrier barrier2 =
null
;
public
CyclicBarrierRunnable(
CyclicBarrier barrier1,
CyclicBarrier barrier2) {
this
.barrier1 = barrier1;
this
.barrier2 = barrier2;
}
public
void
run() {
try
{
Thread.sleep(
1000
);
System.out.println(Thread.currentThread().getName() +
" waiting at barrier 1"
);
this
.barrier1.await();
Thread.sleep(
1000
);
System.out.println(Thread.currentThread().getName() +
" waiting at barrier 2"
);
this
.barrier2.await();
System.out.println(Thread.currentThread().getName() +
" done!"
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
catch
(BrokenBarrierException e) {
e.printStackTrace();
}
}
}
|
以上代碼控制檯輸出以下。注意每一個線程寫入控制檯的時序可能會跟你實際執行不同。好比有時 Thread-0 先打印,有時 Thread-1 先打印。
1
2
|