原文地址: Java 8 Concurrency Tutorial: Atomic Variables and ConcurrentMap
java.concurrent.atomic
包下有不少原子操做的類。 在有些狀況下,原子操做能夠在不使用 synchronized
關鍵字和鎖的狀況下解決多線程安全問題。html
在內部,原子類大量使用 CAS
, 這是大多數如今 CPU 支持的原子操做指令, 這些指令一般狀況下比鎖同步要快得多。若是須要同時改變一個變量, 使用原子類是極其優雅的。java
如今選擇一個原子類 AtomicInteger
做爲例子web
AtomicInteger atomicInt = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> executor.submit(atomicInt::incrementAndGet)); stop(executor); System.out.println(atomicInt.get()); // => 1000
使用 AtomicInteger
代替 Integer
能夠在線程安全的環境中增長變量, 而不要同步訪問變量。incrementAndGet()
方法是一個原子操做, 咱們能夠在多線程中安全的調用。編程
AtomicInteger
支持多種的原子操做, updateAndGet()
方法接受一個 lambda
表達式,以便對整數作任何的算術運算。api
AtomicInteger atomicInt = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> { Runnable task = () -> atomicInt.updateAndGet(n -> n + 2); executor.submit(task); }); stop(executor); System.out.println(atomicInt.get()); // => 2000
accumulateAndGet()
方法接受一個 IntBinaryOperator
類型的另外一種 lambda
表達式, 咱們是用這種方法來計算 1 -- 999 的和:安全
AtomicInteger atomicInt = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> { Runnable task = () -> atomicInt.accumulateAndGet(i, (n, m) -> n + m); executor.submit(task); }); stop(executor); System.out.println(atomicInt.get()); // => 499500
還有一些其餘的原子操做類: AtomicBoolean AtomicLong AtomicReference服務器
做爲 AtomicLong
的替代, LongAdder
類能夠用來連續地向數字添加值。多線程
ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> executor.submit(adder::increment)); stop(executor); System.out.println(adder.sumThenReset()); // => 1000
LongAdder
類和其餘的整數原子操做類同樣提供了 add()
和 increment()
方法, 同時也是線程安全的。但其內部的結果不是一個單一的值, 這個類的內部維護了一組變量來減小多線程的爭用。實際結果能夠經過調用 sum()
和 sumThenReset()
來獲取。併發
當來自多線程的更新比讀取更頻繁時, 這個類每每優於其餘的原子類。一般做爲統計數據, 好比要統計 web 服務器的請求數量。 LongAdder
的缺點是會消耗更多的內存, 由於有一組變量保存在內存中。oracle
LongAccumulator
是 LongAdder
的一個更通用的版本。它不是執行簡單的添加操做, 類 LongAccumulator
圍繞 LongBinaryOperator
類型的lambda表達式構建,如代碼示例中所示:
LongBinaryOperator op = (x, y) -> 2 * x + y; LongAccumulator accumulator = new LongAccumulator(op, 1L); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 10) .forEach(i -> executor.submit(() -> accumulator.accumulate(i))); stop(executor); System.out.println(accumulator.getThenReset()); // => 2539
咱們使用函數 2 * x + y
和初始值1
建立一個 LongAccumulator
。 每次調用 accumulate(i)
, 當前結果和值i
都做爲參數傳遞給`lambda
表達式。
像 LongAdder
同樣, LongAccumulator
在內部維護一組變量以減小對線程的爭用。
ConcurrentMap
接口擴展了 Map
接口,並定義了最有用的併發集合類型之一。 Java 8
經過向此接口添加新方法引入了函數式編程。
在下面的代碼片斷中, 來演示這些新的方法:
ConcurrentMap<String, String> map = new ConcurrentHashMap<>(); map.put("foo", "bar"); map.put("han", "solo"); map.put("r2", "d2"); map.put("c3", "p0");
forEach()
接受一個類型爲 BiConsumer
的 lambda
表達式, 並將 map
的 key
和 value
做爲參數傳遞。
map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value));
putIfAbsent()
方法只有當給定的 key
不存在時纔將數據存入 map
中, 這個方法和 put
同樣是線程安全的, 當多個線程訪問 map
時不要作同步操做。
String value = map.putIfAbsent("c3", "p1"); System.out.println(value); // p0
getOrDefault()
方法返回給定 key
的 value
, 當 key
不存在時返回給定的值。
String value = map.getOrDefault("hi", "there"); System.out.println(value); // there
replaceAll()
方法接受一個 BiFunction
類型的 lambda
表達式, 並將 key
和 value
做爲參數傳遞,用來更新 value
。
map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value); System.out.println(map.get("r2")); // d3
compute()
方法和 replaceAll()
方法有些相同, 不一樣的是它多一個參數, 用來更新指定 key
的 value
map.compute("foo", (key, value) -> value + value); System.out.println(map.get("foo")); // barbar
以上全部方法都是 ConcurrentMap
接口的一部分,所以可用於該接口的全部實現。 此外,最重要的實現 ConcurrentHashMap
已經進一步加強了一些新的方法來在 Map
上執行併發操做。
就像並行流同樣,這些方法在 Java 8
中經過 ForkJoinPool.commonPool()
提供特殊的 ForkJoinPool
。該池使用預設的並行性, 這取決於可用內核的數量。 個人機器上有四個CPU內核能夠實現三種並行性:
System.out.println(ForkJoinPool.getCommonPoolParallelism()); // 3
經過設置如下 JVM
參數能夠減小或增長此值:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
咱們使用相同的示例來演示, 不過下面使用 ConcurrentHashMap
類型, 這樣能夠調用更多的方法。
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(); map.put("foo", "bar"); map.put("han", "solo"); map.put("r2", "d2"); map.put("c3", "p0");
Java 8
引入了三種並行操做:forEach
, search
和 reduce
。 每一個操做都有四種形式, 分別用 key
, value
, entries
和 key-value
來做爲參數。
全部這些方法的第一個參數都是 parallelismThreshold
閥值。 該閾值表示操做並行執行時的最小收集大小。 例如, 若是傳遞的閾值爲500
,而且 map
的實際大小爲499
, 則操做將在單個線程上按順序執行。 在下面的例子中,咱們使用一個閾值來強制並行操做。
方法 forEach()
可以並行地迭代 map
的鍵值對。 BiConsumer
類型的 lambda
表達式接受當前迭代的 key
和 value
。 爲了可視化並行執行,咱們將當前線程名稱打印到控制檯。 請記住,在個人狀況下,底層的 ForkJoinPool
最多使用三個線程。
map.forEach(1, (key, value) -> System.out.printf("key: %s; value: %s; thread: %s\n", key, value, Thread.currentThread().getName())); // key: r2; value: d2; thread: main // key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1 // key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2 // key: c3; value: p0; thread: main
search()
方法接受一個 BiFunction
類型的 lambda
表達式, 它能對 map
作搜索操做, 若是當前迭代不符合所需的搜索條件,則返回 null
。 請記住,ConcurrentHashMap
是無序的。 搜索功能不該該取決於地圖的實際處理順序。 若是有多個匹配結果, 則結果多是不肯定的。
String result = map.search(1, (key, value) -> { System.out.println(Thread.currentThread().getName()); if ("foo".equals(key)) { return value; } return null; }); System.out.println("Result: " + result); // ForkJoinPool.commonPool-worker-2 // main // ForkJoinPool.commonPool-worker-3 // Result: bar
下面是對 value 的搜索
String result = map.searchValues(1, value -> { System.out.println(Thread.currentThread().getName()); if (value.length() > 3) { return value; } return null; }); System.out.println("Result: " + result); // ForkJoinPool.commonPool-worker-2 // main // main // ForkJoinPool.commonPool-worker-1 // Result: solo
reduce()
方法接受兩個類型爲 BiFunction
的 lambda
表達式。 第一個函數將每一個鍵值對轉換爲任何類型的單個值。 第二個函數將全部這些轉換後的值組合成一個結果, 其中火忽略 null
值。
String result = map.reduce(1, (key, value) -> { System.out.println("Transform: " + Thread.currentThread().getName()); return key + "=" + value; }, (s1, s2) -> { System.out.println("Reduce: " + Thread.currentThread().getName()); return s1 + ", " + s2; }); System.out.println("Result: " + result); // Transform: ForkJoinPool.commonPool-worker-2 // Transform: main // Transform: ForkJoinPool.commonPool-worker-3 // Reduce: ForkJoinPool.commonPool-worker-3 // Transform: main // Reduce: main // Reduce: main // Result: r2=d2, c3=p0, han=solo, foo=bar