四、併發加強

4.1 概述

這部分用的很少,僅簡單記錄一下新特性api

4.2 原子值

4.2.1 更新方法

Java 8 增長了兩個新方法 updateAndGet 和 accumulateAndGet 用於更新原子類的值,取代老API的循環方式。數組

AtomicInteger ai = new AtomicInteger();

// 多線程環境下安全更新
Integer i = ai.updateAndGet(x -> 5);
System.out.println(i);   // 輸出 5

// 多線程環境下安全更新  將原子值和傳入的參數組合
ai.accumulateAndGet(1, (oldValue, paramValue) -> oldValue + paramValue);
System.out.println(ai.get());   // 輸出 6

4.2.2 LongAdder

若是程序內有高度的競爭,大量的線程訪問同一個原子值,可使用 LongAdder 和 LongAccumulator,這個類是 Java 8 提供用於在高度競爭環境下替代 AtomicLong 的。安全

LongAdder adder = new LongAdder();
adder.add(2);
System.out.println(adder.intValue());   // 輸出 2

adder.increment();
System.out.println(adder.sum());  // 輸出 3  sum方法返回long型

// 同 accumulateAndGet 方法, 將原子值和傳入的參數組合
LongAccumulator la = new LongAccumulator((left, right) -> left + right, 10);
System.out.println(la.intValue());  // 輸出 10

4.3 ConcurrentHashMap

額外提一點,在 1.5 版本, ConcurrentHashMap 存在死鎖的可能(具體源碼就再也不分析了),1.6 版本之後修復了這個問題。多線程

4.3.1 更新值

一個誤區: 對關鍵字計數函數

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
Integer oldValue = map.get("key");
Integer newValue = oldValue == null ? 1 : oldValue + 1;
map.put("key", newValue);

這段代碼並非線程安全的,有可能發生多個線程更新的值相等。由於 ConcurrentHashMap 的設計目的是保護內部結構不被破壞,任何 get 、put 操做不會致使內部鏈表節點丟失或造成迴路,而不是維護操做順序的原子性。測試

正確的更新方法,CAS方式循環更新:線程

do {
    oldValue = map.get("key");
    newValue = oldValue == null ? 1 : oldValue + 1;
} while (!map.replace("key", oldValue, newValue));

Java 8 中也能夠這樣設計

ConcurrentHashMap<String, LongAdder> map = new ConcurrentHashMap<>();
map.putIfAbsent("key", new LongAdder()).increment();

4.3.2 批量數據操做

Java 8 爲 ConcurrentHashMap 提供了批量數據操做, 即便其它線程同時操做時也能夠安全的執行。 批量數據操做有三類:code

  • search:對全部的鍵和(或)值應用一個函數,直到函數返回一個非 null 的結果。
  • reduce:經過提供的聚合函數,將全部的鍵和(或)值組合起來。
  • forEach:對全部的鍵和(或)值應用一個函數,注意這個與 Map 中的 forEach 方法不同。

每類操做都有對鍵、值、鍵和值、Map.Entry對象操做的4個版本,列舉一下。對象

  • searchKeys / reduceKeys / forEachKey
  • searchValues / reduceValues / forEachValues
  • search / reduce / forEach
  • searchEntries / reduceEntries / forEachEntry

在使用這幾個操做的時候,須要指定一個 並行閥值。若是 map.size 大於閥值,批量操做就以並行的方式執行。若是不想並行執行,可使用 Long.MAX_VALUE做爲閥值;若是想盡量的多線程執行,能夠用1做爲閥值。

search

很簡單,看 一眼 API 就能明白,以一個方法api爲例: U search(long threshold, BiFunction<? super K, ? super V, ? extends U> f)

forEach

forEach 方法有兩種形式,第一種就是咱們經常使用的,參數爲消費者,只不過這裏參數多了一個閥值,

ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.forEach(3, (k, v) -> System.out.println(k +"-" + v));

第二中又多了一個參數,是一個轉換其函數,轉換後的結果傳遞給消費者函數,若是轉換後的結果爲 null,值會被跳過。

map.forEach(3,
        (k, v) -> k +"-"+v,       // 轉換器
        System.out::print);       // 消費者

reduce

用法見前面章節介紹,這裏也是多了一個閥值參數。

ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
long threshold = 3;
map.reduceValues(threshold, (s, s2) -> s + s2);

同 forEach 同樣,你也能夠提供一個轉換器函數。

ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
long threshold = 3;
map.reduceValues(threshold,
        v -> v == null ? null: v + " ",  // 轉換器
        (s, s2) -> s + s2);              // 聚合方法

4.4 並行數組

4.4.1 sort

多是由於設計者想更明確的表達並行排序,因此並無修改原 sort方法,而是新增長了 parallelSort 和 parallel* 等方法。最簡單的並行排序以下,跟 sort 沒什麼區別。

String[] arr = {"an", "ba", "daf", "ads", "ca"};
Arrays.parallelSort(arr);

4.4.2 parallelSetAll

這個比較有意思,它的參數是一個位置索引,因此能構造跟位置有關的數組值,

String[] arr = {"an", "ba", "daf", "ads", "ca"};

Arrays.parallelSetAll(arr, i -> arr[i] + i);

for (String s : arr) {
    System.out.println(s);
}

// 輸出結果是:
an0
ba1
daf2
ads3
ca4

4.4.3 parallelPrefix

相似reduce, 對數組內元素進行聚合

String[] arr = {"an", "ba", "daf", "ads", "ca"};

Arrays.parallelPrefix(arr, (x, y) -> x+"+"+y);

for (String s : arr) {
    System.out.println(s);
}
// 輸出結果
an
an+ba
an+ba+daf
an+ba+daf+ads
an+ba+daf+ads+ca

4.5 CompletableFuture

相對於老的 Future 增長了相似 Stream API 流水操做的功能,使同一個線程執行的代碼能寫在一塊兒,雖然他們不是連續執行的。

CompletableFuture<String> contents = CompletableFuture.completedFuture("abcdef");

System.out.println("111");

// 同一線程執行的代碼寫在一塊兒
CompletableFuture<String> c2 = contents.thenApplyAsync(x -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("333");
    return x;
});

c2.thenAcceptAsync(x -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println(x);
});

// 其餘的業務在 Future 執行以前執行
System.out.println("222");

// 防止測試程序結束
try {
    Thread.sleep(10000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

// 輸出順序是:
111
222
333
abcdef

上面的例子爲了順序清晰,加了一些噁心的代碼,精簡一下應該是這樣

CompletableFuture<String> contents = CompletableFuture.completedFuture("abcdef");

// 同一線程執行的代碼寫在一塊兒
contents.thenApplyAsync(x -> x).thenAcceptAsync(System.out::println);

// 其餘的業務在 Future 執行以前執行
System.out.println("222");

CompletableFuture 流式操做還有幾個實用的方法,就不寫demo了,須要用的時候再查API。

相關文章
相關標籤/搜索