Java 8 併發: 原子變量和 ConcurrentMap

原文地址: Java 8 Concurrency Tutorial: Atomic Variables and ConcurrentMap

AtomicInteger

java.concurrent.atomic 包下有不少原子操做的類。 在有些狀況下,原子操做能夠在不使用 synchronized 關鍵字和鎖的狀況下解決多線程安全問題。html

在內部,原子類大量使用 CAS, 這是大多數如今 CPU 支持的原子操做指令, 這些指令一般狀況下比鎖同步要快得多。若是須要同時改變一個變量, 使用原子類是極其優雅的。java

如今選擇一個原子類 AtomicInteger 做爲例子web

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> executor.submit(atomicInt::incrementAndGet));

stop(executor);

System.out.println(atomicInt.get());    // => 1000

使用 AtomicInteger 代替 Integer 能夠在線程安全的環境中增長變量, 而不要同步訪問變量。incrementAndGet() 方法是一個原子操做, 咱們能夠在多線程中安全的調用。編程

AtomicInteger 支持多種的原子操做, updateAndGet() 方法接受一個 lambda 表達式,以便對整數作任何的算術運算。api

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> {
        Runnable task = () ->
            atomicInt.updateAndGet(n -> n + 2);
        executor.submit(task);
    });

stop(executor);

System.out.println(atomicInt.get());    // => 2000

accumulateAndGet() 方法接受一個 IntBinaryOperator 類型的另外一種 lambda 表達式, 咱們是用這種方法來計算 1 -- 999 的和:安全

AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> {
        Runnable task = () ->
            atomicInt.accumulateAndGet(i, (n, m) -> n + m);
        executor.submit(task);
    });

stop(executor);

System.out.println(atomicInt.get());    // => 499500

還有一些其餘的原子操做類: AtomicBoolean AtomicLong AtomicReference服務器

LongAdder

做爲 AtomicLong 的替代, LongAdder 類能夠用來連續地向數字添加值。多線程

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> executor.submit(adder::increment));

stop(executor);

System.out.println(adder.sumThenReset());   // => 1000

LongAdder 類和其餘的整數原子操做類同樣提供了 add()increment() 方法, 同時也是線程安全的。但其內部的結果不是一個單一的值, 這個類的內部維護了一組變量來減小多線程的爭用。實際結果能夠經過調用 sum()sumThenReset() 來獲取。併發

當來自多線程的更新比讀取更頻繁時, 這個類每每優於其餘的原子類。一般做爲統計數據, 好比要統計 web 服務器的請求數量。 LongAdder 的缺點是會消耗更多的內存, 由於有一組變量保存在內存中。oracle

LongAccumulator

LongAccumulatorLongAdder 的一個更通用的版本。它不是執行簡單的添加操做, 類 LongAccumulator 圍繞 LongBinaryOperator 類型的lambda表達式構建,如代碼示例中所示:

LongBinaryOperator op = (x, y) -> 2 * x + y;
LongAccumulator accumulator = new LongAccumulator(op, 1L);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10)
    .forEach(i -> executor.submit(() -> accumulator.accumulate(i)));

stop(executor);

System.out.println(accumulator.getThenReset());     // => 2539

咱們使用函數 2 * x + y 和初始值1建立一個 LongAccumulator。 每次調用 accumulate(i) , 當前結果和值i都做爲參數傳遞給`lambda 表達式。

LongAdder 同樣, LongAccumulator 在內部維護一組變量以減小對線程的爭用。

ConcurrentMap

ConcurrentMap 接口擴展了 Map 接口,並定義了最有用的併發集合類型之一。 Java 8 經過向此接口添加新方法引入了函數式編程。

在下面的代碼片斷中, 來演示這些新的方法:

ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");

forEach() 接受一個類型爲 BiConsumerlambda 表達式, 並將 mapkeyvalue 做爲參數傳遞。

map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value));

putIfAbsent() 方法只有當給定的 key 不存在時纔將數據存入 map 中, 這個方法和 put 同樣是線程安全的, 當多個線程訪問 map 時不要作同步操做。

String value = map.putIfAbsent("c3", "p1");
System.out.println(value);    // p0

getOrDefault() 方法返回給定 keyvalue, 當 key 不存在時返回給定的值。

String value = map.getOrDefault("hi", "there");
System.out.println(value);    // there

replaceAll() 方法接受一個 BiFunction 類型的 lambda 表達式, 並將 keyvalue 做爲參數傳遞,用來更新 value

map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value);
System.out.println(map.get("r2"));    // d3

compute() 方法和 replaceAll() 方法有些相同, 不一樣的是它多一個參數, 用來更新指定 keyvalue

map.compute("foo", (key, value) -> value + value);
System.out.println(map.get("foo"));   // barbar

ConcurrentHashMap

以上全部方法都是 ConcurrentMap 接口的一部分,所以可用於該接口的全部實現。 此外,最重要的實現 ConcurrentHashMap 已經進一步加強了一些新的方法來在 Map 上執行併發操做。

就像並行流同樣,這些方法在 Java 8 中經過 ForkJoinPool.commonPool()提供特殊的 ForkJoinPool 。該池使用預設的並行性, 這取決於可用內核的數量。 個人機器上有四個CPU內核能夠實現三種並行性:

System.out.println(ForkJoinPool.getCommonPoolParallelism());  // 3

經過設置如下 JVM 參數能夠減小或增長此值:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

咱們使用相同的示例來演示, 不過下面使用 ConcurrentHashMap 類型, 這樣能夠調用更多的方法。

ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");

Java 8 引入了三種並行操做:forEach, searchreduce。 每一個操做都有四種形式, 分別用 key, value, entrieskey-value 來做爲參數。

全部這些方法的第一個參數都是 parallelismThreshold 閥值。 該閾值表示操做並行執行時的最小收集大小。 例如, 若是傳遞的閾值爲500,而且 map 的實際大小爲499, 則操做將在單個線程上按順序執行。 在下面的例子中,咱們使用一個閾值來強制並行操做。

ForEach

方法 forEach() 可以並行地迭代 map 的鍵值對。 BiConsumer 類型的 lambda 表達式接受當前迭代的 keyvalue。 爲了可視化並行執行,咱們將當前線程名稱打印到控制檯。 請記住,在個人狀況下,底層的 ForkJoinPool 最多使用三個線程。

map.forEach(1, (key, value) ->
    System.out.printf("key: %s; value: %s; thread: %s\n",
        key, value, Thread.currentThread().getName()));

// key: r2; value: d2; thread: main
// key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1
// key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2
// key: c3; value: p0; thread: main

Search

search() 方法接受一個 BiFunction 類型的 lambda 表達式, 它能對 map 作搜索操做, 若是當前迭代不符合所需的搜索條件,則返回 null。 請記住,ConcurrentHashMap 是無序的。 搜索功能不該該取決於地圖的實際處理順序。 若是有多個匹配結果, 則結果多是不肯定的。

String result = map.search(1, (key, value) -> {
    System.out.println(Thread.currentThread().getName());
    if ("foo".equals(key)) {
        return value;
    }
    return null;
});
System.out.println("Result: " + result);

// ForkJoinPool.commonPool-worker-2
// main
// ForkJoinPool.commonPool-worker-3
// Result: bar

下面是對 value 的搜索

String result = map.searchValues(1, value -> {
    System.out.println(Thread.currentThread().getName());
    if (value.length() > 3) {
        return value;
    }
    return null;
});

System.out.println("Result: " + result);

// ForkJoinPool.commonPool-worker-2
// main
// main
// ForkJoinPool.commonPool-worker-1
// Result: solo

Reduce

reduce() 方法接受兩個類型爲 BiFunctionlambda 表達式。 第一個函數將每一個鍵值對轉換爲任何類型的單個值。 第二個函數將全部這些轉換後的值組合成一個結果, 其中火忽略 null 值。

String result = map.reduce(1,
    (key, value) -> {
        System.out.println("Transform: " + Thread.currentThread().getName());
        return key + "=" + value;
    },
    (s1, s2) -> {
        System.out.println("Reduce: " + Thread.currentThread().getName());
        return s1 + ", " + s2;
    });

System.out.println("Result: " + result);

// Transform: ForkJoinPool.commonPool-worker-2
// Transform: main
// Transform: ForkJoinPool.commonPool-worker-3
// Reduce: ForkJoinPool.commonPool-worker-3
// Transform: main
// Reduce: main
// Reduce: main
// Result: r2=d2, c3=p0, han=solo, foo=bar
相關文章
相關標籤/搜索