Java 併發工具包 java.util.concurrent 用戶指南

1. java.util.concurrent - Java 併發工具包

Java 5 添加了一個新的包到 Java 平臺,java.util.concurrent 包。這個包包含有一系列可以讓 Java 的併發編程變得更加簡單輕鬆的類。在這個包被添加之前,你須要本身去動手實現本身的相關工具類。
本文我將帶你一一認識 java.util.concurrent 包裏的這些類,而後你能夠嘗試着如何在項目中使用它們。
我不會去解釋關於 Java 併發的核心問題 - 其背後的原理,也就是說,若是你對那些東西感興趣,參考《
Java 併發指南》。html

2. 阻塞隊列 BlockingQueue

java.util.concurrent 包裏的 BlockingQueue 接口表示一個線程放入和提取實例的隊列。本小節我將給你演示如何使用這個 BlockingQueue。
本節不會討論如何在 Java 中實現一個你本身的 BlockingQueue。若是你對那個感興趣,參考《
Java 併發指南java

BlockingQueue 用法

BlockingQueue 一般用於一個線程生產對象,而另一個線程消費這些對象的場景。下圖是對這個原理的闡述:編程

一個線程往裏邊放,另一個線程從裏邊取的一個 BlockingQueue。
一個線程將會持續生產新對象並將其插入到隊列之中,直到隊列達到它所能容納的臨界點。也就是說,它是有限的。若是該阻塞隊列到達了其臨界點,負責生產的線程將會在往裏邊插入新對象時發生阻塞。它會一直處於阻塞之中,直到負責消費的線程從隊列中拿走一個對象。
負責消費的線程將會一直從該阻塞隊列中拿出對象。若是消費線程嘗試去從一個空的隊列中提取對象的話,這個消費線程將會處於阻塞之中,直到一個生產線程把一個對象丟進隊列。
數組

BlockingQueue 的方法:

BlockingQueue 具備 4 組不一樣的方法用於插入、移除以及對隊列中的元素進行檢查。若是請求的操做不能獲得當即執行的話,每一個方法的表現也不一樣。這些方法以下:數據結構

 

 

 

四組不一樣的行爲方式解釋:併發

  1. 拋異常:若是試圖的操做沒法當即執行,拋一個異常。
  2. 特定值:若是試圖的操做沒法當即執行,返回一個特定的值(經常是 true / false)。
  3. 阻塞:若是試圖的操做沒法當即執行,該方法調用將會發生阻塞,直到可以執行。
  4. 超時:若是試圖的操做沒法當即執行,該方法調用將會發生阻塞,直到可以執行,但等待時間不會超過給定值。返回一個特定值以告知該操做是否成功(典型的是 true / false)。

沒法向一個 BlockingQueue 中插入 null。若是你試圖插入 null,BlockingQueue 將會拋出一個 NullPointerException。
能夠訪問到 BlockingQueue 中的全部元素,而不只僅是開始和結束的元素。好比說,你將一個對象放入隊列之中以等待處理,但你的應用想要將其取消掉。那麼你能夠調用諸如 remove(o) 方法來將隊列之中的特定對象進行移除。可是這麼幹效率並不高(譯者注:基於隊列的數據結構,獲取除開始或結束位置的其餘對象的效率不會過高),所以你儘可能不要用這一類的方法,除非你確實不得不那麼作。
工具

BlockingQueue 的實現

BlockingQueue 是個接口,你須要使用它的實現之一來使用 BlockingQueue。java.util.concurrent 具備如下 BlockingQueue 接口的實現(Java 6):性能

  • ArrayBlockingQueue
  • DelayQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue
  • SynchronousQueue

這裏是一個 Java 中使用 BlockingQueue 的示例。本示例使用的是 BlockingQueue 接口的 ArrayBlockingQueue 實現。
首先,BlockingQueueExample 類分別在兩個獨立的線程中啓動了一個 Producer 和 一個 Consumer。Producer 向一個共享的 BlockingQueue 中注入字符串,而 Consumer 則會從中把它們拿出來。
this

public class BlockingQueueExample {

    public static void main(String[] args) throws Exception {

        BlockingQueue queue = new ArrayBlockingQueue(1024);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();

        Thread.sleep(4000);
    }
}

如下是 Producer 類。注意它在每次 put() 調用時是如何休眠一秒鐘的。這將致使 Consumer 在等待隊列中對象的時候發生阻塞。spa

public class Producer implements Runnable{

    protected BlockingQueue queue = null;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

如下是 Consumer 類。它只是把對象從隊列中抽取出來,而後將它們打印到 System.out。

public class Consumer implements Runnable{

    protected BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3.數組阻塞隊列 ArrayBlockingQueue

ArrayBlockingQueue 類實現了 BlockingQueue 接口。
ArrayBlockingQueue 是一個有界的阻塞隊列,其內部實現是將對象放到一個數組裏。有界也就意味着,它不可以存儲無限多數量的元素。它有一個同一時間可以存儲元素數量的上限。你能夠在對其初始化的時候設定這個上限,但以後就沒法對這個上限進行修改了(譯者注:由於它是基於數組實現的,也就具備數組的特性:一旦初始化,大小就沒法修改)。
ArrayBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在全部元素之中是放入時間最久的那個,而尾元素則是最短的那個。
如下是在使用  ArrayBlockingQueue 的時候對其初始化的一個示例:

BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);

queue.put("1");

String string = queue.take();

如下是使用了 Java 泛型的一個 BlockingQueue 示例。注意其中是如何對 String 元素放入和提取的:

BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);

queue.put("1");

String string = queue.take();

4. 延遲隊列 DelayQueue

DelayQueue 實現了 BlockingQueue 接口。
DelayQueue 對元素進行持有直到一個特定的延遲到期。注入其中的元素必須實現 java.util.concurrent.Delayed 接口,該接口定義:

public interface Delayed extends Comparable<Delayed< {

 public long getDelay(TimeUnit timeUnit);

}

DelayQueue 將會在每一個元素的 getDelay() 方法返回的值的時間段以後才釋放掉該元素。若是返回的是 0 或者負值,延遲將被認爲過時,該元素將會在 DelayQueue 的下一次 take  被調用的時候被釋放掉。
傳遞給 getDelay 方法的 getDelay 實例是一個枚舉類型,它代表了將要延遲的時間段。TimeUnit 枚舉將會取如下值:

DAYS
HOURS
MINUTES
SECONDS
MILLISECONDS
MICROSECONDS
NANOSECONDS

正如你所看到的,Delayed 接口也繼承了 java.lang.Comparable 接口,這也就意味着 Delayed 對象之間能夠進行對比。這個可能在對 DelayQueue 隊列中的元素進行排序時有用,所以它們能夠根據過時時間進行有序釋放。如下是使用 DelayQueue 的例子:

public class DelayQueueExample {

    public static void main(String[] args) {
        DelayQueue queue = new DelayQueue();

        Delayed element1 = new DelayedElement();

        queue.put(element1);

        Delayed element2 = queue.take();
    }
}

DelayedElement 是我所建立的一個 DelayedElement 接口的實現類,它不在 java.util.concurrent 包裏。你須要自行建立你本身的 Delayed 接口的實現以使用 DelayQueue 類。

5. 鏈阻塞隊列 LinkedBlockingQueue

LinkedBlockingQueue 類實現了 BlockingQueue 接口。
LinkedBlockingQueue 內部以一個鏈式結構(連接節點)對其元素進行存儲。若是須要的話,這一鏈式結構能夠選擇一個上限。若是沒有定義上限,將使用 Integer.MAX_VALUE 做爲上限。
LinkedBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在全部元素之中是放入時間最久的那個,而尾元素則是最短的那個。
如下是 LinkedBlockingQueue 的初始化和使用示例代碼:

BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded   = new LinkedBlockingQueue<String>(1024);

bounded.put("Value");

String value = bounded.take();

6. 具備優先級的阻塞隊列 PriorityBlockingQueue

PriorityBlockingQueue 類實現了 BlockingQueue 接口。
PriorityBlockingQueue 是一個無界的併發隊列。它使用了和類 java.util.PriorityQueue 同樣的排序規則。你沒法向這個隊列中插入 null 值。
全部插入到 PriorityBlockingQueue 的元素必須實現 java.lang.Comparable 接口。所以該隊列中元素的排序就取決於你本身的 Comparable 實現。
注意 PriorityBlockingQueue 對於具備相等優先級(compare() == 0)的元素並不強制任何特定行爲。
同時注意,若是你從一個 PriorityBlockingQueue 得到一個 Iterator 的話,該 Iterator 並不能保證它對元素的遍歷是以優先級爲序的。
如下是使用 PriorityBlockingQueue 的示例:

BlockingQueue queue   = new PriorityBlockingQueue();
    //String implements java.lang.Comparable
    queue.put("Value");
    String value = queue.take();

7. 同步隊列 SynchronousQueue

SynchronousQueue 類實現了 BlockingQueue 接口。
SynchronousQueue 是一個特殊的隊列,它的內部同時只可以容納單個元素。若是該隊列已有一元素的話,試圖向隊列中插入一個新元素的線程將會阻塞,直到另外一個線程將該元素從隊列中抽走。一樣,若是該隊列爲空,試圖向隊列中抽取一個元素的線程將會阻塞,直到另外一個線程向隊列中插入了一條新的元素。據此,把這個類稱做一個隊列顯然是誇大其詞了。它更多像是一個匯合點。

8. 阻塞雙端隊列 BlockingDeque

java.util.concurrent 包裏的 BlockingDeque 接口表示一個線程安放入和提取實例的雙端隊列。本小節我將給你演示如何使用 BlockingDeque。
BlockingDeque 類是一個雙端隊列,在不可以插入元素時,它將阻塞住試圖插入元素的線程;在不可以抽取元素時,它將阻塞住試圖抽取的線程。
deque(雙端隊列) 是 "Double Ended Queue" 的縮寫。所以,雙端隊列是一個你能夠從任意一端插入或者抽取元素的隊列

BlockingDeque 的使用

在線程既是一個隊列的生產者又是這個隊列的消費者的時候可使用到 BlockingDeque。若是生產者線程須要在隊列的兩端均可以插入數據,消費者線程須要在隊列的兩端均可以移除數據,這個時候也可使用 BlockingDeque。BlockingDeque 圖解:

一個 BlockingDeque - 線程在雙端隊列的兩端均可以插入和提取元素。
一個線程生產元素,並把它們插入到隊列的任意一端。若是雙端隊列已滿,插入線程將被阻塞,直到一個移除線程從該隊列中移出了一個元素。若是雙端隊列爲空,移除線程將被阻塞,直到一個插入線程向該隊列插入了一個新元素。

BlockingDeque 具備 4 組不一樣的方法用於插入、移除以及對雙端隊列中的元素進行檢查。若是請求的操做不能獲得當即執行的話,每一個方法的表現也不一樣。這些方法以下

四組不一樣的行爲方式解釋:

  1. 拋異常:若是試圖的操做沒法當即執行,拋一個異常。
  2. 特定值:若是試圖的操做沒法當即執行,返回一個特定的值(經常是 true / false)。
  3. 阻塞:若是試圖的操做沒法當即執行,該方法調用將會發生阻塞,直到可以執行。
  4. 超時:若是試圖的操做沒法當即執行,該方法調用將會發生阻塞,直到可以執行,但等待時間不會超過給定值。返回一個特定值以告知該操做是否成功(典型的是 true / false)。

BlockingDeque 繼承自 BlockingQueue

BlockingDeque 接口繼承自 BlockingQueue 接口。這就意味着你能夠像使用一個 BlockingQueue 那樣使用 BlockingDeque。若是你這麼幹的話,各類插入方法將會把新元素添加到雙端隊列的尾端,而移除方法將會把雙端隊列的首端的元素移除。正如 BlockingQueue 接口的插入和移除方法同樣。
如下是 BlockingDeque 對 BlockingQueue 接口的方法的具體內部實現:

BlockingDeque 的實現

既然 BlockingDeque 是一個接口,那麼你想要使用它的話就得使用它的衆多的實現類的其中一個。java.util.concurrent 包提供瞭如下 BlockingDeque 接口的實現類:

  • LinkedBlockingDeque

BlockingDeque 代碼示例

如下是如何使用 BlockingDeque 方法的一個簡短代碼示例:

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();

deque.addFirst("1");
deque.addLast("2");

String two = deque.takeLast();
String one = deque.takeFirst();

9. 鏈阻塞雙端隊列 LinkedBlockingDeque

LinkedBlockingDeque 類實現了 BlockingDeque 接口。
deque(雙端隊列) 是 "Double Ended Queue" 的縮寫。所以,雙端隊列是一個你能夠從任意一端插入或者抽取元素的隊列。(譯者注:唐僧啊,受不了。)
LinkedBlockingDeque 是一個雙端隊列,在它爲空的時候,一個試圖從中抽取數據的線程將會阻塞,不管該線程是試圖從哪一端抽取數據。

10. 併發 Map(映射) ConcurrentMap

java.util.concurrent.ConcurrentMap

java.util.concurrent.ConcurrentMap 接口表示了一個可以對別人的訪問(插入和提取)進行併發處理的 java.util.Map。
ConcurrentMap 除了從其父接口 java.util.Map 繼承來的方法以外還有一些額外的原子性方法。

ConcurrentMap 的實現

既然 ConcurrentMap 是個接口,你想要使用它的話就得使用它的實現類之一。java.util.concurrent 包具有 ConcurrentMap 接口的如下實現類:

  • ConcurrentHashMap

ConcurrentHashMap

ConcurrentHashMap 和 java.util.HashTable 類很類似,但 ConcurrentHashMap 可以提供比 HashTable 更好的併發性能。在你從中讀取對象的時候 ConcurrentHashMap 並不會把整個 Map 鎖住。此外,在你向其中寫入對象的時候,ConcurrentHashMap 也不會鎖住整個 Map。它的內部只是把 Map 中正在被寫入的部分進行鎖定。
另一個不一樣點是,在被遍歷的時候,即便是 ConcurrentHashMap 被改動,它也不會拋 ConcurrentModificationException。儘管 Iterator 的設計不是爲多個線程的同時使用。
更多關於 ConcurrentMap 和 ConcurrentHashMap 的細節請參考官方文檔。

11. 併發導航映射 ConcurrentNavigableMap

java.util.concurrent.ConcurrentNavigableMap 是一個支持併發訪問的 java.util.NavigableMap,它還能讓它的子 map 具有併發訪問的能力。所謂的 "子 map" 指的是諸如 headMap(),subMap(),tailMap() 之類的方法返回的 map。

NavigableMap 中的方法再也不贅述,本小節咱們來看一下 ConcurrentNavigableMap 添加的方法。

headMap()

headMap(T toKey) 方法返回一個包含了小於給定 toKey 的 key 的子 map。
若是你對原始 map 裏的元素作了改動,這些改動將影響到子 map 中的元素(譯者注:map 集合持有的其實只是對象的引用)。
如下示例演示了對 headMap() 方法的使用:

ConcurrentNavigableMap map = new ConcurrentSkipListMap();

map.put("1", "one");
map.put("2", "two");
map.put("3", "three");

ConcurrentNavigableMap headMap = map.headMap("2");

headMap 將指向一個只含有鍵 "1" 的 ConcurrentNavigableMap,由於只有這一個鍵小於 "2"。關於這個方法及其重載版本具體是怎麼工做的細節請參考 Java 文檔。

tailMap()

tailMap(T fromKey) 方法返回一個包含了不小於給定 fromKey 的 key 的子 map。
若是你對原始 map 裏的元素作了改動,這些改動將影響到子 map 中的元素(譯者注:map 集合持有的其實只是對象的引用)。
如下示例演示了對 tailMap() 方法的使用:

ConcurrentNavigableMap map = new ConcurrentSkipListMap();

map.put("1", "one");
map.put("2", "two");
map.put("3", "three");

ConcurrentNavigableMap tailMap = map.tailMap("2");

subMap()

 

subMap() 方法返回原始 map 中,鍵介於 from(包含) 和 to (不包含) 之間的子 map。示例以下:
 

ConcurrentNavigableMap map = new ConcurrentSkipListMap();

map.put("1", "one");
map.put("2", "two");
map.put("3", "three");

ConcurrentNavigableMap subMap = map.subMap("2", "3");

返回的 submap 只包含鍵 "2",由於只有它知足不小於 "2",比 "3" 小。

12. 閉鎖 CountDownLatch

java.util.concurrent.CountDownLatch 是一個併發構造,它容許一個或多個線程等待一系列指定操做的完成。
CountDownLatch 以一個給定的數量初始化。countDown() 每被調用一次,這一數量就減一。經過調用 await() 方法之一,線程能夠阻塞等待這一數量到達零。
如下是一個簡單示例。Decrementer 三次調用 countDown() 以後,等待中的 Waiter 纔會從 await() 調用中釋放出來。

CountDownLatch latch = new CountDownLatch(3);

Waiter      waiter      = new Waiter(latch);
Decrementer decrementer = new Decrementer(latch);

new Thread(waiter)     .start();
new Thread(decrementer).start();

Thread.sleep(4000);

public class Waiter implements Runnable{

    CountDownLatch latch = null;

    public Waiter(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Waiter Released");
    }
}

public class Decrementer implements Runnable {

    CountDownLatch latch = null;

    public Decrementer(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {

        try {
            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
相關文章
相關標籤/搜索