【Java併發編程】常見工具類總結:CountDownLatch,CyclicBarrier,Semphore,Exchanger

CountDownLatch

CountDownLatch容許一個或多個線程等待其餘線程完成操做。相似於join的操做,能夠進行類比:java

join用於讓當前執行線程等待join線程執行結束,如A.join()方法,將不停檢查A線程是否存活,若是A存活,則當前線程永遠等待。編程

public class JoinCDLT {

    public static void main(String[] args) throws InterruptedException {
        Thread parser1 = new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                System.out.println("parser1 start");
                Thread.sleep(5000);
                System.out.println("parser1 finish");
            }
        });
        Thread parser2 = new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                System.out.println("parser2 start");
                Thread.sleep(10000);
                System.out.println("parser2 finish");
            }
        });
        long start = System.currentTimeMillis();
        parser1.start();
        parser2.start();
        //join用於讓當前執行線程等待join線程執行結束
        parser1.join();
        parser2.join();
        long end = System.currentTimeMillis();
        System.out.println("all parser finish , spend " + (end - start) + " ms");
    }
    
}

CountDownLatch使用AQS實現,經過AQS的狀態變量state來做爲計數器值,當多個線程調用countdown方法時實際是原子性遞減AQS的狀態值,當線程調用await方法後當前線程會被放入AQS阻塞隊列等待計數器爲0再返回。segmentfault

public class CountDownLatchTest {

    //傳入int參數做爲計數器
    static CountDownLatch c = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        Thread a = new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                Thread.sleep(3000);
                System.out.println(1);
                c.countDown();//每當調用該方法,計數器N - 1
            }
        });
        Thread b = new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                Thread.sleep(3000);
                System.out.println(2);
                c.countDown();
            }
        });
        long start = System.currentTimeMillis();
        a.start();
        b.start();
        //await方法會阻塞當前線程
        c.await();
        long end = System.currentTimeMillis();
        System.out.println(3 + " " + (end - start));
    }
}

計數器必須大於等於0,只是等於0時候,計數器就是0,調用await方法時不會阻塞當前線程。CountDownLatch不可能從新初始化或者修改CountDownLatch對象的內部計數器的值。一個線程調用countDown方法happen-before,另一個線程調用await方法。併發

CyclicBarrier

CyclicBarrier能夠讓一組線程達到一個屏障【同步點】時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續運行。app

CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞。ide

public class CyclicBarrierTest {

    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {

        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //線程調用await,告訴CyclicBarrier已經到達了屏障,而後當前線程被阻塞
                    c.await(); 
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(1);
            }
        }, "thread-1");
        thread.start();
        try {
            //主線程到達了屏障,由於設置了parties設置爲2,所以能夠繼續下去
            c.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(2);
    }
}

上述程序,輸出的結果多是先1後2的次序,也多是先2後1,緣由在於:主線程和子線程的調度由CPU決定,兩個線程均可能先執行。函數

可是,若是將屏障數量改成3,此時主線程和子線程會永遠等待,由於沒有第三個線程達到屏障了。ui

CyclicBarrier和CountDownLatch的區別

  • CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可使用reset()方法重置,所以CyclicBarrier可以處理更爲複雜的業務場景。
  • CyclicBarrier還提供了其餘有用的方法,如getNumberWaiting方法能夠得到CyclicBarrier阻塞的線程數量。isBroken()方法用來了解阻塞的線程是否被中斷。

Phaser 的實現

Phaser能夠替代CountDownLatch 和CyclicBarrier,但比二者更增強大,能夠動態調整須要的線程個數,能夠經過構造函數傳入父Phaser實現層次Phaser。線程

參考:http://www.javashuo.com/article/p-gholfgmo-de.htmlcode

Semaphore

Semaphore 能夠用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源,適用場景能夠是流量控制。

使用AQS實現,AQS的狀態變量state作爲許可證數量,每次經過acquire()/tryAcquire(),許可證數量經過CAS原子性遞減,調用release()釋放許可證,原子性遞增,只要有許可證就能夠重複使用。

public class SemaphoreTest {

    private static final int THREAD_COUNT = 30;
    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

    private static Semaphore s = new Semaphore(10);//10個許可證數量,最大併發數爲10

    public static void main(String[] args) {
        for(int i = 0; i < THREAD_COUNT; i ++){ //執行30個線程
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    s.tryAcquire(); //嘗試獲取一個許可證
                    System.out.println("save data");
                    s.release(); //使用完以後歸還許可證
                }
            });
        }
        threadPool.shutdown();
    }
}

Exchanger 原理

用於進行線程間的數據交換,它提供一個同步點,在這個同步點兩個線程能夠交換彼此的數據。

若是第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當都達到同步點時,這兩個線程能夠交換數據。

public class ExchangerTest {

    private static final Exchanger<String> exgr = new Exchanger<>();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadPool.execute(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                String A = "銀行流水A";
                exgr.exchange(A);
            }
        });
        threadPool.execute(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                String B = "銀行流水B";
                String A = exgr.exchange(B);
                System.out.println("A 和 B 的數據是否一致 :"
                        + A.equals(B) + " A錄入的數據爲 :" + A + " B錄入的數據爲 :" + B);
            }
        });
        threadPool.shutdown();
    }
}

參考

相關文章
相關標籤/搜索