本文只要是對java.util.concurrent包下的相關開發工具作一個簡單的介紹,引導各位認識在這個包下的類,並嘗試在項目中使用它,html
本文不會去解釋關於 Java 併發的核心問題 - 其背後的原理,也就是說,若是你對那些東西感興趣,請參考《Java 併發指南》。java
當你發現一些被漏掉的類或接口時,請耐心等待。在做者空閒的時候會把它們加進來的。算法
[TOC]數組
java.util.concurrent 包裏的 BlockingQueue 接口表示一個線程放入和提取實例的隊列。本小節我將給你演示如何使用這個 BlockingQueue。bash
BlockingQueue 一般用於一個線程生產對象,而另一個線程消費這些對象的場景。下圖是對這個原理的闡述:服務器
一個線程往裏邊放,另一個線程從裏邊取的一個 BlockingQueue。網絡
一個線程將會持續生產新對象並將其插入到隊列之中,直到隊列達到它所能容納的臨界點。也就是說,它是有限的。若是該阻塞隊列到達了其臨界點,負責生產的線程將會在往裏邊插入新對象時發生阻塞。它會一直處於阻塞之中,直到負責消費的線程從隊列中拿走一個對象。數據結構
負責消費的線程將會一直從該阻塞隊列中拿出對象。若是消費線程嘗試去從一個空的隊列中提取對象的話,這個消費線程將會處於阻塞之中,直到一個生產線程把一個對象丟進隊列。併發
BlockingQueue 具備 4 組不一樣的方法用於插入、移除以及對隊列中的元素進行檢查。若是請求的操做不能獲得當即執行的話,每一個方法的表現也不一樣。這些方法以下:異步
操做 | 拋異常 | 特定值 | 阻塞 | 超時 |
---|---|---|---|---|
插入 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
移除 | remove(o) | poll(o) | take(o) | poll(timeout, timeunit) |
檢查 | element(o) | peek(o) | 不可用 | 不可用 |
四組不一樣的行爲方式解釋:
沒法向一個 BlockingQueue 中插入 null。若是你試圖插入 null,BlockingQueue 將會拋出一個 NullPointerException。 能夠訪問到 BlockingQueue 中的全部元素,而不只僅是開始和結束的元素。好比說,你將一個對象放入隊列之中以等待處理,但你的應用想要將其取消掉。那麼你能夠調用諸如 remove(o) 方法來將隊列之中的特定對象進行移除。可是這麼幹效率並不高(譯者注:基於隊列的數據結構,獲取除開始或結束位置的其餘對象的效率不會過高),所以你儘可能不要用這一類的方法,除非你確實不得不那麼作。
BlockingQueue 是個接口,你須要使用它的實現之一來使用 BlockingQueue。java.util.concurrent 具備如下 BlockingQueue 接口的實現(Java 6):
這裏是一個 Java 中使用 BlockingQueue 的示例。本示例使用的是 BlockingQueue 接口的 ArrayBlockingQueue 實現。
首先,BlockingQueueExample 類分別在兩個獨立的線程中啓動了一個 Producer 和 一個 Consumer。Producer 向一個共享的 BlockingQueue 中注入字符串,而 Consumer 則會從中把它們拿出來。
public class BlockingQueueExample {
public static void main(String[] args) throws Exception {
BlockingQueue queue = new ArrayBlockingQueue(1024);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
} 123456789101112131415
複製代碼
如下是 Producer 類。注意它在每次 put() 調用時是如何休眠一秒鐘的。這將致使 Consumer 在等待隊列中對象的時候發生阻塞。
public class Producer implements Runnable{
protected BlockingQueue queue = null;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} 1234567891011121314151617181920
複製代碼
如下是 Consumer 類。它只是把對象從隊列中抽取出來,而後將它們打印到 System.out
public class Consumer implements Runnable{
protected BlockingQueue queue = null;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} 123456789101112131415161718
複製代碼
ArrayBlockingQueue 類實現了 BlockingQueue 接口。
ArrayBlockingQueue 是一個有界的阻塞隊列,其內部實現是將對象放到一個數組裏。有界也就意味着,它不可以存儲無限多數量的元素。它有一個同一時間可以存儲元素數量的上限。你能夠在對其初始化的時候設定這個上限,但以後就沒法對這個上限進行修改了(譯者注:由於它是基於數組實現的,也就具備數組的特性:一旦初始化,大小就沒法修改)。
ArrayBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在全部元素之中是放入時間最久的那個,而尾元素則是最短的那個。
如下是在使用 ArrayBlockingQueue 的時候對其初始化的一個示例:
BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1");
Object object = queue.take(); 123
複製代碼
如下是使用了 Java 泛型的一個 BlockingQueue 示例。注意其中是如何對 String 元素放入和提取的:
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);
queue.put("1");
String string = queue.take(); 123
複製代碼
DelayQueue 實現了 BlockingQueue 接口。
DelayQueue 對元素進行持有直到一個特定的延遲到期。注入其中的元素必須實現 java.util.concurrent.Delayed 接口,該接口定義:
public interface Delayed extends Comparable<Delayed> {
public long getDelay(TimeUnit timeUnit);
} 12345
複製代碼
DelayQueue 將會在每一個元素的 getDelay() 方法返回的值的時間段以後才釋放掉該元素。若是返回的是 0 或者負值,延遲將被認爲過時,該元素將會在 DelayQueue 的下一次 take 被調用的時候被釋放掉。
傳遞給 getDelay 方法的 getDelay 實例是一個枚舉類型,它代表了將要延遲的時間段。TimeUnit 枚舉將會取如下值:
正如你所看到的,Delayed 接口也繼承了 java.lang.Comparable 接口,這也就意味着 Delayed 對象之間能夠進行對比。這個可能在對 DelayQueue 隊列中的元素進行排序時有用,所以它們能夠根據過時時間進行有序釋放。
如下是使用 DelayQueue 的例子:
public class DelayQueueExample {
public static void main(String[] args) {
DelayQueue queue = new DelayQueue();
Delayed element1 = new DelayedElement();
queue.put(element1);
Delayed element2 = queue.take();
}
} 123456789
複製代碼
DelayedElement 是我所建立的一個 DelayedElement 接口的實現類,它不在 Java.util.concurrent 包裏。你須要自行建立你本身的 Delayed 接口的實現以使用 DelayQueue 類。
LinkedBlockingQueue 類實現了 BlockingQueue 接口。
LinkedBlockingQueue 內部以一個鏈式結構(連接節點)對其元素進行存儲。若是須要的話,這一鏈式結構能夠選擇一個上限。若是沒有定義上限,將使用 Integer.MAX_VALUE 做爲上限。
LinkedBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在全部元素之中是放入時間最久的那個,而尾元素則是最短的那個。
如下是 LinkedBlockingQueue 的初始化和使用示例代碼:
BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1024);
bounded.put("Value");
String value = bounded.take(); 1234
複製代碼
PriorityBlockingQueue 類實現了 BlockingQueue 接口。
PriorityBlockingQueue 是一個無界的併發隊列。它使用了和類 java.util.PriorityQueue 同樣的排序規則。你沒法向這個隊列中插入 null 值。
全部插入到 PriorityBlockingQueue 的元素必須實現 java.lang.Comparable 接口。所以該隊列中元素的排序就取決於你本身的 Comparable 實現。
注意 PriorityBlockingQueue 對於具備相等優先級(compare() == 0)的元素並不強制任何特定行爲。 同時注意,若是你從一個 PriorityBlockingQueue 得到一個 Iterator 的話,該 Iterator 並不能保證它對元素的遍歷是以優先級爲序的。
如下是使用 PriorityBlockingQueue 的示例:
BlockingQueue queue = new PriorityBlockingQueue();
//String implements java.lang.Comparable
queue.put("Value");
String value = queue.take(); 1234
複製代碼
SynchronousQueue 類實現了 BlockingQueue 接口。
SynchronousQueue 是一個特殊的隊列,它的內部同時只可以容納單個元素。若是該隊列已有一元素的話,試圖向隊列中插入一個新元素的線程將會阻塞,直到另外一個線程將該元素從隊列中抽走。一樣,若是該隊列爲空,試圖向隊列中抽取一個元素的線程將會阻塞,直到另外一個線程向隊列中插入了一條新的元素。
據此,把這個類稱做一個隊列顯然是誇大其詞了。它更多像是一個匯合點。
java.util.concurrent 包裏的 BlockingDeque 接口表示一個線程安放入和提取實例的雙端隊列。本小節我將給你演示如何使用 BlockingDeque。
BlockingDeque 類是一個雙端隊列,在不可以插入元素時,它將阻塞住試圖插入元素的線程;在不可以抽取元素時,它將阻塞住試圖抽取的線程。
deque(雙端隊列) 是 「Double Ended Queue」 的縮寫。所以,雙端隊列是一個你能夠從任意一端插入或者抽取元素的隊列。
在線程既是一個隊列的生產者又是這個隊列的消費者的時候可使用到 BlockingDeque。若是生產者線程須要在隊列的兩端均可以插入數據,消費者線程須要在隊列的兩端均可以移除數據,這個時候也可使用 BlockingDeque。BlockingDeque 圖解:
一個 BlockingDeque - 線程在雙端隊列的兩端均可以插入和提取元素。
一個線程生產元素,並把它們插入到隊列的任意一端。若是雙端隊列已滿,插入線程將被阻塞,直到一個移除線程從該隊列中移出了一個元素。若是雙端隊列爲空,移除線程將被阻塞,直到一個插入線程向該隊列插入了一個新元素。
BlockingDeque 具備 4 組不一樣的方法用於插入、移除以及對雙端隊列中的元素進行檢查。若是請求的操做不能獲得當即執行的話,每一個方法的表現也不一樣。這些方法以下:
操做 | 拋異常 | 特定值 | 阻塞 | 超時 |
---|---|---|---|---|
插入 | addFirst(o) | offerFirst(o) | putFirst(o) | offerFirst(o, timeout, timeunit) |
移除 | removeFirst(o) | pollFirst(o) | takeFirst(o) | pollFirst(timeout, timeunit) |
檢查 | getFirst(o) | peekFirst(o) | 無 | 無 |
操做 | 拋異常 | 特定值 | 阻塞 | 超時 |
---|---|---|---|---|
插入 | addLast(o) | offerLast(o) | putLast(o) | offerLast(o, timeout, timeunit) |
移除 | removeLast(o) | pollLast(o) | takeLast(o) | pollLast(timeout, timeunit) |
檢查 | getLast(o) | peekLast(o) | 無 | 無 |
四組不一樣的行爲方式解釋:
BlockingDeque 接口繼承自 BlockingQueue 接口。這就意味着你能夠像使用一個 BlockingQueue 那樣使用BlockingDeque。若是你這麼幹的話,各類插入方法將會把新元素添加到雙端隊列的尾端,而移除方法將會把雙端隊列的首端的元素移除。正如 BlockingQueue 接口的插入和移除方法同樣。
如下是 BlockingDeque 對 BlockingQueue 接口的方法的具體內部實現:
BlockingQueue | BlockingDeque |
---|---|
add() | addLast() |
offer() | offerLast() |
put() | putLast() |
offer(e, time, unit) | offerLast(e, time, unit) |
remove() | removeFirst() |
poll() | pollFirst() |
take() | takeFirst() |
poll(time, unit) | pollLast(time, unit) |
element() | getFirst() |
peek() | peekFirst() |
既然 BlockingDeque 是一個接口,那麼你想要使用它的話就得使用它的衆多的實現類的其中一個。java.util.concurrent 包提供瞭如下 BlockingDeque 接口的實現類:LinkedBlockingDeque
如下是如何使用 BlockingDeque 方法的一個簡短代碼示例:
BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
deque.addFirst("1");
deque.addLast("2");
String two = deque.takeLast();
String one = deque.takeFirst(); 1234567
複製代碼
LinkedBlockingDeque 類實現了 BlockingDeque 接口。
deque(雙端隊列) 是 「Double Ended Queue」 的縮寫。所以,雙端隊列是一個你能夠從任意一端插入或者抽取元素的隊列。
LinkedBlockingDeque 是一個雙端隊列,在它爲空的時候,一個試圖從中抽取數據的線程將會阻塞,不管該線程是試圖從哪一端抽取數據。
如下是 LinkedBlockingDeque 實例化以及使用的示例:
BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
deque.addFirst("1");
deque.addLast("2");
String two = deque.takeLast();
String one = deque.takeFirst(); 1234567
複製代碼
java.util.concurrent.ConcurrentMap 接口表示了一個可以對別人的訪問(插入和提取)進行併發處理的 java.util.Map。 ConcurrentMap 除了從其父接口 java.util.Map 繼承來的方法以外還有一些額外的原子性方法。
既然 ConcurrentMap 是個接口,你想要使用它的話就得使用它的實現類之一。java.util.concurrent 包具有 ConcurrentMap 接口的如下實現類:ConcurrentHashMap
ConcurrentHashMap 和 java.util.HashTable 類很類似,但 ConcurrentHashMap 可以提供比 HashTable 更好的併發性能。在你從中讀取對象的時候 ConcurrentHashMap 並不會把整個 Map 鎖住。此外,在你向其中寫入對象的時候,ConcurrentHashMap 也不會鎖住整個 Map。它的內部只是把 Map 中正在被寫入的部分進行鎖定。
另一個不一樣點是,在被遍歷的時候,即便是 ConcurrentHashMap 被改動,它也不會拋ConcurrentModificationException。儘管 Iterator 的設計不是爲多個線程的同時使用。
更多關於 ConcurrentMap 和 ConcurrentHashMap 的細節請參考官方文檔。
如下是如何使用 ConcurrentMap 接口的一個例子。本示例使用了 ConcurrentHashMap 實現類:
ConcurrentMap concurrentMap = new ConcurrentHashMap();
concurrentMap.put("key", "value");
Object value = concurrentMap.get("key"); 12345
複製代碼
java.util.concurrent.ConcurrentNavigableMap 是一個支持併發訪問的 java.util.NavigableMap,它還能讓它的子 map 具有併發訪問的能力。所謂的 「子 map」 指的是諸如 headMap(),subMap(),tailMap() 之類的方法返回的 map。
NavigableMap 中的方法再也不贅述,本小節咱們來看一下 ConcurrentNavigableMap 添加的方法。
headMap(T toKey) 方法返回一個包含了小於給定 toKey 的 key 的子 map。 若是你對原始 map 裏的元素作了改動,這些改動將影響到子 map 中的元素(譯者注:map 集合持有的其實只是對象的引用)。
如下示例演示了對 headMap() 方法的使用:
ConcurrentNavigableMap map = new ConcurrentSkipListMap();
map.put("1", "one");
map.put("2", "two");
map.put("3", "three");
ConcurrentNavigableMap headMap = map.headMap("2"); 1234567
複製代碼
headMap 將指向一個只含有鍵 「1」 的 ConcurrentNavigableMap,由於只有這一個鍵小於 「2」。關於這個方法及其重載版本具體是怎麼工做的細節請參考 Java 文檔。
tailMap(T fromKey) 方法返回一個包含了不小於給定 fromKey 的 key 的子 map。 若是你對原始 map 裏的元素作了改動,這些改動將影響到子 map 中的元素(譯者注:map 集合持有的其實只是對象的引用)。
如下示例演示了對 tailMap() 方法的使用:
ConcurrentNavigableMap map = new ConcurrentSkipListMap();
map.put("1", "one");
map.put("2", "two");
map.put("3", "three");
ConcurrentNavigableMap tailMap = map.tailMap("2"); 1234567
複製代碼
tailMap 將擁有鍵 「2」 和 「3」,由於它們不小於給定鍵 「2」。關於這個方法及其重載版本具體是怎麼工做的細節請參考 Java 文檔。
subMap() 方法返回原始 map 中,鍵介於 from(包含) 和 to (不包含) 之間的子 map。示例以下:
ConcurrentNavigableMap map = new ConcurrentSkipListMap();
map.put("1", "one");
map.put("2", "two");
map.put("3", "three");
ConcurrentNavigableMap subMap = map.subMap("2", "3"); 1234567
複製代碼
返回的 submap 只包含鍵 「2」,由於只有它知足不小於 「2」,比 「3」 小。
ConcurrentNavigableMap 接口還有其餘一些方法可供使用,好比:
關於這些方法更多信息參考官方 Java 文檔。
java.util.concurrent.CountDownLatch 是一個併發構造,它容許一個或多個線程等待一系列指定操做的完成。 CountDownLatch 以一個給定的數量初始化。countDown() 每被調用一次,這一數量就減一。經過調用 await() 方法之一,線程能夠阻塞等待這一數量到達零。
如下是一個簡單示例。Decrementer 三次調用 countDown() 以後,等待中的 Waiter 纔會從 await() 調用中釋放出來。
CountDownLatch latch = new CountDownLatch(3);
Waiter waiter = new Waiter(latch);
Decrementer decrementer = new Decrementer(latch);
new Thread(waiter).start();
new Thread(decrementer).start();
Thread.sleep(4000);
public class Waiter implements Runnable{
CountDownLatch latch = null;
public Waiter(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Waiter Released");
}
}
public class Decrementer implements Runnable {
CountDownLatch latch = null;
public Decrementer(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
Thread.sleep(1000);
this.latch.countDown();
Thread.sleep(1000);
this.latch.countDown();
Thread.sleep(1000);
this.latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
複製代碼
java.util.concurrent.CyclicBarrier 類是一種同步機制,它可以對處理一些算法的線程實現同步。換句話講,它就是一個全部線程必須等待的一個柵欄,直到全部線程都到達這裏,而後全部線程才能夠繼續作其餘事情。圖示以下:
兩個線程在柵欄旁等待對方。 經過調用 CyclicBarrier 對象的 await() 方法,兩個線程能夠實現互相等待。一旦 N 個線程在等待 CyclicBarrier 達成,全部線程將被釋放掉去繼續運行。
在建立一個 CyclicBarrier 的時候你須要定義有多少線程在被釋放以前等待柵欄。建立 CyclicBarrier 示例:
CyclicBarrier barrier = new CyclicBarrier(2); 1
複製代碼
如下演示瞭如何讓一個線程等待一個 CyclicBarrier:
barrier.await(); 1
複製代碼
固然,你也能夠爲等待線程設定一個超時時間。等待超過了超時時間以後,即使尚未達成 N 個線程等待 CyclicBarrier 的條件,該線程也會被釋放出來。如下是定義超時時間示例:
barrier.await(10, TimeUnit.SECONDS); 1
複製代碼
知足如下任何條件均可以讓等待 CyclicBarrier 的線程釋放:
CyclicBarrier 支持一個柵欄行動,柵欄行動是一個 Runnable 實例,一旦最後等待柵欄的線程抵達,該實例將被執行。你能夠在 CyclicBarrier 的構造方法中將 Runnable 柵欄行動傳給它:
Runnable barrierAction = ... ;
CyclicBarrier barrier = new CyclicBarrier(2, barrierAction); 12
複製代碼
如下代碼演示瞭如何使用 CyclicBarrier:
Runnable barrier1Action = new Runnable() {
public void run() {
System.out.println("BarrierAction 1 executed ");
}
};
Runnable barrier2Action = new Runnable() {
public void run() {
System.out.println("BarrierAction 2 executed ");
}
};
CyclicBarrier barrier1 = new CyclicBarrier(2, barrier1Action);
CyclicBarrier barrier2 = new CyclicBarrier(2, barrier2Action);
CyclicBarrierRunnable barrierRunnable1 = new CyclicBarrierRunnable(barrier1, barrier2);
CyclicBarrierRunnable barrierRunnable2 = new CyclicBarrierRunnable(barrier1, barrier2);
new Thread(barrierRunnable1).start();
new Thread(barrierRunnable2).start(); 1234567891011121314151617181920
複製代碼
CyclicBarrierRunnable 類:
public class CyclicBarrierRunnable implements Runnable{
CyclicBarrier barrier1 = null;
CyclicBarrier barrier2 = null;
public CyclicBarrierRunnable( CyclicBarrier barrier1, CyclicBarrier barrier2) {
this.barrier1 = barrier1;
this.barrier2 = barrier2;
}
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() +
" waiting at barrier 1");
this.barrier1.await();
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() +
" waiting at barrier 2");
this.barrier2.await();
System.out.println(Thread.currentThread().getName() +
" done!");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
} 1234567891011121314151617181920212223242526272829303132333435
複製代碼
以上代碼控制檯輸出以下。注意每一個線程寫入控制檯的時序可能會跟你實際執行不同。好比有時 Thread-0 先打印,有時 Thread-1 先打印。
Thread-0 waiting at barrier 1
Thread-1 waiting at barrier 1
BarrierAction 1 executed
Thread-1 waiting at barrier 2
Thread-0 waiting at barrier 2
BarrierAction 2 executed
Thread-0 done!
Thread-1 done!12345678
複製代碼
java.util.concurrent.Exchanger 類表示一種兩個線程能夠進行互相交換對象的會和點。這種機制圖示以下:
兩個線程經過一個 Exchanger 交換對象。
交換對象的動做由 Exchanger 的兩個 exchange() 方法的其中一個完成。如下是一個示例:
Exchanger exchanger = new Exchanger();
ExchangerRunnable exchangerRunnable1 =
new ExchangerRunnable(exchanger, "A");
ExchangerRunnable exchangerRunnable2 =
new ExchangerRunnable(exchanger, "B");
new Thread(exchangerRunnable1).start();
new Thread(exchangerRunnable2).start();
ExchangerRunnable 代碼:
```java
public class ExchangerRunnable implements Runnable{
Exchanger exchanger = null;
Object object = null;
public ExchangerRunnable(Exchanger exchanger, Object object) {
this.exchanger = exchanger;
this.object = object;
}
public void run() {
try {
Object previous = this.object;
this.object = this.exchanger.exchange(this.object);
System.out.println(
Thread.currentThread().getName() +
" exchanged " + previous + " for " + this.object
);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} 12345678910111213141516171819202122232425262728293031323334353637
複製代碼
以上程序輸出:
Thread-0 exchanged A for B
Thread-1 exchanged B for A12
複製代碼
java.util.concurrent.Semaphore 類是一個計數信號量。這就意味着它具有兩個主要方法:
acquire()
release()12
複製代碼
計數信號量由一個指定數量的 「許可」 初始化。每調用一次 acquire(),一個許可會被調用線程取走。每調用一次 release(),一個許可會被返還給信號量。所以,在沒有任何 release() 調用時,最多有 N 個線程可以經過 acquire() 方法,N 是該信號量初始化時的許可的指定數量。這些許可只是一個簡單的計數器。這裏沒啥奇特的地方。
信號量主要有兩種用途:
若是你將信號量用於保護一個重要部分,試圖進入這一部分的代碼一般會首先嚐試得到一個許可,而後才能進入重要部分(代碼塊),執行完以後,再把許可釋放掉。好比這樣:
Semaphore semaphore = new Semaphore(1);
//critical section
semaphore.acquire();
...
semaphore.release(); 12345678
複製代碼
若是你將一個信號量用於在兩個線程之間傳送信號,一般你應該用一個線程調用 acquire() 方法,而另外一個線程調用 release() 方法。
若是沒有可用的許可,acquire() 調用將會阻塞,直到一個許可被另外一個線程釋放出來。同理,若是沒法往信號量釋放更多許可時,一個 release() 調用也會阻塞。
經過這個能夠對多個線程進行協調。好比,若是線程 1 將一個對象插入到了一個共享列表(list)以後以後調用了 acquire(),而線程 2 則在從該列表中獲取一個對象以前調用了 release(),這時你其實已經建立了一個阻塞隊列。信號量中可用的許可的數量也就等同於該阻塞隊列可以持有的元素個數。
沒有辦法保證線程可以公平地可從信號量中得到許可。也就是說,沒法擔保掉第一個調用 acquire() 的線程會是第一個得到一個許可的線程。若是第一個線程在等待一個許可時發生阻塞,而第二個線程前來索要一個許可的時候恰好有一個許可被釋放出來,那麼它就可能會在第一個線程以前得到許可。 若是你想要強制公平,Semaphore 類有一個具備一個布爾類型的參數的構造子,經過這個參數以告知 Semaphore 是否要強制公平。強制公平會影響到併發性能,因此除非你確實須要它不然不要啓用它。
如下是如何在公平模式建立一個 Semaphore 的示例:
Semaphore semaphore = new Semaphore(1, true); 1
複製代碼
java.util.concurrent.Semaphore 類還有不少方法,好比:
這些方法的細節請參考 Java 文檔。
java.util.concurrent.ExecutorService 接口表示一個異步執行機制,使咱們可以在後臺執行任務。所以一個 ExecutorService 很相似於一個線程池。實際上,存在於 java.util.concurrent 包裏的 ExecutorService 實現就是一個線程池實現。
如下是一個簡單的 ExecutorService 例子:
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
executorService.shutdown(); 123456789
複製代碼
首先使用 newFixedThreadPool() 工廠方法建立一個 ExecutorService。這裏建立了一個十個線程執行任務的線程池。 而後,將一個 Runnable 接口的匿名實現類傳遞給 execute() 方法。這將致使 ExecutorService 中的某個線程執行該 Runnable。
下圖說明了一個線程是如何將一個任務委託給一個 ExecutorService 去異步執行的:
一個線程將一個任務委派給一個 ExecutorService 去異步執行。 一旦該線程將任務委派給 ExecutorService,該線程將繼續它本身的執行,獨立於該任務的執行。
既然 ExecutorService 是個接口,若是你想用它的話就得去使用它的實現類之一。java.util.concurrent 包提供了 ExecutorService 接口的如下實現類:
ExecutorService 的建立依賴於你使用的具體實現。可是你也可使用 Executors 工廠類來建立 ExecutorService 實例。如下是幾個建立 ExecutorService 實例的例子:
ExecutorService executorService1 = Executors.newSingleThreadExecutor();
ExecutorService executorService2 = Executors.newFixedThreadPool(10);
ExecutorService executorService3 = Executors.newScheduledThreadPool(10); 123
複製代碼
有幾種不一樣的方式來將任務委託給 ExecutorService 去執行:
接下來咱們挨個看一下這些方法。
execute(Runnable) 方法要求一個 java.lang.Runnable 對象,而後對它進行異步執行。如下是使用 ExecutorService 執行一個 Runnable 的示例:
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
executorService.shutdown(); 123456789
複製代碼
沒有辦法得知被執行的 Runnable 的執行結果。若是有須要的話你得使用一個 Callable(如下將作介紹)。
submit(Runnable) 方法也要求一個 Runnable 實現類,但它返回一個 Future 對象。這個 Future 對象能夠用來檢查 Runnable 是否已經執行完畢。
如下是 ExecutorService submit() 示例:
Future future = executorService.submit(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
future.get(); //returns null if the task has finished correctly. 1234567
複製代碼
submit(Callable) 方法相似於 submit(Runnable) 方法,除了它所要求的參數類型以外。Callable 實例除了它的 call() 方法可以返回一個結果以外和一個 Runnable 很相像。Runnable.run() 不可以返回一個結果。 Callable 的結果能夠經過 submit(Callable) 方法返回的 Future 對象進行獲取。如下是一個 ExecutorService Callable 示例:
Future future = executorService.submit(new Callable(){
public Object call() throws Exception {
System.out.println("Asynchronous Callable");
return "Callable Result";
}
});
System.out.println("future.get() = " + future.get()); 12345678
複製代碼
以上代碼輸出:
Asynchronous Callable
future.get() = Callable Result12
複製代碼
invokeAny() 方法要求一系列的 Callable 或者其子接口的實例對象。調用這個方法並不會返回一個 Future,但它返回其中一個 Callable 對象的結果。沒法保證返回的是哪一個 Callable 的結果 - 只能代表其中一個已執行結束。
若是其中一個任務執行結束(或者拋了一個異常),其餘 Callable 將被取消。
如下是示例代碼:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> callables = new HashSet<Callable<String>>();
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 3";
}
});
String result = executorService.invokeAny(callables);
System.out.println("result = " + result);
executorService.shutdown(); 12345678910111213141516171819202122232425
複製代碼
上述代碼將會打印出給定 Callable 集合中的一個的執行結果。我本身試着執行了它幾回,結果始終在變。有時是 「Task 1」,有時是 「Task 2」 等等。
invokeAll() 方法將調用你在集合中傳給 ExecutorService 的全部 Callable 對象。invokeAll() 返回一系列的 Future 對象,經過它們你能夠獲取每一個 Callable 的執行結果。
記住,一個任務可能會因爲一個異常而結束,所以它可能沒有 「成功」。沒法經過一個 Future 對象來告知咱們是兩種結束中的哪種。
如下是一個代碼示例:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> callables = new HashSet<Callable<String>>();
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 3";
}
});
List<Future<String>> futures = executorService.invokeAll(callables);
for(Future<String> future : futures){
System.out.println("future.get = " + future.get());
}
executorService.shutdown(); 123456789101112131415161718192021222324252627
複製代碼
使用完 ExecutorService 以後你應該將其關閉,以使其中的線程再也不運行。
好比,若是你的應用是經過一個 main() 方法啓動的,以後 main 方法退出了你的應用,若是你的應用有一個活動的 ExexutorService 它將還會保持運行。ExecutorService 裏的活動線程阻止了 JVM 的關閉。
要終止 ExecutorService 裏的線程你須要調用 ExecutorService 的 shutdown() 方法。ExecutorService 並不會當即關閉,但它將再也不接受新的任務,並且一旦全部線程都完成了當前任務的時候,ExecutorService 將會關閉。在 shutdown() 被調用以前全部提交給 ExecutorService 的任務都被執行。
若是你想要當即關閉 ExecutorService,你能夠調用 shutdownNow() 方法。這樣會當即嘗試中止全部執行中的任務,並忽略掉那些已提交但還沒有開始處理的任務。沒法擔保執行任務的正確執行。可能它們被中止了,也可能已經執行結束。
Java Callable 接口 java.util.concurrent.Callable 表示能夠由單獨的線程執行的異步任務。例如,能夠將 Callable對象提交給 Java ExecutorService,而後 Java ExecutorService 將異步執行它。
Java Callable接口很是簡單。它包含一個名爲 call() 的方法。如下是 Callable 接口:
public interface Callable<V> {
V call() throws Exception;
}
複製代碼
調用 call() 方法以執行異步任務。call() 方法能夠返回結果。若是任務是異步執行的,則結果一般經過 Java Future傳播給任務的建立者。當一個 Callable 被提交給 ExecutorService 併發執行時就是可使用 Future對象接收返回結果 。
若是任務在執行期間失敗, call() 方法也能夠拋出 Exception。
這是一個實現 Java Callable 接口的簡單示例:
public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
return String.valueOf(System.currentTimeMillis());
}
}
複製代碼
這個實現很是簡單。結果是 call() 方法將返回一個當前時間的 String 。在實際應用程序中,任務多是更復雜或更大的操做集。
一般,IO操做(如讀取或寫入磁盤或網絡)是能夠同時執行的任務。IO操做一般在讀取和寫入數據塊之間有很長的等待時間。經過在單獨的線程中執行此類任務,能夠避免沒必要要地阻塞主線程。
Java Callable 接口相似於 Java Runnable 接口,由於它們都表示能夠由單獨的線程併發執行的任務。
一個Java Callable 是從不一樣 Runnable 在於該 Runnable 接口的 run() 方法不返回一個值,並且它不能拋出checked異常(僅 RuntimeException )。
此外,Runnable 最初設計用於長時間運行的併發任務,例如同時運行的網絡服務器,或者查看新文件的目錄。Callable 接口更適用於返回單個結果的一次性任務。
Java Future,java.util.concurrent.Future 表示異步計算的結果。建立異步任務時,將 Future 做爲線程返回的Java 對象。此 Future 對象用做異步任務結果的句柄。異步任務完成後,能夠經過 Future 對象訪問結果。
Java的一些內置併發服務(例如Java ExecutorService)從其某些方法返回 Future 對象。在這種狀況下ExecutorService 會在你提交併執行一個Callable任務時返回一個 Future 對象。
爲了瞭解Java Future 接口的工做原理,接口定義:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning) V get();
V get(long timeout, TimeUnit unit);
boolean isCancelled();
boolean isDone();
}
複製代碼
這些方法中的每個都將在後面的章節中介紹 - 但正如您所看到的,Java Future 接口並非那麼高級。
如前所述,Java Future 表示異步任務的結果。要得到結果,經過 Future 對象調用兩種 get() 方法中的一個。該get() 方法均返回一個 Object,但返回類型也能夠是通用的返回類型(指一個特定的類的對象,而不只僅是一個Object)。如下是 Future 經過其 get() 方法獲取結果的示例:
Future future = ... // get Future by starting async task
// do something else, until ready to check result via Future
// get result from Future
try {
Object result = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
複製代碼
若是在異步任務完成以前調用 get() 方法,則該 get() 方法將阻塞,直到線程執行結束。
第二種 get() 方法能夠在通過一段時間後超時返回,您能夠經過方法參數指定超時時間。如下是調用該 get() 版本的示例:
try {
Object result =
future.get(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
} catch (ExecutionException e) {
} catch (TimeoutException e) {
// thrown if timeout time interval passes
// before a result is available.
}
複製代碼
上面的示例 Future 最多等待1000毫秒。若是在1000毫秒內沒有可用結果,則拋出 TimeoutException 異常。
您能夠經過調用 Future 的 cancel() 方法取消對應的的異步任務。該異步任務必須是已實現且正在執行的。若是沒有,調用 cancel() 將無效。如下是經過 Java Futurecancel() 方法取消任務的示例:
future.cancel();
複製代碼
您能夠經過調用 Future isDone() 方法來檢查異步任務是否完成(以及結果是否可用)。如下是調用Java Future isDone() 方法的示例:
Future future = ... // Get Future from somewhere
if(future.isDone()) {
Object result = future.get();
} else {
// do something else
}
複製代碼
還能夠檢查Java表示的異步任務是否被取消。你能夠經過調用 FutureisCancelled() 方法來檢查。如下是檢查任務是否已取消的示例:
Future future = ... // get Future from somewhere
if(future.isCancelled()) {
} else {
}
複製代碼
java.util.concurrent.ThreadPoolExecutor 是 ExecutorService 接口的一個實現。ThreadPoolExecutor 使用其內部池中的線程執行給定任務(Callable 或者 Runnable)。
ThreadPoolExecutor 包含的線程池可以包含不一樣數量的線程。池中線程的數量由如下變量決定:
當一個任務委託給線程池時,若是池中線程數量低於 corePoolSize,一個新的線程將被建立,即便池中可能尚有空閒線程。 若是內部任務隊列已滿,並且有至少 corePoolSize 正在運行,可是運行線程的數量低於 maximumPoolSize,一個新的線程將被建立去執行該任務。 ThreadPoolExecutor 圖解:
一個 ThreadPoolExecutor
ThreadPoolExecutor 有若干個可用構造子。好比:
int corePoolSize = 5;
int maxPoolSize = 10;
long keepAliveTime = 5000;
ExecutorService threadPoolExecutor =
new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
); 123456789101112
複製代碼
可是,除非你確實須要顯式爲 ThreadPoolExecutor 定義全部參數,使用 java.util.concurrent.Executors 類中的工廠方法之一會更加方便,正如 ExecutorService 小節所述。
java.util.concurrent.ScheduledExecutorService 是一個 ExecutorService, 它可以將任務延後執行,或者間隔固定時間屢次執行。 任務由一個工做者線程異步執行,而不是由提交任務給 ScheduledExecutorService 的那個線程執行。
如下是一個簡單的 ScheduledExecutorService 示例:
ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(5);
ScheduledFuture scheduledFuture =
scheduledExecutorService.schedule(new Callable() {
public Object call() throws Exception {
System.out.println("Executed!");
return "Called!";
}
},
5,
TimeUnit.SECONDS); 123456789101112
複製代碼
首先一個內置 5 個線程的 ScheduledExecutorService 被建立。以後一個 Callable 接口的匿名類示例被建立而後傳遞給 schedule() 方法。後邊的倆參數定義了 Callable 將在 5 秒鐘以後被執行。
既然 ScheduledExecutorService 是一個接口,你要用它的話就得使用 java.util.concurrent 包裏對它的某個實現類。ScheduledExecutorService 具備如下實現類:ScheduledThreadPoolExecutor
如何建立一個 ScheduledExecutorService 取決於你採用的它的實現類。可是你也可使用 Executors 工廠類來建立一個 ScheduledExecutorService 實例。好比:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); 1
複製代碼
一旦你建立了一個 ScheduledExecutorService,你能夠經過調用它的如下方法:
下面咱們就簡單看一下這些方法。
schedule (Callable task, long delay, TimeUnit timeunit)1
複製代碼
這個方法計劃指定的 Callable 在給定的延遲以後執行。 這個方法返回一個 ScheduledFuture,經過它你能夠在它被執行以前對它進行取消,或者在它執行以後獲取結果。 如下是一個示例:
ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(5);
ScheduledFuture scheduledFuture =
scheduledExecutorService.schedule(new Callable() {
public Object call() throws Exception {
System.out.println("Executed!");
return "Called!";
}
},
5,
TimeUnit.SECONDS);
System.out.println("result = " + scheduledFuture.get());
scheduledExecutorService.shutdown(); 12345678910111213141516
複製代碼
示例輸出結果:
Executed!
result = Called!
123
schedule (Runnable task, long delay, TimeUnit timeunit)1
複製代碼
除了 Runnable 沒法返回一個結果以外,這一方法工做起來就像以一個 Callable 做爲一個參數的那個版本的方法同樣,所以 ScheduledFuture.get() 在任務執行結束以後返回 null。
scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)1
複製代碼
這一方法規劃一個任務將被按期執行。該任務將會在首個 initialDelay 以後獲得執行,而後每一個 period 時間以後重複執行。
若是給定任務的執行拋出了異常,該任務將再也不執行。若是沒有任何異常的話,這個任務將會持續循環執行到 ScheduledExecutorService 被關閉。 若是一個任務佔用了比計劃的時間間隔更長的時候,下一次執行將在當前執行結束執行纔開始。計劃任務在同一時間不會有多個線程同時執行。
scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)1
複製代碼
除了 period 有不一樣的解釋以外這個方法和 scheduleAtFixedRate() 很是像。
scheduleAtFixedRate() 方法中,period 被解釋爲前一個執行的開始和下一個執行的開始之間的間隔時間。
而在本方法中,period 則被解釋爲前一個執行的結束和下一個執行的結束之間的間隔。所以這個延遲是執行結束之間的間隔,而不是執行開始之間的間隔。
正如 ExecutorService,在你使用結束以後你須要把 ScheduledExecutorService 關閉掉。不然他將致使 JVM 繼續運行,即便全部其餘線程已經全被關閉。
你可使用從 ExecutorService 接口繼承來的 shutdown() 或 shutdownNow() 方法將 ScheduledExecutorService 關閉。參見 ExecutorService 關閉部分以獲取更多信息。
ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很類似,除了一點不一樣。ForkJoinPool 讓咱們能夠很方便地把任務分裂成幾個更小的任務,這些分裂出來的任務也將會提交給 ForkJoinPool。任務能夠繼續分割成更小的子任務,只要它還能分割。可能聽起來有些抽象,所以本節中咱們將會解釋 ForkJoinPool 是如何工做的,還有任務分割是如何進行的。
在咱們開始看 ForkJoinPool 以前咱們先來簡要解釋一下分叉和合並的原理。 分叉和合並原理包含兩個遞歸進行的步驟。兩個步驟分別是分叉步驟和合並步驟。
一個使用了分叉和合並原理的任務能夠將本身分叉(分割)爲更小的子任務,這些子任務能夠被併發執行。以下圖所示:
經過把本身分割成多個子任務,每一個子任務能夠由不一樣的 CPU 並行執行,或者被同一個 CPU 上的不一樣線程執行。
只有當給的任務過大,把它分割成幾個子任務纔有意義。把任務分割成子任務有必定開銷,所以對於小型任務,這個分割的消耗可能比每一個子任務併發執行的消耗還要大。
何時把一個任務分割成子任務是有意義的,這個界限也稱做一個閥值。這要看每一個任務對有意義閥值的決定。很大程度上取決於它要作的工做的種類。
當一個任務將本身分割成若干子任務以後,該任務將進入等待全部子任務的結束之中。
一旦子任務執行結束,該任務能夠把全部結果合併到同一個結果。圖示以下:
固然,並不是全部類型的任務都會返回一個結果。若是這個任務並不返回一個結果,它只需等待全部子任務執行完畢。也就不須要結果的合併啦。
ForkJoinPool 是一個特殊的線程池,它的設計是爲了更好的配合 分叉-和-合併 任務分割的工做。ForkJoinPool 也在 java.util.concurrent 包中,其完整類名爲 java.util.concurrent.ForkJoinPool。
你能夠經過其構造子建立一個 ForkJoinPool。做爲傳遞給 ForkJoinPool 構造子的一個參數,你能夠定義你指望的並行級別。並行級別表示你想要傳遞給 ForkJoinPool 的任務所需的線程或 CPU 數量。如下是一個 ForkJoinPool 示例:
ForkJoinPool forkJoinPool = new ForkJoinPool(4); 1
複製代碼
這個示例建立了一個並行級別爲 4 的 ForkJoinPool。
就像提交任務到 ExecutorService 那樣,把任務提交到 ForkJoinPool。你能夠提交兩種類型的任務。一種是沒有任何返回值的(一個 「行動」),另外一種是有返回值的(一個」任務」)。這兩種類型分別由 RecursiveAction 和 RecursiveTask 表示。接下來介紹如何使用這兩種類型的任務,以及如何對它們進行提交。
RecursiveAction 是一種沒有任何返回值的任務。它只是作一些工做,好比寫數據到磁盤,而後就退出了。 一個 RecursiveAction 能夠把本身的工做分割成更小的幾塊,這樣它們能夠由獨立的線程或者 CPU 執行。 你能夠經過繼承來實現一個 RecursiveAction。示例以下:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveAction;
public class MyRecursiveAction extends RecursiveAction {
private long workLoad = 0;
public MyRecursiveAction(long workLoad) {
this.workLoad = workLoad;
}
@Override
protected void compute() {
//if work is above threshold, break tasks up into smaller tasks
if(this.workLoad > 16) {
System.out.println("Splitting workLoad : " + this.workLoad);
List<MyRecursiveAction> subtasks =
new ArrayList<MyRecursiveAction>();
subtasks.addAll(createSubtasks());
for(RecursiveAction subtask : subtasks){
subtask.fork();
}
} else {
System.out.println("Doing workLoad myself: " + this.workLoad);
}
}
private List<MyRecursiveAction> createSubtasks() {
List<MyRecursiveAction> subtasks =
new ArrayList<MyRecursiveAction>();
MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);
subtasks.add(subtask1);
subtasks.add(subtask2);
return subtasks;
}
} 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
複製代碼
例子很簡單。MyRecursiveAction 將一個虛構的 workLoad 做爲參數傳給本身的構造子。若是 workLoad 高於一個特定閥值,該工做將被分割爲幾個子工做,子工做繼續分割。若是 workLoad 低於特定閥值,該工做將由 MyRecursiveAction 本身執行。
你能夠這樣規劃一個 MyRecursiveAction 的執行:
MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24);
forkJoinPool.invoke(myRecursiveAction); 123
複製代碼
RecursiveTask 是一種會返回結果的任務。它能夠將本身的工做分割爲若干更小任務,並將這些子任務的執行結果合併到一個集體結果。能夠有幾個水平的分割和合並。如下是一個 RecursiveTask 示例:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;
public class MyRecursiveTask extends RecursiveTask<Long> {
private long workLoad = 0;
public MyRecursiveTask(long workLoad) {
this.workLoad = workLoad;
}
protected Long compute() {
//if work is above threshold, break tasks up into smaller tasks
if(this.workLoad > 16) {
System.out.println("Splitting workLoad : " + this.workLoad);
List<MyRecursiveTask> subtasks =
new ArrayList<MyRecursiveTask>();
subtasks.addAll(createSubtasks());
for(MyRecursiveTask subtask : subtasks){
subtask.fork();
}
long result = 0;
for(MyRecursiveTask subtask : subtasks) {
result += subtask.join();
}
return result;
} else {
System.out.println("Doing workLoad myself: " + this.workLoad);
return workLoad * 3;
}
}
private List<MyRecursiveTask> createSubtasks() {
List<MyRecursiveTask> subtasks =
new ArrayList<MyRecursiveTask>();
MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);
subtasks.add(subtask1);
subtasks.add(subtask2);
return subtasks;
}
} 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
複製代碼
除了有一個結果返回以外,這個示例和 RecursiveAction 的例子很像。MyRecursiveTask 類繼承自 RecursiveTask,這也就意味着它將返回一個 Long 類型的結果。
MyRecursiveTask 示例也會將工做分割爲子任務,並經過 fork() 方法對這些子任務計劃執行。
此外,本示例還經過調用每一個子任務的 join() 方法收集它們返回的結果。子任務的結果隨後被合併到一個更大的結果,並最終將其返回。對於不一樣級別的遞歸,這種子任務的結果合併可能會發生遞歸。
你能夠這樣規劃一個 RecursiveTask:
MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
long mergedResult = forkJoinPool.invoke(myRecursiveTask);
System.out.println("mergedResult = " + mergedResult); 12345
複製代碼
注意是如何經過 ForkJoinPool.invoke() 方法的調用來獲取最終執行結果的。
貌似並不是每一個人都對 Java 7 裏的 ForkJoinPool 滿意:《一個 Java 分叉-合併 帶來的災禍》。 在你計劃在本身的項目裏使用 ForkJoinPool 以前最好讀一下該篇文章。
java.util.concurrent.locks.Lock 是一個相似於 synchronized 塊的線程同步機制。可是 Lock 比 synchronized 塊更加靈活、精細。 順便說一下,在個人《Java 併發指南》中我對如何實現你本身的鎖進行了描述。
既然 Lock 是一個接口,在你的程序裏須要使用它的實現類之一來使用它。如下是一個簡單示例:
Lock lock = new ReentrantLock();
lock.lock();
//critical section
lock.unlock(); 1234567
複製代碼
首先建立了一個 Lock 對象。以後調用了它的 lock() 方法。這時候這個 lock 實例就被鎖住啦。任何其餘再過來調用 lock() 方法的線程將會被阻塞住,直到鎖定 lock 實例的線程調用了 unlock() 方法。最後 unlock() 被調用了,lock 對象解鎖了,其餘線程能夠對它進行鎖定了。
java.util.concurrent.locks 包提供瞭如下對 Lock 接口的實現類:ReentrantLock
一個 Lock 對象和一個 synchronized 代碼塊之間的主要不一樣點是:
Lock 接口具備如下主要方法:
lock() 將 Lock 實例鎖定。若是該 Lock 實例已被鎖定,調用 lock() 方法的線程將會阻塞,直到 Lock 實例解鎖。
lockInterruptibly() 方法將會被調用線程鎖定,除非該線程被打斷。此外,若是一個線程在經過這個方法來鎖定 Lock 對象時進入阻塞等待,而它被打斷了的話,該線程將會退出這個方法調用。
tryLock() 方法試圖當即鎖定 Lock 實例。若是鎖定成功,它將返回 true,若是 Lock 實例已被鎖定該方法返回 false。這一方法永不阻塞。
tryLock(long timeout, TimeUnit timeUnit) 的工做相似於 tryLock() 方法,除了它在放棄鎖定 Lock 以前等待一個給定的超時時間以外。
unlock() 方法對 Lock 實例解鎖。一個 Lock 實現將只容許鎖定了該對象的線程來調用此方法。其餘(沒有鎖定該 Lock 對象的線程)線程對 unlock() 方法的調用將會拋一個未檢查異常(RuntimeException)。
java.util.concurrent.locks.ReadWriteLock 讀寫鎖是一種先進的線程鎖機制。它可以容許多個線程在同一時間對某特定資源進行讀取,但同一時間內只能有一個線程對其進行寫入。
讀寫鎖的理念在於多個線程可以對一個共享資源進行讀取,而不會致使併發問題。併發問題的發生場景在於對一個共享資源的讀和寫操做的同時進行,或者多個寫操做併發進行。
本節只討論 Java 內置 ReadWriteLock。若是你想了解 ReadWriteLock 背後的實現原理,請參考個人《Java 併發指南》主題中的《讀寫鎖》小節。
一個線程在對受保護資源在讀或者寫以前對 ReadWriteLock 鎖定的規則以下: 讀鎖:若是沒有任何寫操做線程鎖定 ReadWriteLock,而且沒有任何寫操做線程要求一個寫鎖(但尚未得到該鎖)。所以,能夠有多個讀操做線程對該鎖進行鎖定。
寫鎖:若是沒有任何讀操做或者寫操做。所以,在寫操做的時候,只能有一個線程對該鎖進行鎖定。
ReadWriteLock 是個接口,若是你想用它的話就得去使用它的實現類之一。java.util.concurrent.locks 包提供了 ReadWriteLock 接口的如下實現類:ReentrantReadWriteLock
如下是 ReadWriteLock 的建立以及如何使用它進行讀、寫鎖定的簡單示例代碼:
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readWriteLock.readLock().lock();
// multiple readers can enter this section
// if not locked for writing, and not writers waiting
// to lock for writing.
readWriteLock.readLock().unlock();
readWriteLock.writeLock().lock();
// only one writer can enter this section,
// and only if no threads are currently reading.
readWriteLock.writeLock().unlock(); 12345678910111213141516
複製代碼
注意如何使用 ReadWriteLock 對兩種鎖實例的持有。一個對讀訪問進行保護,一個隊寫訪問進行保護。
AtomicBoolean 類爲咱們提供了一個能夠用原子方式進行讀和寫的布爾值,它還擁有一些先進的原子性操做,好比 compareAndSet()。AtomicBoolean 類位於 java.util.concurrent.atomic 包,完整類名是爲 java.util.concurrent.atomic.AtomicBoolean。本小節描述的 AtomicBoolean 是 Java 8 版本里的,而不是它第一次被引入的 Java 5 版本。
AtomicBoolean 背後的設計理念在個人《Java 併發指南》主題的《比較和交換》小節有解釋。
你能夠這樣建立一個 AtomicBoolean:
AtomicBoolean atomicBoolean = new AtomicBoolean(); 1
複製代碼
以上示例新建了一個默認值爲 false 的 AtomicBoolean。
若是你想要爲 AtomicBoolean 實例設置一個顯式的初始值,那麼你能夠將初始值傳給 AtomicBoolean 的構造子:
AtomicBoolean atomicBoolean = new AtomicBoolean(true); 1
複製代碼
你能夠經過使用 get() 方法來獲取一個 AtomicBoolean 的值。示例以下:
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean value = atomicBoolean.get(); 123
複製代碼
以上代碼執行後 value 變量的值將爲 true。
你能夠經過使用 set() 方法來設置一個 AtomicBoolean 的值。示例以下:
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
atomicBoolean.set(false); 123
複製代碼
以上代碼執行後 AtomicBoolean 的值爲 false。
你能夠經過 getAndSet() 方法來交換一個 AtomicBoolean 實例的值。getAndSet() 方法將返回 AtomicBoolean 當前的值,並將爲 AtomicBoolean 設置一個新值。示例以下:
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean oldValue = atomicBoolean.getAndSet(false); 12
複製代碼
以上代碼執行後 oldValue 變量的值爲 true,atomicBoolean 實例將持有 false 值。代碼成功將 AtomicBoolean 當前值 ture 交換爲 false。
compareAndSet() 方法容許你對 AtomicBoolean 的當前值與一個指望值進行比較,若是當前值等於指望值的話,將會對 AtomicBoolean 設定一個新值。compareAndSet() 方法是原子性的,所以在同一時間以內有單個線程執行它。所以 compareAndSet() 方法可被用於一些相似於鎖的同步的簡單實現。
如下是一個 compareAndSet() 示例:
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean expectedValue = true;
boolean newValue = false;
boolean wasNewValueSet = atomicBoolean.compareAndSet(expectedValue, newValue); 123456
複製代碼
本示例對 AtomicBoolean 的當前值與 true 值進行比較,若是相等,將 AtomicBoolean 的值更新爲 false。
AtomicInteger 類爲咱們提供了一個能夠進行原子性讀和寫操做的 int 變量,它還包含一系列先進的原子性操做,好比 compareAndSet()。AtomicInteger 類位於 java.util.concurrent.atomic 包,所以其完整類名爲 java.util.concurrent.atomic.AtomicInteger。本小節描述的 AtomicInteger 是 Java 8 版本里的,而不是它第一次被引入的 Java 5 版本。
AtomicInteger 背後的設計理念在個人《Java 併發指南》主題的《比較和交換》小節有解釋。
建立一個 AtomicInteger 示例以下:
AtomicInteger atomicInteger = new AtomicInteger(); 1
複製代碼
本示例將建立一個初始值爲 0 的 AtomicInteger。 若是你想要建立一個給定初始值的 AtomicInteger,你能夠這樣:
AtomicInteger atomicInteger = new AtomicInteger(123); 1
複製代碼
本示例將 123 做爲參數傳給 AtomicInteger 的構造子,它將設置 AtomicInteger 實例的初始值爲 123。
你可使用 get() 方法獲取 AtomicInteger 實例的值。示例以下:
AtomicInteger atomicInteger = new AtomicInteger(123);
int theValue = atomicInteger.get(); 12
複製代碼
你能夠經過 set() 方法對 AtomicInteger 的值進行從新設置。如下是 AtomicInteger.set() 示例:
AtomicInteger atomicInteger = new AtomicInteger(123);
atomicInteger.set(234); 12
複製代碼
以上示例建立了一個初始值爲 123 的 AtomicInteger,而在第二行將其值更新爲 234。
AtomicInteger 類也經過了一個原子性的 compareAndSet() 方法。這一方法將 AtomicInteger 實例的當前值與指望值進行比較,若是兩者相等,爲 AtomicInteger 實例設置一個新值。AtomicInteger.compareAndSet() 代碼示例:
AtomicInteger atomicInteger = new AtomicInteger(123);
int expectedValue = 123;
int newValue = 234;
atomicInteger.compareAndSet(expectedValue, newValue); 12345
複製代碼
本示例首先新建一個初始值爲 123 的 AtomicInteger 實例。而後將 AtomicInteger 與指望值 123 進行比較,若是相等,將 AtomicInteger 的值更新爲 234。
AtomicInteger 類包含有一些方法,經過它們你能夠增長 AtomicInteger 的值,並獲取其值。這些方法以下:
第一個 addAndGet() 方法給 AtomicInteger 增長了一個值,而後返回增長後的值。getAndAdd() 方法爲 AtomicInteger 增長了一個值,但返回的是增長之前的 AtomicInteger 的值。具體使用哪個取決於你的應用場景。如下是這兩種方法的示例:
AtomicInteger atomicInteger = new AtomicInteger();
System.out.println(atomicInteger.getAndAdd(10));
System.out.println(atomicInteger.addAndGet(10)); 123
複製代碼
本示例將打印出 0 和 20。例子中,第二行拿到的是加 10 以前的 AtomicInteger 的值。加 10 以前的值是 0。第三行將 AtomicInteger 的值再加 10,並返回加操做以後的值。該值如今是爲 20。
你固然也可使用這倆方法爲 AtomicInteger 添加負值。結果實際是一個減法操做。
getAndIncrement() 和 incrementAndGet() 方法相似於 getAndAdd() 和 addAndGet(),但每次只將 AtomicInteger 的值加 1。
AtomicInteger 類還提供了一些減少 AtomicInteger 的值的原子性方法。這些方法是:
decrementAndGet() 將 AtomicInteger 的值減一,並返回減一後的值。getAndDecrement() 也將 AtomicInteger 的值減一,但它返回的是減一以前的值。
AtomicLong 類爲咱們提供了一個能夠進行原子性讀和寫操做的 long 變量,它還包含一系列先進的原子性操做,好比 compareAndSet()AtomicLong 類位於 java.util.concurrent.atomic 包,所以其完整類名爲 java.util.concurrent.atomic.AtomicLong。本小節描述的 AtomicLong 是 Java 8 版本里的,而不是它第一次被引入的 Java 5 版本。
AtomicLong 背後的設計理念在個人《Java 併發指南》主題的《比較和交換》小節有解釋。
建立一個 AtomicLong 建立一個 AtomicLong 以下:
AtomicLong atomicLong = new AtomicLong(); 1
複製代碼
將建立一個初始值爲 0 的 AtomicLong。 若是你想建立一個指定初始值的 AtomicLong,能夠:
AtomicLong atomicLong = new AtomicLong(123); 1
複製代碼
本示例將 123 做爲參數傳遞給 AtomicLong 的構造子,後者將 AtomicLong 實例的初始值設置爲 123。 獲取 AtomicLong 的值 你能夠經過 get() 方法獲取 AtomicLong 的值。AtomicLong.get() 示例:
AtomicLong atomicLong = new AtomicLong(123);
long theValue = atomicLong.get(); 123
複製代碼
設置 AtomicLong 的值 你能夠經過 set() 方法設置 AtomicLong 實例的值。一個 AtomicLong.set() 的示例:
AtomicLong atomicLong = new AtomicLong(123);
atomicLong.set(234); 123
複製代碼
本示例新建了一個初始值爲 123 的 AtomicLong,第二行將其值設置爲 234。
AtomicLong 類也有一個原子性的 compareAndSet() 方法。這一方法將 AtomicLong 實例的當前值與一個指望值進行比較,若是兩種相等,爲 AtomicLong 實例設置一個新值。AtomicLong.compareAndSet() 使用示例:
AtomicLong atomicLong = new AtomicLong(123);
long expectedValue = 123;
long newValue = 234;
atomicLong.compareAndSet(expectedValue, newValue); 12345
複製代碼
本示例新建了一個初始值爲 123 的 AtomicLong。而後將 AtomicLong 的當前值與指望值 123 進行比較,若是相等的話,AtomicLong 的新值將變爲 234。
AtomicLong 具有一些可以增長 AtomicLong 的值並返回自身值的方法。這些方法以下:
第一個方法 addAndGet() 將 AtomicLong 的值加一個數字,並返回增長後的值。第二個方法 getAndAdd() 也將 AtomicLong 的值加一個數字,但返回的是增長前的 AtomicLong 的值。具體使用哪個取決於你本身的場景。示例以下:
AtomicLong atomicLong = new AtomicLong();
System.out.println(atomicLong.getAndAdd(10));
System.out.println(atomicLong.addAndGet(10)); 123
複製代碼
本示例將打印出 0 和 20。例子中,第二行拿到的是加 10 以前的 AtomicLong 的值。加 10 以前的值是 0。第三行將 AtomicLong 的值再加 10,並返回加操做以後的值。該值如今是爲 20。
你固然也可使用這倆方法爲 AtomicLong 添加負值。結果實際是一個減法操做。
getAndIncrement() 和 incrementAndGet() 方法相似於 getAndAdd() 和 addAndGet(),但每次只將 AtomicLong 的值加 1。
AtomicLong 類還提供了一些減少 AtomicLong 的值的原子性方法。這些方法是:
decrementAndGet() 將 AtomicLong 的值減一,並返回減一後的值。getAndDecrement() 也將 AtomicLong 的值減一,但它返回的是減一以前的值。
AtomicReference 提供了一個能夠被原子性讀和寫的對象引用變量。原子性的意思是多個想要改變同一個 AtomicReference 的線程不會致使 AtomicReference 處於不一致的狀態。AtomicReference 還有一個 compareAndSet() 方法,經過它你能夠將當前引用於一個指望值(引用)進行比較,若是相等,在該 AtomicReference 對象內部設置一個新的引用。
建立 AtomicReference 以下:
AtomicReference atomicReference = new AtomicReference(); 1
複製代碼
若是你須要使用一個指定引用建立 AtomicReference,能夠:
String initialReference = "the initially referenced string";
AtomicReference atomicReference = new AtomicReference(initialReference); 12
複製代碼
你可使用 Java 泛型來建立一個泛型 AtomicReference。示例:
AtomicReference<String> atomicStringReference =
new AtomicReference<String>(); 12
複製代碼
你也能夠爲泛型 AtomicReference 設置一個初始值。示例:
String initialReference = "the initially referenced string";
AtomicReference<String> atomicStringReference = new AtomicReference<String>(initialReference); 12
複製代碼
你能夠經過 AtomicReference 的 get() 方法來獲取保存在 AtomicReference 裏的引用。若是你的 AtomicReference 是非泛型的,get() 方法將返回一個 Object 類型的引用。若是是泛型化的,get() 將返回你建立 AtomicReference 時聲明的那個類型。
先來看一個非泛型的 AtomicReference get() 示例:
AtomicReference atomicReference = new AtomicReference("first value referenced");
String reference = (String) atomicReference.get(); 12
複製代碼
注意如何對 get() 方法返回的引用強制轉換爲 String。 泛型化的 AtomicReference 示例:
AtomicReference<String> atomicReference = new AtomicReference<String>("first value referenced");
String reference = atomicReference.get(); 12
複製代碼
編譯器知道了引用的類型,因此咱們無需再對 get() 返回的引用進行強制轉換了。
你可使用 get() 方法對 AtomicReference 裏邊保存的引用進行設置。若是你定義的是一個非泛型 AtomicReference,set() 將會以一個 Object 引用做爲參數。若是是泛型化的 AtomicReference,set() 方法將只接受你定義給的類型。
AtomicReference set() 示例:
AtomicReference atomicReference = new AtomicReference();
atomicReference.set("New object referenced"); 12
複製代碼
AtomicReference 類具有了一個頗有用的方法:compareAndSet()。compareAndSet() 能夠將保存在 AtomicReference 裏的引用於一個指望引用進行比較,若是兩個引用是同樣的(並不是 equals() 的相等,而是 == 的同樣),將會給AtomicReference 實例設置一個新的引用。
若是 compareAndSet() 爲 AtomicReference 設置了一個新的引用,compareAndSet() 將返回 true。不然compareAndSet() 返回 false。
AtomicReference compareAndSet() 示例:
String initialReference = "initial value referenced";
AtomicReference<String> atomicStringReference =
new AtomicReference<String>(initialReference);
String newReference = "new value referenced";
boolean exchanged = atomicStringReference.compareAndSet(initialReference, newReference);
System.out.println("exchanged: " + exchanged);
exchanged = atomicStringReference.compareAndSet(initialReference, newReference);
System.out.println("exchanged: " + exchanged); 1234567891011
複製代碼
本示例建立了一個帶有一個初始引用的泛型化的 AtomicReference。以後兩次調用 comparesAndSet()來對存儲值和指望值進行對比,若是兩者一致,爲 AtomicReference 設置一個新的引用。第一次比較,存儲的引用(initialReference)和指望的引用(initialReference)一致,因此一個新的引用(newReference)被設置給 AtomicReference,compareAndSet() 方法返回 true。第二次比較時,存儲的引用(newReference)和指望的引用(initialReference)不一致,所以新的引用沒有被設置給 AtomicReference,compareAndSet() 方法返回 false。