和朱曄一塊兒複習Java併發(五):併發容器和同步器

本節咱們先會來複習一下java.util.concurrent下面的一些併發容器,而後再會來簡單看一下各類同步器。java

ConcurrentHashMap和ConcurrentSkipListMap的性能

首先,咱們來測試一下ConcurrentHashMap和ConcurrentSkipListMap的性能。 前者對應的非併發版本是HashMap,後者是跳錶實現,Map按照Key順序排序(固然也能夠提供一個Comparator進行排序)。git

在這個例子裏,咱們不是簡單的測試Map讀寫Key的性能,而是實現一個多線程環境下使用Map最最多見的場景:統計Key出現頻次,咱們的Key的範圍是1萬個,而後循環1億次(也就是Value平均也在1萬左右),10個併發來操做Map:github

@Slf4j
public class ConcurrentMapTest {

    int loopCount = 100000000;
    int threadCount = 10;
    int itemCount = 10000;

    @Test
    public void test() throws InterruptedException {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start("hashmap");
        normal();
        stopWatch.stop();
        stopWatch.start("concurrentHashMap");
        concurrent();
        stopWatch.stop();
        stopWatch.start("concurrentSkipListMap");
        concurrentSkipListMap();
        stopWatch.stop();
        log.info(stopWatch.prettyPrint());
    }

    private void normal() throws InterruptedException {
        HashMap<String, Long> freqs = new HashMap<>();
        ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
        forkJoinPool.execute(() -> IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> {
                    String key = "item" + ThreadLocalRandom.current().nextInt(itemCount);
                    synchronized (freqs) {
                        if (freqs.containsKey(key)) {
                            freqs.put(key, freqs.get(key) + 1);
                        } else {
                            freqs.put(key, 1L);
                        }
                    }
                }
        ));
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
        //log.debug("normal:{}", freqs);

    }

    private void concurrent() throws InterruptedException {
        ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(itemCount);
        ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
        forkJoinPool.execute(() -> IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> {
                    String key = "item" + ThreadLocalRandom.current().nextInt(itemCount);
                    freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
                }
        ));
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
        //log.debug("concurrentHashMap:{}", freqs);
    }

    private void concurrentSkipListMap() throws InterruptedException {
        ConcurrentSkipListMap<String, LongAdder> freqs = new ConcurrentSkipListMap<>();
        ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
        forkJoinPool.execute(() -> IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> {
                    String key = "item" + ThreadLocalRandom.current().nextInt(itemCount);
                    freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
                }
        ));
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
        //log.debug("concurrentSkipListMap:{}", freqs);
    }
}
複製代碼

這裏能夠看到,這裏的三種實現:安全

  • 對於normal的實現,咱們全程鎖住了HashMap而後進行讀寫
  • 對於ConcurrentHashMap,咱們巧妙利用了一個computeIfAbsent()方法,實現了判斷Key是否存在,計算獲取Value,put Key Value三步操做,獲得一個Value是LongAdder(),而後由於LongAdder是線程安全的因此直接調用了increase()方法,一行代碼實現了5行代碼效果
  • ConcurrentSkipListMap也是同樣

運行結果以下: bash

image_1dg7ckmsp1k4cde81agdpc1fit9.png-64.2kB

能夠看到咱們利用ConcurrentHashMap巧妙實現的併發詞頻統計功能,其性能相比有鎖的版本高了太多。 值得注意的是,ConcurrentSkipListMap的containsKey、get、put、remove等相似操做時間複雜度是log(n),加上其有序性,因此性能和ConcurrentHashMap有差距。微信

若是咱們打印一下ConcurrentSkipListMap最後的結果,差很少是這樣的: 網絡

image_1dg7dcv63506qsiahte1s17b7m.png-353.9kB
能夠看到Entry按照了Key進行排序。

ConcurrentHashMap的那些原子操做方法

這一節咱們比較一下computeIfAbsent()和putIfAbsent()的區別,這2個方法很容易由於誤用致使一些Bug。多線程

  • 第一個是性能上的區別,若是Key存在的話,computeIfAbsent由於傳入的是一個函數,函數壓根就不會執行,而putIfAbsent須要直接傳值。因此若是要得到Value代價很大的話,computeIfAbsent性能會好
  • 第二個是使用上的區別,computeIfAbsent返回是的是操做後的值,若是以前值不存在的話就返回計算後的值,若是原本就存在那麼就返回原本存在的值,putIfAbsent返回的是以前的值,若是原來值不存在那麼會獲得null

寫一個程序來驗證一下:併發

@Slf4j
public class PutIfAbsentTest {

    @Test
    public void test() {
        ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();
        log.info("Start");
        log.info("putIfAbsent:{}", concurrentHashMap.putIfAbsent("test1", getValue()));
        log.info("computeIfAbsent:{}", concurrentHashMap.computeIfAbsent("test1", k -> getValue()));
        log.info("putIfAbsent again:{}", concurrentHashMap.putIfAbsent("test2", getValue()));
        log.info("computeIfAbsent again:{}", concurrentHashMap.computeIfAbsent("test2", k -> getValue()));
    }

    private String getValue() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return UUID.randomUUID().toString();
    }
}
複製代碼

在這裏獲取值的操做須要1s,從運行結果能夠看到,第二次值已經存在的時候,putIfAbsent還耗時1s,而computeIfAbsent不是,並且還能夠看到第一次值不存在的時候putIfAbsent返回了null,而computeIfAbsent返回了計算後的值:框架

image_1dg7e2vdb1iin1vp6113c1d4q1mkp13.png-203.7kB

使用的時候必定須要根據本身的需求來使用合適的方法。

ThreadLocalRandom的誤用

以前的例子裏咱們用到了ThreadLocalRandom,這裏簡單提一下ThreadLocalRandom可能的誤用:

@Slf4j
public class ThreadLocalRandomMisuse {
    @Test
    public void test() throws InterruptedException {
        ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();
        IntStream.rangeClosed(1, 5)
                .mapToObj(i -> new Thread(() -> log.info("wrong:{}", threadLocalRandom.nextInt())))
                .forEach(Thread::start);
        IntStream.rangeClosed(1, 5)
                .mapToObj(i -> new Thread(() -> log.info("ok:{}", ThreadLocalRandom.current().nextInt())))
                .forEach(Thread::start);
        TimeUnit.SECONDS.sleep(1);
    }
}
複製代碼

一句話而言,咱們應該每次都ThreadLocalRandom.current().nextInt()這樣用而不是實例化了ThreadLocalRandom.current()每次調用nextInt()。觀察一下兩次輸出能夠發現,wrong的那5次獲得的隨機數都是同樣的:

image_1dg7eb7ltbg2156p17ija8b1k281g.png-338kB

ConcurrentHashMap的併發reduce功能測試

ConcurrentHashMap提供了比較高級的一些方法能夠進行併發的歸併操做,咱們寫一段程序比較一下使用遍歷方式以及使用reduceEntriesToLong()統計ConcurrentHashMap中全部值的平均數的性能和寫法上的差別:

@Slf4j
public class ConcurrentHashMapReduceTest {

    int loopCount = 100;
    int itemCount = 10000000;

    @Test
    public void test() {
        ConcurrentHashMap<String, Long> concurrentHashMap = LongStream.rangeClosed(1, itemCount)
                .boxed()
                .collect(Collectors.toMap(i -> "item" + i, Function.identity(),(o1, o2) -> o1, ConcurrentHashMap::new));
        StopWatch stopWatch = new StopWatch();
        stopWatch.start("normal");
        normal(concurrentHashMap);
        stopWatch.stop();
        stopWatch.start("concurrent with parallelismThreshold=1");
        concurrent(concurrentHashMap, 1);
        stopWatch.stop();
        stopWatch.start("concurrent with parallelismThreshold=max long");
        concurrent(concurrentHashMap, Long.MAX_VALUE);
        stopWatch.stop();
        log.info(stopWatch.prettyPrint());
    }

    private void normal(ConcurrentHashMap<String, Long> map) {
        IntStream.rangeClosed(1, loopCount).forEach(__ -> {
            long sum = 0L;
            for (Map.Entry<String, Long> item : map.entrySet()) {
                sum += item.getValue();
            }
            double average = sum / map.size();
            Assert.assertEquals(itemCount / 2, average, 0);
        });
    }

    private void concurrent(ConcurrentHashMap<String, Long> map, long parallelismThreshold) {
        IntStream.rangeClosed(1, loopCount).forEach(__ -> {
            double average = map.reduceEntriesToLong(parallelismThreshold, Map.Entry::getValue, 0, Long::sum) / map.size();
            Assert.assertEquals(itemCount / 2, average, 0);
        });
    }
}
複製代碼

執行結果以下:

image_1dg7etsj31t9c1cg5pg71sfr1vio1t.png-86.2kB
能夠看到並行歸併操做對於比較大的HashMap性能好很多,注意一點是傳入的parallelismThreshold不是並行度(不是ForkJoinPool(int parallelism)的那個parallelism)的意思,而是並行元素的閾值,傳入Long.MAX_VALUE取消並行,傳入1充分利用ForkJoinPool。

固然,咱們這裏只演示了reduceEntriesToLong()一個方法,ConcurrentHashMap還有十幾種各類reduceXXX()用於對Key、Value和Entry進行並行歸併操做。

ConcurrentHashMap的誤用

其實這裏想說的以前的文章中也提到過,ConcurrentHashMap不能確保多個針對Map的操做是原子性的(除非是以前提到computeIfAbsent()和putIfAbsent()等等),好比在下面的例子裏,咱們有一個9990大小的ConcurrentHashMap,有多個線程在計算它離10000滿員還有多少差距,而後填充差距:

@Test
public void test() throws InterruptedException {
    int limit = 10000;
    ConcurrentHashMap<String, Long> concurrentHashMap = LongStream.rangeClosed(1, limit - 10)
            .boxed()
            .collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(),
                    (o1, o2) -> o1, ConcurrentHashMap::new));
    log.info("init size:{}", concurrentHashMap.size());

    ExecutorService executorService = Executors.newFixedThreadPool(10);
    for (int __ = 0; __ < 10; __++) {
        executorService.execute(() -> {
            int gap = limit - concurrentHashMap.size();
            log.debug("gap:{}", gap);
            concurrentHashMap.putAll(LongStream.rangeClosed(1, gap)
                    .boxed()
                    .collect(Collectors.toMap(i -> UUID.randomUUID().toString(), Function.identity())));
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.HOURS);

    log.info("finish size:{}", concurrentHashMap.size());
}
複製代碼

這段代碼顯然是有問題的:

  • 第一,諸如size()、containsValue()等(聚合狀態的)方法僅僅在沒有併發更新的時候是準確的,不然只能做爲統計、監控來使用,不能用於控制程序運行邏輯
  • 第二,即便size()是準確的,在計算出gap以後其它線程可能已經往裏面添加數據了,雖然putAll()操做這一操做是線程安全的,可是這個這個計算gap,填補gap的邏輯並非原子性的,不是說用了ConcurrentHashMap就不須要鎖了

輸出結果以下:

image_1dg7frtvg1qmgdso1cv9men15f12a.png-351.4kB

能夠看到,有一些線程甚至計算出了負數的gap,最後結果是10040,比預期的limit多了40。

還有一點算不上誤用,只是提一下,ConcurrentHashMap的Key/Value不能是null,而HashMap是能夠的,爲何是這樣呢? 下圖是ConcurrentHashMap做者的回覆:

image_1dg7ghtj114u219b81se319to1qg02n.png-282.5kB

意思就是若是get(key)返回了null,你搞不清楚這究竟是key沒有呢仍是value就是null。非併發狀況下你可使用後contains(key)來判斷,可是併發狀況下不行,你判斷的時候可能Map已經修改了。

CopyOnWriteArrayList測試

CopyOnWrite的意義在於幾乎沒有什麼修改,而讀併發超級高的場景,若是有修改,咱們重起爐竈複製一份,雖然代價很大,可是這樣能讓99.9%的併發讀實現無鎖,咱們來試試其性能,先是寫的測試,咱們比拼一下CopyOnWriteArrayList、手動鎖的ArrayList以及synchronizedList包裝過的ArrayList:

@Test
public void testWrite() {
    List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
    List<Integer> arrayList = new ArrayList<>();
    List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
    StopWatch stopWatch = new StopWatch();
    int loopCount = 100000;
    stopWatch.start("copyOnWriteArrayList");
    IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
    stopWatch.stop();
    stopWatch.start("arrayList");
    IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> {
        synchronized (arrayList) {
            arrayList.add(ThreadLocalRandom.current().nextInt(loopCount));
        }
    });
    stopWatch.stop();
    stopWatch.start("synchronizedList");
    IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
    stopWatch.stop();
    log.info(stopWatch.prettyPrint());
}
複製代碼

10萬次操做不算多,結果以下:

image_1dg7h4kskojhnuavv014o7104t34.png-73.1kB
可見CopyOnWriteArrayList的修改由於涉及到整個數據的複製,代價至關大。

再來看看讀,先使用一個方法來進行1000萬數據填充,而後測試,迭代1億次:

private void addAll(List<Integer> list) {
    list.addAll(IntStream.rangeClosed(1, 10000000).boxed().collect(Collectors.toList()));
}

@Test
public void testRead() {
    List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
    List<Integer> arrayList = new ArrayList<>();
    List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
    addAll(copyOnWriteArrayList);
    addAll(arrayList);
    addAll(synchronizedList);
    StopWatch stopWatch = new StopWatch();
    int loopCount = 100000000;
    int count = arrayList.size();
    stopWatch.start("copyOnWriteArrayList");
    IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count)));
    stopWatch.stop();
    stopWatch.start("arrayList");
    IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> {
        synchronized (arrayList) {
            arrayList.get(ThreadLocalRandom.current().nextInt(count));
        }
    });
    stopWatch.stop();
    stopWatch.start("synchronizedList");
    IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count)));
    stopWatch.stop();
    log.info(stopWatch.prettyPrint());
}
複製代碼

執行結果以下:

image_1dg7h9gou67s1ck71rae1goeatr3h.png-83.1kB
的確沒錯,CopyOnWriteArrayList性能至關強悍,畢竟讀取無鎖,想多少併發就多少併發。

看完了大部分的併發容器咱們再來看看五種併發同步器。

CountDownLatch測試

CountDownLatch在以前的文章中已經出現過N次了,也是五種併發同步器中使用最最頻繁的一種,通常常見的應用場景有:

  • 等待N個線程執行完畢
  • 就像以前不少次性能測試例子,使用兩個CountDownLatch,一個用來讓全部線程等待主線程發起命令一塊兒開啓,一個用來給主線程等待全部子線程執行完畢
  • 異步操做的異步轉同步,不少基於異步網絡通信(好比Netty)的RPC框架都使用了CountDownLatch來異步轉同步,好比下面取自RocketMQ中Remoting模塊的源碼片斷:

image_1dg7p5lfu1mh71rtuugl1er8nlk4b.png-281.7kB

來看看ResponseFuture的相關代碼實現:

public class ResponseFuture {
    private final int opaque;
    private final Channel processChannel;
    private final long timeoutMillis;
    private final InvokeCallback invokeCallback;
    private final long beginTimestamp = System.currentTimeMillis();
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private final SemaphoreReleaseOnlyOnce once;
    private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
    private volatile RemotingCommand responseCommand;
    private volatile boolean sendRequestOK = true;
    private volatile Throwable cause;

...  
    public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
    }

    public void putResponse(final RemotingCommand responseCommand) {
        this.responseCommand = responseCommand;
        this.countDownLatch.countDown();
    }
...
}
複製代碼

在發出網絡請求後,咱們等待響應,在收到響應後咱們把數據放入後解鎖CountDownLatch,而後等待響應的請求就能夠繼續拿數據。

Semaphore測試

Semaphore能夠用來限制併發,假設咱們有一個遊戲須要限制同時在線的玩家,咱們先來定義一個Player類,在這裏咱們經過傳入的Semaphore限制進入玩家的數量。 在代碼裏,咱們經過了以前學習到的AtomicInteger、AtomicLong和LongAdder來統計玩家的總數,最長等待時間和宗等待時長。

@Slf4j
public class Player implements Runnable {

    private static AtomicInteger totalPlayer = new AtomicInteger();
    private static AtomicLong longestWait = new AtomicLong();
    private static LongAdder totalWait = new LongAdder();
    private String playerName;
    private Semaphore semaphore;
    private LocalDateTime enterTime;

    public Player(String playerName, Semaphore semaphore) {
        this.playerName = playerName;
        this.semaphore = semaphore;
    }

    public static void result() {
        log.info("totalPlayer:{},longestWait:{}ms,averageWait:{}ms", totalPlayer.get(), longestWait.get(), totalWait.doubleValue() / totalPlayer.get());
    }

    @Override
    public void run() {
        try {
            enterTime = LocalDateTime.now();
            semaphore.acquire();
            totalPlayer.incrementAndGet();
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
            long ms = Duration.between(enterTime, LocalDateTime.now()).toMillis();
            longestWait.accumulateAndGet(ms, Math::max);
            totalWait.add(ms);
            //log.debug("Player:{} finished, took:{}ms", playerName, ms);
        }
    }
}
複製代碼

主測試代碼以下:

@Test
public void test() throws InterruptedException {
    Semaphore semaphore = new Semaphore(10, false);
    ExecutorService threadPool = Executors.newFixedThreadPool(100);
    IntStream.rangeClosed(1, 10000).forEach(i -> threadPool.execute(new Player("Player" + i, semaphore)));
    threadPool.shutdown();
    threadPool.awaitTermination(1, TimeUnit.HOURS);
    Player.result();
}
複製代碼

咱們限制併發玩家數量爲10個,非公平進入,線程池是100個固定線程,總共有10000個玩家須要進行遊戲,程序結束後輸出以下:

image_1dg7pm9mmt3112srvku1gibprt4o.png-62kB
再來試試公平模式:
image_1dg7ps2jaepfifbmtg16fl117c5i.png-61.6kB
能夠明顯看到,開啓公平模式後最長等待的那個玩家沒有等那麼久了,平均等待時間比以前略長,符合預期。

CyclicBarrier測試

CyclicBarrier用來讓全部線程彼此等待,等待全部的線程或者說參與方一塊兒到達了匯合點後一塊兒進入下一次等待,不斷循環。在全部線程到達了匯合點後能夠由最後一個到達的線程作一下『後處理』操做,這個後處理操做能夠在聲明CyclicBarrier的時候傳入,也能夠經過判斷await()的返回來實現。

這個例子咱們實現一個簡單的場景,一個演出須要等待3位演員到位才能開始表演,演出須要進行3次。咱們經過CyclicBarrier來實現等到全部演員到位,到位後咱們的演出須要2秒時間。

@Slf4j
public class CyclicBarrierTest {
    @Test
    public void test() throws InterruptedException {

        int playerCount = 5;
        int playCount = 3;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(playerCount);
        List<Thread> threads = IntStream.rangeClosed(1, playerCount).mapToObj(player->new Thread(()-> IntStream.rangeClosed(1, playCount).forEach(play->{
            try {
                TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100));
                log.debug("Player {} arrived for play {}", player, play);
                if (cyclicBarrier.await() ==0) {
                    log.info("Total players {} arrived, let's play {}", cyclicBarrier.getParties(),play);
                    TimeUnit.SECONDS.sleep(2);
                    log.info("Play {} finished",play);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }))).collect(Collectors.toList());

        threads.forEach(Thread::start);
        for (Thread thread : threads) {
            thread.join();
        }
    }
}
複製代碼

經過if (cyclicBarrier.await() ==0)能夠實如今最後一個演員到位後作衝破柵欄後的後處理操做,咱們看下這個演出是否是循環了3次,而且是否是全部演員到位後纔開始的:

10:35:43.333 [Thread-4] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 5 arrived for play 1
10:35:43.333 [Thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 2 arrived for play 1
10:35:43.333 [Thread-3] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 4 arrived for play 1
10:35:43.367 [Thread-2] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 3 arrived for play 1
10:35:43.376 [Thread-0] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 1 arrived for play 1
10:35:43.377 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Total players 5 arrived, let's play 1 10:35:43.378 [Thread-2] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 3 arrived for play 2 10:35:43.432 [Thread-3] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 4 arrived for play 2 10:35:43.434 [Thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 2 arrived for play 2 10:35:43.473 [Thread-4] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 5 arrived for play 2 10:35:45.382 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Play 1 finished 10:35:45.390 [Thread-0] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 1 arrived for play 2 10:35:45.390 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Total players 5 arrived, let's play 2
10:35:45.437 [Thread-3] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 4 arrived for play 3
10:35:45.443 [Thread-4] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 5 arrived for play 3
10:35:45.445 [Thread-2] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 3 arrived for play 3
10:35:45.467 [Thread-1] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 2 arrived for play 3
10:35:47.395 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Play 2 finished
10:35:47.472 [Thread-0] DEBUG me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Player 1 arrived for play 3
10:35:47.473 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Total players 5 arrived, let's play 3 10:35:49.477 [Thread-0] INFO me.josephzhu.javaconcurrenttest.concurrent.synchronizers.CyclicBarrierTest - Play 3 finished 複製代碼

從這個例子能夠看到,咱們的演出是在最後到達的Player1演員這個線程上進行的,值得注意的一點是,在他表演的時候其餘演員已經又進入了等待狀態(不要誤認爲,CyclicBarrier會讓全部線程阻塞,等待後處理完成後再讓其它線程繼續下一次循環),就等他表演結束後繼續來到await()才能又開始新的演出。

Phaser測試

Phaser和Barrier相似,只不過前者更靈活,參與方的人數是能夠動態控制的,而不是一開始先肯定的。Phaser能夠手動經過register()方法註冊成爲一個參與方,而後經過arriveAndAwaitAdvance()表示本身已經到達,等到其它參與方一塊兒到達後衝破柵欄。

好比下面的代碼,咱們對全部傳入的任務進行iterations次迭代操做。 Phaser終止的條件是大於迭代次數或者沒有參與方,onAdvance()返回true表示終止。 咱們首先讓主線程成爲一個參與方,而後讓每個任務也成爲參與方,在新的線程中運行任務,運行完成後到達柵欄,只要柵欄沒有終止則無限循環。 在主線程上咱們一樣也是無限循環,每個階段都是等待其它線程完成任務後(到達柵欄後),本身再到達柵欄開啓下一次任務。

@Slf4j
public class PhaserTest {

    AtomicInteger atomicInteger = new AtomicInteger();

    @Test
    public void test() throws InterruptedException {
        int iterations = 10;
        int tasks = 100;
        runTasks(IntStream.rangeClosed(1, tasks)
                .mapToObj(index -> new Thread(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    atomicInteger.incrementAndGet();
                }))
                .collect(Collectors.toList()), iterations);
        Assert.assertEquals(tasks * iterations, atomicInteger.get());
    }

    private void runTasks(List<Runnable> tasks, int iterations) {
        Phaser phaser = new Phaser() {
            protected boolean onAdvance(int phase, int registeredParties) {
                return phase >= iterations - 1 || registeredParties == 0;
            }
        };
        phaser.register();
        for (Runnable task : tasks) {
            phaser.register();
            new Thread(() -> {
                do {
                    task.run();
                    phaser.arriveAndAwaitAdvance();
                } while (!phaser.isTerminated());
            }).start();
        }
        while (!phaser.isTerminated()) {
            doPostOperation(phaser);
            phaser.arriveAndAwaitAdvance();
        }
        doPostOperation(phaser);
    }

    private void doPostOperation(Phaser phaser) {
        while (phaser.getArrivedParties() < 100) {
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.info("phase:{},registered:{},unarrived:{},arrived:{},result:{}",
                phaser.getPhase(),
                phaser.getRegisteredParties(),
                phaser.getUnarrivedParties(),
                phaser.getArrivedParties(), atomicInteger.get());
    }
}
複製代碼

10次迭代,每次迭代100個任務,執行一下看看:

image_1dgajh5dkphcg2o62q9301pvi9.png-416.5kB

能夠看到,主線程的後處理任務的while循環結束後只有它本身沒有到達柵欄,這個時候它能夠作一些任務後處理工做,完成後衝破柵欄。

Exchanger測試

Exchanger實現的效果是兩個線程在同一時間(會合點)交換數據,寫一段代碼測試一下。在下面的代碼裏,咱們定義一個生產者線程不斷髮送數據,發送數據後休眠時間隨機,經過使用Exchanger,消費者線程實現了在生產者發送數據後馬上拿到數據的效果,在這裏咱們並無使用阻塞隊列來實現:

@Slf4j
public class ExchangerTest {

    @Test
    public void test() throws InterruptedException {
        Random random = new Random();
        Exchanger<Integer> exchanger = new Exchanger<>();
        int count = 10;
        Executors.newFixedThreadPool(1, new ThreadFactoryImpl("producer"))
                .execute(() -> {
                    try {
                        for (int i = 0; i < count; i++) {
                            log.info("sent:{}", i);
                            exchanger.exchange(i);
                            TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });

        ExecutorService executorService = Executors.newFixedThreadPool(1, new ThreadFactoryImpl("consumer"));
        executorService.execute(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    int data = exchanger.exchange(null);
                    log.info("got:{}", data);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.HOURS);
    }
}
複製代碼

運行效果以下:

image_1dg7q8eqiffnvca166kst8pk5v.png-501.7kB

小結

併發容器這塊我就不作過多總結了,ConcurrentHashMap實在是太好用太經常使用,可是務必注意其線程安全的特性並非說ConcurrentHashMap怎麼用都沒有問題,錯誤使用在業務代碼中很常見。

如今咱們來舉個看錶演的例子總結一下幾種併發同步器:

  • Semaphore是限制同時看錶演的觀衆人數,有人走了後新人才能進來看
  • CountDownLatch是演職人員人不到齊表演沒法開始,演完結束
  • CyclicBarrier是演職人員到期了後才能表演,最後一個到的人是導演,導演會主導整個演出,演出完畢後全部演職人員修整後從新等待你們到期
  • Phaser是每一場演出的演職人員名單可能隨時會更改,可是也是要確保全部演職人員到期後才能開演

一樣,代碼見個人Github,歡迎clone後本身把玩,歡迎點贊。

歡迎關注個人微信公衆號:隨緣主人的園子

image_1dfvp8d55spm14t7erkr3mdbscf.png-45kB
相關文章
相關標籤/搜索