這部分用的很少,僅簡單記錄一下新特性api
Java 8 增長了兩個新方法 updateAndGet 和 accumulateAndGet 用於更新原子類的值,取代老API的循環方式。數組
AtomicInteger ai = new AtomicInteger(); // 多線程環境下安全更新 Integer i = ai.updateAndGet(x -> 5); System.out.println(i); // 輸出 5 // 多線程環境下安全更新 將原子值和傳入的參數組合 ai.accumulateAndGet(1, (oldValue, paramValue) -> oldValue + paramValue); System.out.println(ai.get()); // 輸出 6
若是程序內有高度的競爭,大量的線程訪問同一個原子值,可使用 LongAdder 和 LongAccumulator,這個類是 Java 8 提供用於在高度競爭環境下替代 AtomicLong 的。安全
LongAdder adder = new LongAdder(); adder.add(2); System.out.println(adder.intValue()); // 輸出 2 adder.increment(); System.out.println(adder.sum()); // 輸出 3 sum方法返回long型 // 同 accumulateAndGet 方法, 將原子值和傳入的參數組合 LongAccumulator la = new LongAccumulator((left, right) -> left + right, 10); System.out.println(la.intValue()); // 輸出 10
額外提一點,在 1.5 版本, ConcurrentHashMap 存在死鎖的可能(具體源碼就再也不分析了),1.6 版本之後修復了這個問題。多線程
一個誤區: 對關鍵字計數函數
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(); Integer oldValue = map.get("key"); Integer newValue = oldValue == null ? 1 : oldValue + 1; map.put("key", newValue);
這段代碼並非線程安全的,有可能發生多個線程更新的值相等。由於 ConcurrentHashMap 的設計目的是保護內部結構不被破壞,任何 get 、put 操做不會致使內部鏈表節點丟失或造成迴路,而不是維護操做順序的原子性。測試
正確的更新方法,CAS方式循環更新:線程
do { oldValue = map.get("key"); newValue = oldValue == null ? 1 : oldValue + 1; } while (!map.replace("key", oldValue, newValue));
Java 8 中也能夠這樣設計
ConcurrentHashMap<String, LongAdder> map = new ConcurrentHashMap<>(); map.putIfAbsent("key", new LongAdder()).increment();
Java 8 爲 ConcurrentHashMap 提供了批量數據操做, 即便其它線程同時操做時也能夠安全的執行。 批量數據操做有三類:code
每類操做都有對鍵、值、鍵和值、Map.Entry對象操做的4個版本,列舉一下。對象
在使用這幾個操做的時候,須要指定一個 並行閥值。若是 map.size 大於閥值,批量操做就以並行的方式執行。若是不想並行執行,可使用 Long.MAX_VALUE做爲閥值;若是想盡量的多線程執行,能夠用1做爲閥值。
很簡單,看 一眼 API 就能明白,以一個方法api爲例: U search(long threshold, BiFunction<? super K, ? super V, ? extends U> f)
forEach 方法有兩種形式,第一種就是咱們經常使用的,參數爲消費者,只不過這裏參數多了一個閥值,
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(); map.forEach(3, (k, v) -> System.out.println(k +"-" + v));
第二中又多了一個參數,是一個轉換其函數,轉換後的結果傳遞給消費者函數,若是轉換後的結果爲 null,值會被跳過。
map.forEach(3, (k, v) -> k +"-"+v, // 轉換器 System.out::print); // 消費者
用法見前面章節介紹,這裏也是多了一個閥值參數。
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(); long threshold = 3; map.reduceValues(threshold, (s, s2) -> s + s2);
同 forEach 同樣,你也能夠提供一個轉換器函數。
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(); long threshold = 3; map.reduceValues(threshold, v -> v == null ? null: v + " ", // 轉換器 (s, s2) -> s + s2); // 聚合方法
多是由於設計者想更明確的表達並行排序,因此並無修改原 sort方法,而是新增長了 parallelSort 和 parallel* 等方法。最簡單的並行排序以下,跟 sort 沒什麼區別。
String[] arr = {"an", "ba", "daf", "ads", "ca"}; Arrays.parallelSort(arr);
這個比較有意思,它的參數是一個位置索引,因此能構造跟位置有關的數組值,
String[] arr = {"an", "ba", "daf", "ads", "ca"}; Arrays.parallelSetAll(arr, i -> arr[i] + i); for (String s : arr) { System.out.println(s); } // 輸出結果是: an0 ba1 daf2 ads3 ca4
相似reduce, 對數組內元素進行聚合
String[] arr = {"an", "ba", "daf", "ads", "ca"}; Arrays.parallelPrefix(arr, (x, y) -> x+"+"+y); for (String s : arr) { System.out.println(s); } // 輸出結果 an an+ba an+ba+daf an+ba+daf+ads an+ba+daf+ads+ca
相對於老的 Future 增長了相似 Stream API 流水操做的功能,使同一個線程執行的代碼能寫在一塊兒,雖然他們不是連續執行的。
CompletableFuture<String> contents = CompletableFuture.completedFuture("abcdef"); System.out.println("111"); // 同一線程執行的代碼寫在一塊兒 CompletableFuture<String> c2 = contents.thenApplyAsync(x -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("333"); return x; }); c2.thenAcceptAsync(x -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(x); }); // 其餘的業務在 Future 執行以前執行 System.out.println("222"); // 防止測試程序結束 try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } // 輸出順序是: 111 222 333 abcdef
上面的例子爲了順序清晰,加了一些噁心的代碼,精簡一下應該是這樣
CompletableFuture<String> contents = CompletableFuture.completedFuture("abcdef"); // 同一線程執行的代碼寫在一塊兒 contents.thenApplyAsync(x -> x).thenAcceptAsync(System.out::println); // 其餘的業務在 Future 執行以前執行 System.out.println("222");
CompletableFuture 流式操做還有幾個實用的方法,就不寫demo了,須要用的時候再查API。