Java 基礎(十五)併發工具包 concurrent

本文目錄:

  • java.util.concurrent - Java 併發包簡介
  • 阻塞隊列 BlockingQueue
  • 數組阻塞隊列 ArrayBlockingQueue
  • 延遲隊列 DelayQueue
  • 鏈阻塞隊列 LinkedBlockingQueue
  • 具備優先級的阻塞隊列 PriorityBlockingQueue
  • 同步隊列 SynchronousQueue
  • 阻塞雙端隊列 BlockingDeque
  • 鏈阻塞雙端隊列 LinkedBlockingDeque
  • 併發 Map ConcurrentMap
  • 併發導航映射 ConcurrentNavigableMap
  • 閉鎖 ConutDownLatch
  • 柵欄 CyclicBarrier
  • 交換機 Exchanger
  • 信號量 Semaphore
  • 執行器服務 ExecutorService
  • 線程池執行者 ThreadPoolExecutor
  • 定時執行者服務 ScheduledExecutorService
  • 使用 ForkJoinPool 進行分叉和合並
  • 鎖 Lock
  • 讀寫鎖 ReadWriteLock
  • 原子性布爾 AtomicBoolean
  • 原子性整型 AtomicInteger
  • 原子性長整型 AtomicLong
  • 原子性引用型 AtomicReference

本章內容比較多,我本身也是邊學邊總結,因此拖到今天才出爐。另外,建議學習本文的小夥伴在學習的過程當中,把代碼 copy 出去run 一下,有助於理解。html

1.java.util.concurrent Java 併發工具包

這是 Java5 添加的一個併發工具包。這個包包含了一系列可以讓 Java 的併發編程變得更加簡單輕鬆的類。在這以前,你須要本身手動去實現相關的工具類。java

本文將和你們一塊兒學習 java.util.concurrent包裏的這些類,學完以後咱們能夠嘗試如何在項目中使用它們。算法

2.阻塞隊列 BlockingQueue

java.util.concurrent 包裏面的 BlockingQueue 接口表示一個線程安放如和提取實例的隊列。編程

這裏咱們不會討論在 Java 中實現一個你本身的 BlockingQueue。api

BlockingQueue 用法

BlockingQueue 一般用於一個線程生產對象,而另外一個線程消費這些對象的場景。下圖是對這個原理的闡述:數組

一個線程往裏邊放,另一個線程從裏邊取的一個 BlockingQueue.png
一個線程往裏邊放,另一個線程從裏邊取的一個 BlockingQueue.png

一個線程將會持續生產新對象並將其插入到隊列之中,直到隊列達到它所能容納的臨界點。也就是說,它是有限的。若是該阻塞隊列到達了其臨界點,負責生產的線程將會在往裏面插入新對象時發生阻塞。它會一直處於阻塞之中,直到負責消費的線程從隊列中拿走一個對象。負責消費的線程將會一直從該阻塞隊列中拿出對象。若是消費線程嘗試去從一個空的隊列中提取對象的話,這個消費線程將會處於阻塞之中,直到一個生產線程把一個對象丟進隊列。瀏覽器

BlockingQueue 的方法

BlockingQueue 具備4組不一樣的方法用於插入、移除以及對隊列中的元素進行檢查。若是請求的操做不能獲得當即執行的話,每一個方法的表現也不一樣。這些方法以下:bash

~ 拋異常 特定值 阻塞 超時
插入 add(o) offer(o) put(o) offer(o,timeout,timeUnit)
移除 remove(o) poll(o) take(o) poll(timeout,timeunit)
檢查 element(o) peek(o) ~ ~

四組不一樣的行爲方式解釋:多線程

1.拋異常:若是試圖的操做沒法當即執行,拋一個異常
2.特定值:若是試圖的操做沒法當即執行,返回一個特定的值(通常是 true/false)
3.阻塞:若是試圖的操做沒法當即執行,該方法將會發生阻塞,直到能執行
4.超時:若是試圖的操做沒法當即執行,該方法調用將會發生阻塞,直到可以執行,但等待時間不會超過給定值。返回一個特定的值以告知該操做是否成功。併發

沒法向一個 BlockingQueue 中插入 null。若是你試圖插入 null,BlockingQueue 會拋出一個 NullPointerException。

能夠訪問到 BlockingQueue 中的全部元素,而不只僅是開始和結束的元素。好比說你將一個對象放入隊列之中以等待處理,但你的應用想要將其取消掉,那麼你能夠調用諸如remove(o)方法啦將隊列中的特定對象進行移除。可是這麼幹相率並不高,所以儘可能不要用這一類方法,除非無可奈何。

BlockingQueue 的實現

BlockingQueue 是個藉口,你能夠經過它的實現之一來使用 BlockingQueue。concurrent 包裏面有以下幾個類實現了 BlockingQueue:

  • ArrayBlockingQueue
  • DelayQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue
  • SynchronousQueue

Java 中使用 BlockingQueue 的例子

這是一個 Java 鍾使用 BlockingQueue 的示例,本示例使用的是 BlockingQueue 藉口的 ArrayBlockingQueue 實現。
首先,BlockingQueueExample 類分別在兩個獨立的線程中啓動了一個 Producer 和 Consumer 。Producer 向一個共享的 BlockingQueue 中注入字符串,而 Consumer 則會從中把它們拿出來。

public class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

class Producer implements Runnable {
    private BlockingQueue<String> queue = null;

    Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        } catch (InterruptedException ignored) {
        }
    }
}

class Consumer implements Runnable {
    private BlockingQueue queue = null;

    Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}複製代碼

這個例子很簡單吶,我就不加文字描述了。

3.數組阻塞隊列 ArrayBlockingQueue

ArrayBlockingQueue 類實現了 BlockingQueue 接口。

ArrayBlockingQueue 是衣蛾有界的阻塞隊列,其內部實現是將對象放到一個數組裏。有界也就意味着,它不可以存儲無限多數量的原色。它有一個同一時間存儲元素數量的上線。你能夠在對其初始化的時候設定這個上限,但以後就沒法對這個上限進行修改了。

ArrayBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在全部元素之中是放入時間最久的那個,而尾元素則是最短的那個。

如下是在使用 ArrayBlockingQueue 的時候對其初始化的一個示例:

BlockingQueue queue = new ArrayBlockingQueue(1024);
try {
    queue.put("1");
    Object object = queue.take();
} catch (InterruptedException e) {
    e.printStackTrace();
}複製代碼

4.延遲隊列 DelayQueue

DelayQueue 實現了 BlockingQueue 接口
DelayQueue 對元素進行持有知道一個特定的延遲到期。注入其中的元素必須實現 concurrent.Delay 接口,該接口定義:

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit var1);
}複製代碼

DelayQueue 將會在每一個元素的 getDelay()方法返回的值的時間段以後才釋放掉該元素。若是返回的是 0 或者負值,延遲將被認爲過時,該元素將會在 DelayQueue 的下一次 take 被調用的時候被釋放掉。

傳遞給 getDelay 方法的 getDelay 實例是一個枚舉型,它代表了將要延遲的時間段。

TimeUnit 枚舉的取值單位都能顧名思義,這裏就帶過了。

上面咱們能夠看到 Delayed 接口繼承了 Comparable 接口,這也就意味着 Delayed 對象之間能夠進行對比。這個可能在對 DelayeQueue 隊列中的元素進行排序時有用,所以它們能夠根據過時時間進行有序釋放。

如下是使用 DelayQueue 的例子:

public class DelayQueueExample {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedElement> queue = new DelayQueue<>();
        DelayedElement element1 = new DelayedElement(1000);
        DelayedElement element2 = new DelayedElement(0);
        DelayedElement element3 = new DelayedElement(400);
        queue.put(element1);
        queue.put(element2);
        queue.put(element3);
        DelayedElement e = queue.take();
        System.out.println("e1:" + e.delayTime);
        DelayedElement e2 = queue.take();
        System.out.println("e2:" + e2.delayTime);
        DelayedElement e3 = queue.take();
        System.out.println("e3:" + e3.delayTime);
    }
}

class DelayedElement implements Delayed {
    long delayTime;
    long tamp;

    DelayedElement(long delay) {
        delayTime = delay;
        tamp = delay + System.currentTimeMillis();
    }

    @Override
    public long getDelay(@NonNull TimeUnit unit) {
        return tamp - System.currentTimeMillis();
//        return -1;
    }

    @Override
    public int compareTo(@NonNull Delayed o) {
        return tamp - ((DelayedElement) o).tamp > 0 ? 1 : -1;
    }
}複製代碼

運行結果:

e1:0
e2:400
e3:1000複製代碼

在 take 取出 e2的時候,會阻塞。
compareTo 決定隊列中的取出順序
getDelay 決定是否能取出元素,若是沒法取出則阻塞線程。

具體玩法,你們能夠自行思考,我看了一下別人用DelayQueue,能玩出不少花樣,在某些特定的需求很方便。

5.鏈阻塞隊列 LinkedBlockingQueue

LinkedBlockingQueue 類也實現了 BlockingQueue接口。
LinkedBlockingQueue 內部以一個鏈式結構對其元素進行存儲。若是須要的話,這一鏈式結構能夠選擇一個上線。若是沒有定義上線,將使用 Ingeter.MAX_VALUE 做爲上線。

LinkedBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在全部元素之中是放入時間最久的那個。

使用方式同 ArrayBlockingQueue。

6.具備優先級的阻塞隊列 PriorityBlockingQueue

PriorityBlockingQueue 類也實現了 BlockingQueue 接口。

PriorityBlockingQueue 是一個無界的併發隊列。它使用了和 PriorityQueue 同樣的排序規則。你沒法向這個隊列中插入 null 值。PriorityQueue 的代碼分析在集合中講了,感興趣的小夥伴能夠回頭去閱讀。

全部插入到 PriorityBlockingQueue 的元素必須實現 Comparable 接口或者在構造方法中傳入Comparator。

注意:PriorityBlockingQueue 對於具備相等優先級的元素並不強制任何特定的行爲。

同時注意:若是你從一個 PriorityBlockingQueue 得到一個 Iterator 的話,該 Iterator並不能保證它對元素的遍歷是按照優先順序的。原理在以前的文章中分析過~

使用方法同上。

7.同步隊列 SynchronousQueue

SynchronousQueue 類實現了 BlockingQueue 接口。
SynchronousQueue 是一個特殊的隊列,它的內部同時只能容納單個元素。若是該隊列已有一個元素的話,試圖向隊列中插入一個新元素的線程將會阻塞,知道另外一個線程將該元素從隊列中抽走。一樣,若是該隊列爲空,試圖向隊列中抽取一個元素的線程將會阻塞,直到另外一個線程向隊列中插入了一條新的元素。
據此,把這個類稱做一個隊列顯然是誇大其詞,它更多像是一個匯合點。

使用方法和 ArrayBlockingQueue 同樣吧,區別就是 SynchronousQueue 只能保存一個元素。

能夠理解成這個,哈哈哈new ArrayBlockingQueue<>(1);

8.阻塞雙端隊列 BlockingDeque

BlockingDeque 接口在 concurrent 包裏,表示一個線程安放入和提取實例的雙端隊列。

BlockingDeque 類是一個雙端隊列,在不可以插入元素時,它將阻塞住試圖插入元素的線程;在不可以抽取元素時,它將阻塞住試圖抽取的線程。

deque 是「Double Ended Queue」的縮寫。所以,雙端隊列是一個你能夠從任意一段插入或者抽取元素的隊列。

BlockingDeque 的使用

在線程既是生產者又是這個隊列的消費者的時候能夠用到 BlockingDeque。若是生產者線程須要在隊列的兩端均可以插入數據,消費者線程須要在隊列的兩端均可以移除數據,這時候也能夠用 BlockingDeque。BlockingDeque 圖解:

一個線程生產元素,並把它們插入到隊列的任意一段。若是雙端隊列已滿,插入線程將被阻塞,知道一個移除線程從隊列中移除了一個元素。

BlockingDeque 的方法

BlockingDeque 具備4組不一樣的方法用於插入、移除以及對雙端隊列中的元素進行檢查。若是請求的操做不能獲得當即執行的話,每一個方法的表現也不一樣。這些方法以下:

~ 拋異常 特定值 阻塞 超時
插入 addFirst(o) offerFirst(o) putFirst(o) offerFirst(o,timeout,timeUnit)
移除 removeFirst(o) pollFirst(o) takeFirst(o) pollFirst(timeout,timeunit)
檢查 getFirst(o) peekFirst(o) ~ ~
~ 拋異常 特定值 阻塞 超時
插入 addLast(o) offerLast(o) putLast(o) offerLast(o,timeout,timeUnit)
移除 removeLast(o) pollLast(o) takeLast(o) pollLast(timeout,timeunit)
檢查 getLast(o) peekLast(o) ~ ~

1.拋異常:若是試圖的操做沒法當即執行,拋一個異常
2.特定值:若是試圖的操做沒法當即執行,返回一個特定的值(通常是 true/false)
3.阻塞:若是試圖的操做沒法當即執行,該方法將會發生阻塞,直到能執行
4.超時:若是試圖的操做沒法當即執行,該方法調用將會發生阻塞,直到可以執行,但等待時間不會超過給定值。返回一個特定的值以告知該操做是否成功。

這一段文字有沒有感受特別眼熟,hahah~其實它和 BlockingQueue 同樣。

BlockingDeque 繼承自 BlockingQueue

BlockingDeque 接口繼承自 BlockingQueue 接口。這就意味着你能夠像使用一個 BlockingQueue 那樣使用 BlockingDeque。若是你這麼幹的話,各類插入方法將會把新元素添加到雙端隊列的尾端,而移除方法將會把雙端隊列的首端元素移除。正如 BlockingQueue 接口的插入和移除方法同樣。

BlockingDeque 的實現

既然 BlockingDeque 是一個接口,那麼確定有實現類,它的實現類很少,就一個:

  • LinkedBlockingDeque

BlockingDeque 代碼示例

這個真沒什麼好說的。。。

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();
deque.addFirst("1");
deque.addLast("2");
String two = deque.takeLast();
String one = deque.takeFirst();複製代碼

9.鏈阻塞雙端隊列LinkedBlockingDeque

LinkedBlockingDeque 類實現了 BlockingDeque 接口。

不想寫描述了,跳過了昂~

10.併發 Map(映射)ConcurrentMap

ConcurrentMap 接口表示了一個可以對別人的訪問(插入和提取)進行併發處理的 Map。
ConcurrentMap 除了從其父接口 java.util.Map 繼承來的方法以外還有一些額外的原子性方法。

ConcurrentMap 的實現

concurrent 包裏面就一個類實現了 ConcurrentMap 接口

  • ConcurrentHashMap

ConcurrentHashMap

ConcurrentHashMap 和 HashTable 類很類似,但 ConcurrentHashMap 能提供比 HashTable 更好的併發性能。在你從中讀取對象的時候,ConcurrentHashMap 並不會把整個 Map 鎖住。此外,在你向其寫入對象的時候,ConcurrentHashMap 也不會鎖住整個 Map,它的內部只是把 Map 中正在被寫入的部分鎖定。
其實就是把 synchronized 同步整個方法改成了同步方法裏面的部分代碼。

另一個不一樣點是,在被遍歷的時候,即便是 ConcurrentHashMap 被改動,它也不會拋 ConcurrentModificationException。儘管 Iterator 的設計不是爲多個線程同時使用。

使用例子:

public class ConcurrentHashMapExample {

    public static void main(String[] args) {
//        HashMap<String, String> map = new HashMap<>();
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("1", "a");
        map.put("2", "b");
        map.put("3", "c");
        map.put("4", "d");
        map.put("5", "e");
        map.put("6", "f");
        map.put("7", "g");
        map.put("8", "h");
        new Thread1(map).start();
        new Thread2(map).start();

    }

}

class Thread1 extends Thread {

    private final Map map;

    Thread1(Map map) {
        this.map = map;
    }

    @Override
    public void run() {
        super.run();
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        map.remove("6");
    }
}

class Thread2 extends Thread {

    private final Map map;

    Thread2(Map map) {
        this.map = map;
    }

    @Override
    public void run() {
        super.run();
        Set set = map.keySet();
        for (Object next : set) {
            System.out.println(next + ":" + map.get(next));
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}複製代碼

打印結果:

1:a
2:b
3:c
4:d
5:e
7:g
8:h複製代碼

思考題:用這個 Map 行不行?Map map = Collections.synchronizedMap(new HashMap());

哈哈哈~答案很簡單,思考一下就好了。

11.併發導航映射 ConcurrentNaviagbleMap

ConcurrentNavigableMap 是一個支持併發的 NavigableMap,它還能讓他的子 Map 具有併發訪問的能力,所謂的「子 map」指的是諸如 headMap(),subMap(),tailMap()之類的方法返回的 map.

NavigableMap

NavigableMap 這個接口以前集合中遺漏了。
這裏稍微補充一下吧,首先繼承關係是 NavigableMap繼承自 SortedMap 繼承自 Map。

SortedMap從名字就能夠看出,在Map的基礎上增長了排序的功能。它要求key與key之間是能夠相互比較的,從而達到排序的目的。

而NavigableMap是繼承於SortedMap,目前只有TreeMap和ConcurrentNavigableMap兩種實現方式。它本質上添加了搜索選項到接口,主要爲紅黑樹服務。先來了解下它新增的幾個方法

主要方法

  • lowerEntry(K key)返回小於 key 的最大值的節點
  • lowerKey(K key)返回小於 key 的最大值節點的 key
  • floorEntry(K key)返回小於等於 key 的最大值節點
  • floorKey(K key)返回小於等於 key 的最大值節點 key
  • ceilingEntry(K key)返回大於等於 key 的最小節點
  • ceilingkey(K key)返回大於等於 key 的最小節點的 key
  • higherEntry(K key)返回大於 key 的最小節點
  • higherKey(K key)返回大於 key 的最小節點 key
  • firstEntry()返回最小key 節點
  • lastEntry()返回最大 key 節點
  • descendingMap()獲取反序的 map
  • navigableKeySet()獲取升序迭代器
  • decendingKeySet()獲取降序的迭代器
  • subMap(K from,K to)截取 map
  • headMap(K toKey)截取小於等於 toKey 的 map
  • tailMao(K fromKey)截取大於等於 key 的 map

額,講完了。。。。。就不舉🌰了吧~

12.閉鎖 CountDownLatch

java.util.concurrent.CountDownLatch 是一個併發構造,它容許一個或多個線程等待一系列指定操做的完成。

CountDownLatch 以一個給定的數量初始化。countDown()每被調用一次,這一數量就建議。經過調用 await()方法之一,線程能夠阻塞等待這一數量到達零。

下面是一個簡單的示例,Decrementer 三次調用 countDown()以後,等待中的 Waiter 纔會從 await()調用中釋放出來。

public class CountDownLatchExample {

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(3);

        Waiter waiter = new Waiter(latch);
        Decrementer decrementer = new Decrementer(latch);

        new Thread(waiter).start();
        new Thread(decrementer).start();

    }

}

class Waiter implements Runnable {

    CountDownLatch latch = null;

    public Waiter(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Waiter Released");
    }
}

class Decrementer implements Runnable {

    CountDownLatch latch = null;

    Decrementer(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {

        try {
            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}複製代碼

運行結果

Waiter Released複製代碼

嗯,用法大概就是醬紫。我再給你們舉個實際的例子吧~

有時候會有這樣的需求,多個線程同時工做,而後其中幾個能夠隨意併發執行,但有一個線程須要等其餘線程工做結束後,才能開始。舉個例子,開啓多個線程分塊下載一個大文件,每一個線程只下載固定的一截,最後由另一個線程來拼接全部的分段,那麼這時候咱們能夠考慮使用CountDownLatch來控制併發。

13.柵欄 CyclicBarrier

CyclicBarrier 類是一種同步機制,它能對處理一些算法的線程實現同步。換句話說,它就是一個全部線程必須等待的一個柵欄,直到全部線程都到達這裏,而後全部線程才能夠繼續作其餘事情。

這個文字很好理解吧,沒理解的把上面這段話再讀一遍。

圖示以下:

兩個線程在柵欄旁等待對方
兩個線程在柵欄旁等待對方

經過調用 CuclicBarrier 對象的 await()方法,兩個線程能夠實現互相等待。一旦 N 個線程在等待 CyclicBarrier 達成,全部線程將被釋放掉去繼續執行。

建立一個 CyclicBarrier

在建立一個 CyclicBarrier 的時候你須要定義有多少線程在被釋放以前等待柵欄。建立 CyclicBarrier 示例:

CyclicBarrier barrier = new CyclicBarrier(2);複製代碼

等待一個 CyclicBarrier

如下演示瞭如何讓一個線程等待一個 CyclicBarrier:
barrier.await();
固然,你也能夠爲等待線程設定一個超時時間。等待超過了超時時間以後,即使尚未達成 N 個線程等待 CyclicBarrier 的條件,該線程也會被釋放出來。如下是定義超時時間示例:
barrier.await(10,TimeUnit.SECONDS);

固然,知足如下條件也可讓等待 CyclicBarrier 的線程釋放:

  • 最後一個線程也到達 CyclicBarrier(調用 await()方法)
  • 當前線程被其餘線程打斷(其餘線程調用了這個線程的 interrupt()方法)
  • 其餘等待柵欄的線程被打斷
  • 其餘等待柵欄的線程因超時而被釋放
  • 外部線程調用了柵欄的 CyclicBarrier.reset()方法

CyclicBarrier 行動

CyclicBarrier 支持一個柵欄行動,柵欄行動是一個 Runnable 實例,一旦最後等待柵欄的線程抵達,該實例將被執行。你能夠在 CyclicBarrier 的構造方法中將 Runnable 柵欄行動傳給它:

CyclicBarrier barrier = new CyclicBarrier(2,barrierAction);

CyclicBarrier 示例代碼

public class CyclicBarrierExample {

    public static void main(String[] args) {
        Runnable barrier1Action = new Runnable() {
            public void run() {
                System.out.println("BarrierAction 1 executed ");
            }
        };
        Runnable barrier2Action = new Runnable() {
            public void run() {
                System.out.println("BarrierAction 2 executed ");
            }
        };

        CyclicBarrier barrier1 = new CyclicBarrier(2, barrier1Action);
        CyclicBarrier barrier2 = new CyclicBarrier(2, barrier2Action);

        CyclicBarrierRunnable barrierRunnable1 =
                new CyclicBarrierRunnable(barrier1, barrier2);

        new Thread(barrierRunnable1).start();
        new Thread(barrierRunnable1).start();


    }

}

class CyclicBarrierRunnable implements Runnable {

    CyclicBarrier barrier1 = null;
    CyclicBarrier barrier2 = null;

    CyclicBarrierRunnable(
            CyclicBarrier barrier1,
            CyclicBarrier barrier2) {

        this.barrier1 = barrier1;
        this.barrier2 = barrier2;
    }

    public void run() {
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() +
                    " waiting at barrier 1");
            this.barrier1.await();

            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() +
                    " waiting at barrier 2");
            this.barrier2.await();

            System.out.println(Thread.currentThread().getName() +
                    " done!");

        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}複製代碼

思考一下程序的運行結果~

Thread-0 waiting at barrier 1
Thread-1 waiting at barrier 1
BarrierAction 1 executed 
Thread-1 waiting at barrier 2
Thread-0 waiting at barrier 2
BarrierAction 2 executed 
Thread-0 done!
Thread-1 done!複製代碼

14.交換機 Exchanger

Exchanger 類表示一種兩個線程能夠進行互相交換對象的會和點。這種機制圖以下:

兩個線程經過一個 Exchanger 交換對象
兩個線程經過一個 Exchanger 交換對象

交換對象的動做由Exchanger 的兩個 exchange()方法中的其中一個完成。如下是一個示例:

public class ExchangerExample {

    public static void main(String[]args){
        Exchanger exchanger = new Exchanger();

        ExchangerRunnable exchangerRunnable1 =
                new ExchangerRunnable(exchanger, "Thread-0數據");

        ExchangerRunnable exchangerRunnable2 =
                new ExchangerRunnable(exchanger, "Thread-1數據");

        new Thread(exchangerRunnable1).start();
        new Thread(exchangerRunnable2).start();

    }

}
 class ExchangerRunnable implements Runnable{

    Exchanger exchanger = null;
    Object    object    = null;

    ExchangerRunnable(Exchanger exchanger, Object object) {
        this.exchanger = exchanger;
        this.object = object;
    }

    public void run() {
        try {
            Object previous = this.object;

            this.object = exchanger.exchange(this.object);

            System.out.println(
                    Thread.currentThread().getName() +
                            " exchanged " + previous + " for " + this.object
            );
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}複製代碼

輸出結果:

Thread-1 exchanged Thread-1數據 for Thread-0數據
Thread-0 exchanged Thread-0數據 for Thread-1數據複製代碼

當一個線程到達exchange調用點時,若是它的夥伴線程此前已經調用了此方法,那麼它的夥伴會被調度喚醒並與之進行對象交換,而後各自返回。

在常見的 生產者-消費者 模型中用於同步數據。

15.信號量 Semaphore

Semaphore類是一個計數信號量。這就意味着它具有兩個主要方法:

  • acquire()得到
  • release()釋放

計數信號量由一個指定數量的「許可」初始化。每調用一次 acquire(),一個許可會被調用線程取走。沒調用一次 release(),一個許可會被還給信號量。所以,在沒有任何 release()調用時,最多有 N 個線程可以經過 acquire()方法,N 是該信號量初始化時的許可的指定數量。這些許可只是一個簡單的計數器。沒有啥奇特的地方。

Semaphore 用途

信號量主要有兩種用途:

1.保護一個重要(代碼)部分防止一次超過 N 個線程進入
2.在兩個線程之間發送信號

Semaphore 用法

若是你將信號量用於保護一個重要部分,試圖進入這一部分的代碼一般會首先嚐試得到一個許可,而後才能進入重要部分(代碼塊),執行完以後,再把許可釋放掉:

Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
//重要部分代碼塊
semaphore.release();複製代碼

在線程之間發生信號

若是你將一個信號量用於在兩個線程之間發送信號,一般你應該用一個線程調用 acquire()方法,而另外一個線程調用 release()方法。

若是沒有可用的許可,acquire()調用將會阻塞,知道一個許可被另外一個線程釋放出來。同理,若是沒法往信號量釋放更多許可時,一個 release()方法調用也會阻塞。

經過這個能夠對多個線程進行協調。好比,若是線程1將一個對象插入到了一個共享列表(list)以後調用了 acquire(),而線程2則從該列表中獲取一個對象以前調用了release(),這時你其實已經建立了一個阻塞隊列。信號量中可用的許可的數量也就等同於該則是隊列可以持有的元素個數。

公平

沒有辦法保證線程可以公平地從信號量中得到許可。也就是說,沒法擔保第一個調用 acquire()的線程會是第一個得到許可的線程。若是第一個線程在等待一個許可時發生阻塞,而第二個線程來索要一個許可的時候恰好有一個許可被釋放出來,那麼它就可能在第一個線程以前得到許可。
若是須要強制公平,Semaphore 類有一個具備一個布爾類型的參數的構造子,經過這個參數以告知 Semaphore 是否要強制公平。強制公平會影響到併發性能,因此除非你確實須要它,不然不要啓動它。

如下是如何在公平模式建立一個 Semaphore 的示例:

Semaphore semaphore = new Semaphore(1,ture);

更多方法

  • acquire()獲取一個許可
  • availablePermits()返回當前可用許可數
  • drainPermits()獲取並返回當即可用的全部許可
  • getQueueThreads()返回一個集合,包含可能等待獲取的數量
  • hasQueueThreads()返回正在等待獲取的線程的估計數目
  • isFair()若是此信號量的公平設置爲 true,則返回 true
  • reducePermits(int)根據指定的縮減量減少可用許可的數量
  • relaese()釋放一個許可

具體參考 JDK 文檔吧

使用案例

public class SemaphoreExample {

    public static void main(String[]args){
        Semaphore semaphore = new Semaphore(3);

        new Thread(new ThreadSemaphore(semaphore)).start();
        new Thread(new ThreadSemaphore(semaphore)).start();
        new Thread(new ThreadSemaphore(semaphore)).start();
        new Thread(new ThreadSemaphore(semaphore)).start();
        new Thread(new ThreadSemaphore(semaphore)).start();

    }

}

 class ThreadSemaphore implements Runnable{

    private final Semaphore semaphore;

     ThreadSemaphore(Semaphore semaphore){
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName()+"獲取到鎖進來");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        semaphore.release();

    }
}複製代碼

打印結果是先打印三條線程,1秒後再打印兩條線程。

腦補一下,咱們當年搶第一代紅米手機的時候,搶購界面是否是一直在排隊,是否是用信號量能夠實現呢,10w 臺手機,同時只容許1000個用戶購買(Semaphore 的許可爲1000個),而後運氣好忽然排隊進去了(有人購買成功,釋放了許可,運氣好分配給了你),而後紅米搶購到手。。。

16.執行器服務 ExecutorService

ExecutorService 接口表示一個異步執行機制,使咱們可以在後臺執行任務。所以一個 ExecutorService 很相似一個線程池。實際上,存在於 concurrent 包裏的 ExecutorService 實現就是一個線程池實現。

ExecutorService 例子

如下是一個簡單的 ExecutorService 例子:

ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});
executorService.shutdown();複製代碼

首先使用 newFixedThreadPool 工廠方法建立一個 ExecutorService。這裏建立了一個是個線程執行任務的線程池。
而後,將一個 Runnable 接口的匿名實現類傳給 execute()方法。這將致使 ExecutorService 中的某個線程執行該 Runnable。

任務委派

下圖說明了一個線程是如歌將一個任務委託給一個 ExecutorService 去異步執行的:

一個線程將一個任務委派給一個 ExecutorService 去異步執行
一個線程將一個任務委派給一個 ExecutorService 去異步執行

一旦該線程將任務委派給 ExecutorService,該線程將繼續它本身的執行,獨立於該任務的執行。

ExecutorService 實現

既然 ExecutorService 是個接口,若是你想用它的話,還得去使用它的實現類。

  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor

建立一個 ExecutorService

ExecutorService 的建立依賴於你使用的具體實現。可是你也可使用 Executors 工廠類來建立 ExecutorService 實例。如下是幾個建立 ExecutorService 實例的例子:

ExecutorService executorService1 = Executors.newSingleThreadExecutor();
ExecutorService executorService2 = Executors.newFixedThreadPool(10);
ExecutorService executorService3 = Executors.newScheduledThreadPool(10);複製代碼

ExecutorService 使用

有幾種不一樣的方式來將任務委託給 ExecutorService 去執行:

  • execute(Runnable)
  • submit(Runnable)
  • submit(Callable)
  • invokeAny(Collection)
  • invokeAll(Collection)

接下來咱們來一個一個看這些方法

  • execute(Runnable)

execute 方法要求一個 Runnable 對象,而後對它進行異步執行。如下是使用 ExecutorService 執行一個 Runnable 的示例:

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("Asynchronous task");
    }
});
System.out.println("Asynchronous task");
executorService.shutdown();複製代碼

沒有辦法得知被執行的 Runnable 的執行結果。若是須要的話,得使用 Callable

  • submit(Runnable)

sunmit 方法也要求一個 Runnable 實現類,但它返回一個 Future 對象。這個 Future 對象能夠用來檢查 Runnable 是否已經執行完畢。

如下是 ExecutorService submit 示例:

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future future = executorService.submit(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});
future.get(); //returns null if the task has finished correctly.複製代碼
  • submit(Callable)
    submit(Callable)方法相似於 submit(Runnable)方法,除了它所要求的參數類型以外。Callable 實例除了它的 call()方法可以返回一個結果以外和一個 Runnable 很像。Runnable.run()不能返回一個結果。

Callable 的結果能夠經過 submit(Callable)方法返回的 Future 對象進行獲取。如下是一個 ExecutorService Callable 示例:

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Object> future = executorService.submit(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
        System.out.println("Asynchronous Callable");
        return "Callable Result";
        }
    });
System.out.println("future.get() = " + future.get());複製代碼

以上代碼輸出:

Asynchronous Callable
future.get() = Callable Result複製代碼

注意:future.get()會阻塞線程直到 Callable 執行結束。你能夠把這個當成是一個有返回值的線程。

  • invokeAny()

invokeAny()方法要求一系列的 Callable 或者其子接口的實例對象。調用這個方法並不會返回一個 Future,但它返回其中一個 Callable 對象的結果。沒法保證返回的是哪一個 Callable 的結果,只能代表其中一個已經執行結束。

ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> set = new HashSet<>();
set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});

set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});
String result = executorService.invokeAny(set);
System.out.println("result = " + result);
executorService.shutdown();複製代碼

執行結果就不看了,自行測試吧

ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> set = new HashSet<>();
set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});

set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
set.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});
List<Future<String>> list = executorService.invokeAll(set);
for (Future<String> future : list)
    System.out.println("result = " + future.get());
executorService.shutdown();複製代碼

執行結果自行測試。。。

ExecutorService 關閉

使用完 ExecutorService 以後,應該將其關閉,以使其中的線程再也不容許。
好比,若是你的應用是經過一個 main 方法啓動的,以後 main 方法退出了你的應用,若是你的應用有一個活動的 ExecutorService,它將還會保持運行。ExecutorService 裏的活動線程阻止了 JVM 的關閉。
要終止 ExecutorService 裏的線程,你須要調用 ExecutorService 的 shutdown 方法。

ExecutorService 並不會當即關閉,但它將再也不接受新的任務,並且一旦全部線程都完成了當前任務的時候,ExecutorService 將會關閉。在 shutdown 被調用以前全部提交給ExecutorService 的任務都被執行。
若是你想當即關閉 ExecutorService,你能夠調用 shutdownNow 方法,這樣會當即嘗試中止全部執行中的任務,並忽略掉那些已提交但還沒有開始處理的任務。沒法保證執行任務的正確執行。可能它們被中止了,也可能已經執行結束。

17.線程池執行者 ThreadPoolExecutor

ThreadPoolExecutor 是 ExecutorService 接口的一個實現。
ThreadPoolExecutor 使用其內部池中的線程執行給定任務(Callable 或者 Runnable)。ThreadPoolExecutor 包含的線程池可以包含不一樣數量的線程,池中線程的數量由如下變量決定:

  • corePoolSize
  • maximumPoolSize
    當一個任務委託給線程池時,若是池中線程數量低於 corePoolSize,一個新的線程將被建立,即便池中可能還沒有有空閒線程。
    若是內部任務隊列已滿,並且有至少 corePoolSize 正在運行,可是運行線程的數量低於 maximumPoolSize,一個新的線程將被建立去執行該任務。

ThreadPoolExecutor 圖解:

一個ThreadPoolExecutor
一個ThreadPoolExecutor

建立一個 ThreadPoolExecutor

ThreadPoolExecutor 有若干個可用構造方法。好比:

int corePoolSize = 5;
int maxPoolSize = 10;
long keepAliveTime = 5000;
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
            maxPoolSize,
            keepAliveTime,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());複製代碼

可是,除非你確實須要顯式爲 ThreadPoolExecutor 定義全部參數,使用 Executors 類中的額工廠方法之一會更加方便。

18.定時執行者服務 ScheduleExecutorService

ScheduleExecutorService 是一個 ExecutorService,它可以將任務延後執行,或者間隔固定時間屢次執行。任務由一個工做者線程異步執行,而不是由提交任務給 ScheduleExecutorService 的那個線程執行。

ScheduleExecutorService 例子

如下是一個簡單的 ScheduleExecutorService 示例:

ScheduledExecutorService scheduledExecutorService =
                Executors.newScheduledThreadPool(5);
scheduledExecutorService.schedule(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
        System.out.println("Executed!");
        return "Called!";
    }
}, 5, TimeUnit.SECONDS);
scheduledExecutorService.shutdown();複製代碼

首先一個內置5個線程的 ScheduleExecutorService 被建立,以後一個 Callable 接口的匿名類示例被建立而後傳遞給 schedule()方法。後邊的兩參數定義了 Callable 將在5秒鐘以後被執行。

ScheduleExecutorService 實現

既然是一個接口,要使用它的話就得使用 concurrent 包下的實現類

  • ScheduleThreadPoolExecutor

建立一個 ScheduleExecutorService

如何建立一個 ScheduleExecutorService,取決於你採用它的實現類。可是你也可使用 Executors 工廠類來建立一個 ScheduleExecutorService 實例。好比:

ScheduledExecutorService scheduledExecutorService =
                Executors.newScheduledThreadPool(5);複製代碼

ScheduleExecutorService 使用

一旦你建立了一個 ScheduleExecutorService,你能夠經過調用它的如下方法:

  • shcedule(Callable task,long delay,TimeUnit timeunit)
  • shcedule(Runnable task,long delay,TimeUnit timeunit)
  • shceduleAtFixedRate(Runnable task,long initialDelay,long period,TimeUtil timeutil)
  • shceduleWithFixedDelay(Runnable task,long initialDelay,long period,TimeUtil timeutil)

下面咱們就簡單看一下這些方法。

  • schedule(Callable task,long delay,TimeUnit timeUnit)
    這個方法計劃指定的 Callable 在給定的延遲以後執行。
    這個方法返回一個 ScheduleFuture,經過它你能夠在它被執行前對它進行取消,或者在它執行以後獲取結果。

如下是一個示例:

ScheduledExecutorService scheduledExecutorService =
                Executors.newScheduledThreadPool(5);
ScheduledFuture<Object> schedule = scheduledExecutorService.schedule(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
        System.out.println("Executed!");
        return "Called!";
    }
}, 5, TimeUnit.SECONDS);
System.out.println(schedule.get());
scheduledExecutorService.shutdown();複製代碼

輸出結果:

Executed!
Called!複製代碼
  • shcedule(Runnable task,long delay,TimeUnit timeUnit)

除了 Runnable 沒法返回一個結果以外,這一方法工做起來就像一個 Callable 做爲一個參數的那個版本的方法同樣,所以 ScheduleFuture.get()在任務執行結束以後返回 null。

  • scheduleAtFixedRate(Runnable,long initialDelay,long period,TimeUnit tomeUnit)
    這一方法規劃一個任務將被按期執行。該任務將會在某個 initialDelay 以後獲得執行,而後每一個 period 時間以後重複執行。
    若是給的任務的執行拋出了異常,該任務將再也不執行。若是沒有任何異常的話,這個任務將會持續循環執行到 ScheduleExecutorService 被關閉。
    若是一個任務佔用了比計劃的時間間隔更長的時候,下一次執行將在當前執行結束執行纔開始。計劃任務在同一時間不會有多個線程同時執行。

  • scheduleWithFixedDelay(Runnable r,long initalDelay,long period,TimeUnit timeUnit)

除了 period 有不一樣的解釋以外這個方法和 scheduleAtFixedRate()很是像。
scheduleAtFixedRate()方法中,period 被解釋爲前一個執行的開始和下一個執行的開始之間的間隔時間。
而在本方法中,period 則被解釋爲前一個執行的結束和下一個執行開始之間的間隔。

ShceduleExecutorService 關閉

正如 ExecutorService,在你使用結束以後,你須要吧 ScheduleExecutorService 關閉掉。不然他將致使 JVM 繼續運行,即便全部其餘線程已經所有被關閉。
你能夠從 ExecutorService 接口繼承來的 shutdown()或 shutdownNow()方法將 ScheduleExecutorService 關閉。

19.使用 ForkJoinPool 進行分叉和合並

ForkJoinPool 在 Java7中被引入。它和 ExecutorService 很類似,除了一點不一樣。
ForkJoinPool 讓咱們能夠很方便把任務分裂成幾個更小的任務,這些分裂出來的任務也將會提交給 ForkJoinPool。任務能夠繼續分割成更小的子任務,只要它還能分割。可能聽起來有點抽象,所以本節中咱們將會解釋 ForkJoinPool 是如何工做的,還有任務分割是如何進行的。

分叉和合並解釋

在咱們開始看 ForkJoinPool 以前,咱們先來簡要解釋一下分叉和合並的原理。
分叉和合並原理包含兩個遞歸進行的步驟。兩個步驟分別是分叉步驟和合並步驟。

  • 分叉

一個使用了分叉和合並原理的任務能夠將本身分叉(分割)爲更小的子任務,這些子任務能夠被併發執行。以下圖所示:

分叉
分叉

經過把本身分割成多個子任務,每一個子任務能夠由不一樣的 CPU 併發執行,或者被同一個 CPU 上的不一樣線程執行。

只有當給的任務過大,把它分割成幾個子任務纔有意義。把任務分割成子任務有一點的開銷,所以對於小型任務,這個分割的消耗可能比每一個子任務併發執行的消耗還要大。

何時把一個任務分割成子任務是有意義的,這個界限也稱做一個閾值。折腰看每一個任務對有意義閾值的決定。很大程度取決於它要作的工做的種類。

  • 合併

當一個任務將本身分割成若干子任務以後,該任務將進入等待全部子任務的結束之中。
一旦子任務執行結束,該任務能夠把全部結果合併到同一結果。圖示以下:

合併
合併

固然,並不是全部類型的任務都會返回一個結果。若是這個任務並不返還一個結果,它只需等待全部子線程執行完畢。也就不須要結果合併。

ForkJoinPool

ForkJoinPool 是一個特殊的線程池,她的設計是爲了更好的配合 分叉-合併 任務分割的工做。ForkJoinPool 也在 concurrent 包中。

能夠經過其構造方法建立一個 ForkJoinPool。 ForkJoinPool 構造函數的參數定義了 ForkJoinPool 的並行級別,並行級別表示分叉的線程或 CPU 數量。

建立示例:
ForkJoinPool forkJoinPool = new ForkJoinPool(4);

提交任務到 ForkJoinPool

就像提交任務到 ExecutorService那樣,把任務提交到 ForkJoinPool。你能夠提交兩種類型的任務。一種是沒有任何返回值的,另外一種是有返回值的。這兩週任務分別有 RecursiveAction 和 RecursiveTask 表示。接下來介紹如何使用這兩種類型的任務,以及如何對它們進行提交。

RecursiveAction

RecursiveAction 是一種沒有返回值的任務。它只是作一些工做,好比寫數據到磁盤,而後就退出了。
一個 RecursiveAction 能夠把本身的工做分割成更小的幾塊,這樣它們能夠由獨立的線程或者 CPU 執行。
你能夠經過集成來實現一個 RecursiveAction。示例以下:

public class RecursiveActionExample {
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(40);
        MyRecursiveAction myRecursiveAction = new MyRecursiveAction(240);
        forkJoinPool.invoke(myRecursiveAction);

    }
}

class MyRecursiveAction extends RecursiveAction {
    private long workLoad = 0;

    public MyRecursiveAction(long workLoad) {
        this.workLoad = workLoad;
    }

    @Override
    protected void compute() {
        //if work is above threshold, break tasks up into smaller tasks
        if (this.workLoad > 16) {
            System.out.println("Splitting workLoad : " + this.workLoad);
            List<MyRecursiveAction> subtasks =
                    new ArrayList<>();
            subtasks.addAll(createSubtasks());
            for (RecursiveAction subtask : subtasks) {
                subtask.fork();
            }
        } else {
            System.out.println("Doing workLoad myself: " + this.workLoad);
        }
    }

    private List<MyRecursiveAction> createSubtasks() {
        List<MyRecursiveAction> subtasks =
                new ArrayList<>();
        MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
        MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);
        subtasks.add(subtask1);
        subtasks.add(subtask2);
        return subtasks;
    }
}複製代碼

例子跟簡單。MyRecursiveAction 將一個虛構的 workLoad 做爲參數傳給本身的構造方法。若是 wrokLoad 高於一個特定的閾值,該工做將分割爲幾個子工做,子工做繼續分割。若是 workLoad 高於一個特定閾值,該工做將被分割爲幾個子工做,子工做繼續分割。若是 workLoad 低於特定閾值,該工做將有 MyRecursiveAction 本身執行。

運行結果:

Splitting workLoad : 240
Splitting workLoad : 120
Splitting workLoad : 120
Splitting workLoad : 60
Splitting workLoad : 60
Splitting workLoad : 60
Splitting workLoad : 30
Splitting workLoad : 30
Splitting workLoad : 30
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Splitting workLoad : 60
Splitting workLoad : 30
Doing workLoad myself: 15複製代碼

RecursiveTask

RecursiveTask 是一種會返回結果的任務。它能夠將本身的工做分割爲若干更小任務,並將這些子任務的執行結果合併到一個集體結果。能夠有幾個水平的分割和合並。如下是一個 RecursiveTask 示例:

public class RecursiveTaskExample {
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(40);
        MyRecursiveTask myRecursiveAction = new MyRecursiveTask(240);
        Object invoke = forkJoinPool.invoke(myRecursiveAction);
        System.out.println("mergedResult = " + invoke);
    }
}

class MyRecursiveTask extends RecursiveTask<Long> {

    private long workLoad = 0;

    public MyRecursiveTask(long workLoad) {
        this.workLoad = workLoad;
    }

    protected Long compute() {

        //if work is above threshold, break tasks up into smaller tasks
        if (this.workLoad > 16) {
            System.out.println("Splitting workLoad : " + this.workLoad);

            List<MyRecursiveTask> subtasks =
                    new ArrayList<>();
            subtasks.addAll(createSubtasks());

            for (MyRecursiveTask subtask : subtasks) {
                subtask.fork();
            }

            long result = 0;
            for (MyRecursiveTask subtask : subtasks) {
                result += subtask.join();
            }
            return result;

        } else {
            System.out.println("Doing workLoad myself: " + this.workLoad);
            return workLoad * 3;
        }
    }

    private List<MyRecursiveTask> createSubtasks() {
        List<MyRecursiveTask> subtasks =
                new ArrayList<>();

        MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
        MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);

        subtasks.add(subtask1);
        subtasks.add(subtask2);

        return subtasks;
    }
}複製代碼

注意是如何經過 ForkJoinPool.invoke()方法的調用來獲取最終執行結果的。

運行結果:

Splitting workLoad : 240
Splitting workLoad : 120
Splitting workLoad : 120
Splitting workLoad : 60
Splitting workLoad : 30
Doing workLoad myself: 15
Splitting workLoad : 60
Splitting workLoad : 30
Doing workLoad myself: 15
Splitting workLoad : 30
Splitting workLoad : 60
Splitting workLoad : 30
Splitting workLoad : 60
Doing workLoad myself: 15
Splitting workLoad : 30
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
Splitting workLoad : 30
Doing workLoad myself: 15
Doing workLoad myself: 15
Doing workLoad myself: 15
mergedResult = 720複製代碼

ForkJoinPool 評論

貌似並不是每一個人都對 Java7裏面的 ForkJoinPool 滿意,也就是說,這裏面會有坑,在你計劃在本身的項目裏使用 ForkJoinPool 以前最好閱讀一下這篇文章《一個 Java 分叉-合併 帶來的災禍》

haha...文章是英文版本的,能夠用瀏覽器插件翻譯,或者自行百度吧。

20.鎖 Lock

Lock 是一個相似於 Synchronized 塊的線程同步機制。可是 Lock 比 Synchronized 塊更加靈活、精細。

Lock 例子

既然 Lock 是一個接口,在程序中總須要使用它的實現類之一來使用它。如下是一個簡單示例:

Lock lock = new ReentrantLock();
lock.lock();
//同步代碼
lock.unLock();複製代碼

首先建立了一個 Lock 對象。以後調用了它的 lock()方法。這時候這個 lock 實例就被鎖住啦。任何其餘再過來調用 lock()方法的線程將會被鎖阻塞住,直到鎖定 lock 線程的實例的線程調用了 unlock()方法。最後 unlock()被調用了,lock 對象解鎖了,其餘線程能夠對它進行鎖定了。

Lock 實現

concurrent 包下 Lock 的實現類以下:

  • ReentrantLock

Lock 和 Synchronized 代碼塊的主要不一樣點

  • Synchronized 代碼塊不可以保證進入訪問等待的線程的前後順序
  • 你不能傳遞任何參數給一個 Synchronized 代碼塊的入口。所以,對於 Synchronized 代碼塊的訪問等待設置超時時間是不可能的事情。
  • Synchronized 塊必須被完整地包含在單個方法裏。而一個 Lock 對象能夠把它的 lock()和 unLock()方法的調用放在不一樣的方法裏。

Lock 的方法

Lock 接口主要有如下幾個方法

  • lock()
  • lockInterruptibly()
  • tryLock()
  • tryLock(long timeout,TimeUnit unit)
  • unLock()

lock()將 Lock 實例鎖定。若是該 Lock 實例已被鎖定,調用lock()方法的線程將會被阻塞,直到 Lock 實例解鎖。
lockInterruptibly()方法將會被調用線程鎖定,除非該線程將被打斷。此外,若是一個線程在經過這個方法來鎖定 Lock 對象時進入阻塞等待,而它被打斷了的話,該線程將會退出這個方法調用。
tryLock()方法視圖當即鎖定 Lock 實例。若是鎖定成功,它將返回 true,若是 Lock 實例已經被鎖定,則返回 false。這一方法用不阻塞。
tryLock(long timeout,TimeUnit unit)的工做相似於 tryLock()方法,除了它在放棄鎖定 Lock 以前等待一個給定的超時時間以外。
unlock()方法對 Lock 實例解鎖。一個 Lock 實現將只容許鎖定了該對象的線程來調用此方法。其餘線程對 unlock()方法調用將會拋出異常。

21.讀寫鎖 ReadWriteLock

讀寫鎖是一種先進的線程鎖機制。它可以容許多個線程在同一時間對某特定資源進行讀取,但同一時間內只能有一個線程對其進行寫入。
讀寫鎖的理念在於多個線程可以對一個共享資源進行讀取,而不會致使併發問題。併發問題的發生場景在於對一個共享資源的讀和寫操做的同時進行,或者多個讀寫操做併發進行。

ReadWrite Lock 鎖規則

一個線程在對受保護資源在讀或者寫以前對 ReadWriteLock 鎖定的規則以下:

  • 讀鎖:若是沒有任何寫操做線程鎖定 ReadWriteLock,而且沒有任何寫操做線程要求一個寫鎖(但尚未得到該鎖)。所以,能夠有多個讀操做線程對該鎖進行鎖定。
  • 寫鎖:若是沒有任何讀操做或者寫操做。所以,在寫操做的時候,只能有一個線程對該鎖進行鎖定。

ReadWriteLock 實現

ReadWriteLock 是個接口,若是你想使用它的話就得去使用它的實現類之一。concurrent 包提供了一個實現類:

  • ReentrantReadWriteLock

ReadWriteLock 代碼示例
如下是 ReadWriteLock 的建立以及如何使用它進行讀、寫鎖定的簡單示例代碼:

ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readWriteLock.readLock().locl();
//乾點事情
readWriteLock.readLock().unlock();

ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readWriteLock.writeLock().locl();
//乾點事情
readWriteLock.writeLock().unlock();複製代碼

注意如何使用 ReadWriteLock 對兩種鎖示例的持有。一個對讀訪問進行保護,一個對寫訪問進行保護。

固然,這裏的「讀寫」你能夠根據需求靈活變化。

22.原子性布爾 AtomicBoolean

AtomicBoolean 類爲咱們提供了一個能夠用原子方式進行讀和諧的布爾值,它還擁有一些先進的原子性操做,好比 compareAndSet()。AtomicBoolean 類位於 concurrent.atomic 包。

建立一個 AtomicBoolean

你能夠這樣建立一個 AtomicBoolean。
AtomicBoolean atomicBoolean = new AtomicBoolean();
以上示例新建了一個默認值爲 false 的 AtomicBoolean。
若是你想要爲 AtomicBoolean 示例設置一個顯示的初始值,那麼你能夠將初始值傳給 AtomicBoolean 的構造參數。
AtomicBoolean atomicBoolean = new AtomicBoolean(true);

獲取 AtomicBoolean 的值

你能夠經過使用 get()方法來獲取一個 AtomicBoolean 的值。示例以下:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean value = atomicBoolean.get();複製代碼

以上代碼執行後 value 的值爲 true。

設置 AtomicBoolean 的值

你能夠經過 set() 方法來設置 AtomicBoolean 的值:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);
atomicBoolean.set(false);複製代碼

交互 AtomicBoolean 的值

你能夠經過 getAndSet()方法來交換一個 AtomicBoolean 實例的值。getAndSet()方法將返回 AtomicBoolean 當前的值,並將爲 AtomicBoolean 設置一個新值,示例以下:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean oldValue = atomicBoolean.getAndSet(false);複製代碼

以上代碼執行後 oldValue 變量的值爲 true,atomicBoolean 實例將持有 false 值,代碼成功將 AtomicBoolean 當前值 true 交換爲 false。

比較並設置 AtomicBoolean 的值

compareAndSet()方法容許你對 AtomicBoolean 的當前值與與一個指望值進行比較,若是當前值等於指望值的話,將會對 AtomicBoolean 設定一個新值。compareAndSet()方法是原子性質的,所以在同一時間以內有耽擱線程執行她。所以 compareAndSet()方法可被用於一些相似於鎖的同步的簡單實現。
如下是一個 compareAndSet()示例:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);
boolean expectedValue = true;
boolean newVaule = false;
boolean wasNewValueSet = atomicBoolean.compareAndSet(expectedValue,newValue);複製代碼

本示例針對 AtomicBoolean 的當前值與 true 值進行比較,若是相等,將 AtomicBoolean 的值更新爲 false。

有什麼用?

可能有些小夥伴到這裏仍是有點懵逼,這個原子布爾到底有什麼用,給你們看一個示例代碼:

class XxxService {

    private static AtomicBoolean initState = new AtomicBoolean(false);
    private static AtomicBoolean initFinish = new AtomicBoolean(false);
    private static XxxService instance;

    private XxxService() {
    }

    public static XxxService getInstance() {
        if (initState.compareAndSet(false, true)) {
            //TODO 寫初始化代碼
            initFinish.set(true);
        }
        while(!initFinish.get()){
            Thread.yield();
        }

        return instance;
    }
}複製代碼

假如程序須要在多線程的狀況下初始化一個類,而且保證只初始化一次,完美解決併發問題。

23.原子性整形 AtomicIngteger

同22,略

24.原子性長整型 AtomicBooleanLong

同22,略

25.原子性引用型 AtomicReference

AtomicReference 提供了一個能夠被原子性讀和寫的對象引用變量。原子性的意思是多個想要改變同一個 AtomicReference 的線程不會致使 AtomicReference 處於不一致的狀態。AtomicReference 還有一個 compareAndSet()方法,經過它你能夠將當前引用於一個指望值(引用)進行比較,若是相等,在該 AtomicReference 對象內部設置一個新的引用。

建立一個 AtomicReference

建立 AtomicReference 以下:
AtomicReference atomicReference = new AtomicReference();
若是你須要使用一個指定引用建立 AtomicReference,能夠:
String initialReference = "the initialyl reference string"; AtomicReference atomicReference = new AtomicReference(initialReference);

建立泛型 AtomicReference

你可使用 Java 泛型來建立一個泛型 AtomicReference。示例:
AtomicReference<String> atomicReference = new AtomicReference();
你也能夠爲泛型 AtomicReference 設置一個初始值。示例:
String initialReference = "the initialyl reference string"; AtomicReference<String> atomicReference = new AtomicReference<>(initialReference);

獲取 AtomicReference 引用

你能夠經過 AtomicReference 的 get()方法來獲取保存在 AtomicReference 裏的引用.

設置 AtomicReference 引用

AtomicReference.set(V newValue);

比較並設置 AtomicReference 引用

使用 compareAndSet()

和 volatile 關鍵字的區別

敲黑板!!!

Atomic 和 volatile的區別很簡單,Atomic 保證讀寫操做同步,可是 volatile 只保證寫的操做,並無保證讀的操做同步。

具體原理牽涉到虛擬機的層次了,感興趣的小夥伴可自行學習。

參考資料

本文主要參考了Java併發工具包java.util.concurrent用戶指南中英文對照閱讀版.pdf, 點擊可下載資源。

相關文章
相關標籤/搜索