Java併發7:併發工具類

CountDownLatch(閉鎖)

閉鎖容許一個或者多個線程等待其餘線程都完成了才繼續執行。CountDownLatch 是一種閉鎖的實現,使得一個或多個線程等待一組事情發生。經過計數器表示須要等待的事件數量;使用countDown()方法將計數器減去1,表示有一個事件發生;使用await()方法阻塞當前線程,等待計數器爲0,也就是全部須要等待的事情發生。java

CountDownLatch 不能從新初始化或者修改內部計數器的值。編程

CountDownLatch 內部依賴內部類 Sync 實現,而 Sync 類繼承於 AQS。其內部經過共享鎖實現。bash

示例:併發

public class CountDownLatchTest {
    private static CountDownLatch countDownLatch = new CountDownLatch(5);

    /** * Boss線程,等待員工到達開會 */
    static class BossThread extends Thread{
        @Override
        public void run() {
            System.out.println("Boss在會議室等待,總共有" + countDownLatch.getCount() + "我的開會...");
            try {
                //Boss等待
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("全部人都已經到齊了,開會吧...");
        }
    }

    //員工到達會議室
    static class EmpleoyeeThread extends Thread{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ",到達會議室....");
            //員工到達會議室 count - 1
            countDownLatch.countDown();
        }
    }

    public static void main(String[] args){
        //Boss線程啓動
        new BossThread().start();

        for(int i = 0 ; i < countDownLatch.getCount() ; i++){
            new EmpleoyeeThread().start();
        }
    }
}

複製代碼

運行結果:dom

Boss在會議室等待,總共有5我的開會...
Thread-2,到達會議室....
Thread-3,到達會議室....
Thread-1,到達會議室....
Thread-5,到達會議室....
Thread-4,到達會議室....
全部人都已經到齊了,開會吧...
複製代碼

CycliBarrier(屏障)

閉鎖是一次性對象,一旦進入終止狀態,就不能被重置。屏障相似於閉鎖,阻塞一組進程直到某個事件發生。也就是讓一組線程到達一個屏障時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續運行。ide

CycliBarrier 內部使用了可重用鎖 ReentrantLock 和 Condition。線程執行await()方法後,計數器減1,進行等待,直到計數器爲0,全部調用了await()方法的線程繼續執行。工具

CycliBarrier適用於多個線程結果合併的場景。ui

示例:this

public class CyclicBarrierTest {
    private static CyclicBarrier cyclicBarrier;

    static class CyclicBarrierThread extends Thread{
        public void run() {
            System.out.println(Thread.currentThread().getName() + "到了");
            //等待
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args){
        cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("人到齊了,開會吧....");
            }
        });

        for(int i = 0 ; i < 5 ; i++){
            new CyclicBarrierThread().start();
        }
    }
}
複製代碼

運行結果:spa

Thread-0到了
Thread-1到了
Thread-4到了
Thread-2到了
Thread-3到了
人到齊了,開會吧...
複製代碼

閉鎖和屏障的比較

CountDownLatch 的計數器只能使用一次,CyclicBarrier 的計數器可使用 reset() 方法重置。

閉鎖只會阻塞一條線程,目的是爲了讓該條任務線程知足條件後執行; 而同步屏障會阻塞全部線程,目的是爲了讓全部線程同時執行。

Semaphore(信號量)

信號量用於控制同時訪問某個特定資源的操做數量,或者執行某個指定操做的數量。計數信號量還能夠用來實現某種資源池,或者對容器施加邊界。

信號量能夠用於實現資源池,也能夠用於將容器變爲有界阻塞容器。信號量管理着一組虛擬的許可,在執行操做時首先獲取許可,並在使用之後釋放許可。若是沒有許可,將阻塞直到有許可或被中斷,超時。

信號量的使用場景是,有m個資源,n個線程,且n>m,同一時刻只能容許m條線程訪問資源。

信號量內部一樣依賴於繼承自 AQS 的 Sync 類,且包含兩個子類,分別是 FairSync 和 NonfairSync。

信號量經過acquire()方法獲取許可;經過release()方法釋放許可。

示例:

public class SemaphoreCase {
    private static class Parking{
        private Semaphore semaphore;

        Parking(int count){
            semaphore=new Semaphore(count);
        }

        public void park(){
            try {
                semaphore.acquire();
                long time=(long) (Math.random()*10);
                System.out.println(Thread.currentThread().getName()+"進入停車場停車"+time+"秒");
                Thread.sleep(time);
                System.out.println(Thread.currentThread().getName()+"開出停車場");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }
    }

    private static class Car extends Thread{
        Parking parking;
        Car(Parking parking){
            this.parking=parking;
        }

        @Override
        public void run() {
            parking.park();
        }
    }

    public static void main(String[] args) {
        Parking parking=new Parking(3);
        for(int i=0;i<5;i++){
            new Car(parking).start();
        }
    }
}

複製代碼

運行結果爲:

Thread-2進入停車場停車7秒
Thread-1進入停車場停車9秒
Thread-0進入停車場停車4秒
Thread-0開出停車場
Thread-3進入停車場停車4秒
Thread-2開出停車場
Thread-4進入停車場停車3秒
Thread-1開出停車場
Thread-3開出停車場
Thread-4開出停車場
複製代碼

Exchanger(交換者)

Exchanger 是一個用於線程間協做的工具類,用於線程間的數據交換。它提供了一個同步點,在這個同步點,兩個線程能夠交換彼此的數據。兩個線程經過exchange()方法交換數據,一個線程執行了該方法,會一直等待另外一個線程執行該方法,當兩個線程都到達同步點,這兩個線程就能夠交換數據。

示例:

public class ExchangerCase {
    private static class Producer implements Runnable{
        private List<String> buffer;
        private Exchanger<List<String>> exchanger;
        Producer(List<String> buffer,Exchanger<List<String>> exchanger){
            this.buffer=buffer;
            this.exchanger=exchanger;
        }
        @Override
        public void run() {
            for(int i=1;i<5;i++){
                System.out.println("生產者生產次數:"+i);
                for(int j=1;j<=3;j++){
                    System.out.println("生產者裝入"+i+"--"+j);
                    buffer.add("buffer: "+i+"--"+j);
                }
                System.out.println("生產者裝滿,等待消費者交換");
                try {
                    exchanger.exchange(buffer);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    private static class Consumer implements Runnable{
        private List<String> buffer;
        private Exchanger<List<String>> exchanger;

        Consumer(List<String> buffer,Exchanger<List<String>> exchanger){
            this.buffer=buffer;
            this.exchanger=exchanger;
        }

        @Override
        public void run() {
            for(int i=1;i<5;i++){
                try {
                    buffer=exchanger.exchange(buffer);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消費者消費次數:"+i);
                for(int j=1;j<=3;j++) {
                    System.out.println("消費者消費" +buffer.get(0));
                    buffer.remove(0);
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> buffer1 = new ArrayList<String>();
        List<String> buffer2 = new ArrayList<String>();

        Exchanger<List<String>> exchanger = new Exchanger<List<String>>();

        Thread producerThread = new Thread(new Producer(buffer1,exchanger));
        Thread consumerThread = new Thread(new Consumer(buffer2,exchanger));

        producerThread.start();
        consumerThread.start();
    }
}

複製代碼

運行結果

生產者生產次數:1
生產者裝入1--1
生產者裝入1--2
生產者裝入1--3
生產者裝滿,等待消費者交換
生產者生產次數:2
生產者裝入2--1
生產者裝入2--2
生產者裝入2--3
生產者裝滿,等待消費者交換
消費者消費次數:1
消費者消費buffer: 1--1
消費者消費buffer: 1--2
消費者消費buffer: 1--3
消費者消費次數:2
消費者消費buffer: 2--1
消費者消費buffer: 2--2
生產者生產次數:3
生產者裝入3--1
消費者消費buffer: 2--3
生產者裝入3--2
生產者裝入3--3
生產者裝滿,等待消費者交換
生產者生產次數:4
生產者裝入4--1
生產者裝入4--2
消費者消費次數:3
消費者消費buffer: 3--1
生產者裝入4--3
生產者裝滿,等待消費者交換
消費者消費buffer: 3--2
消費者消費buffer: 3--3
消費者消費次數:4
消費者消費buffer: 4--1
消費者消費buffer: 4--2
消費者消費buffer: 4--3
複製代碼

在Exchanger中,若是一個線程已經到達了exchanger節點時,對於它的夥伴節點的狀況有三種:

  • 若是它的夥伴節點在該線程到達以前已經調用了exchanger方法,則它會喚醒它的夥伴而後進行數據交換,獲得各自數據返回。
  • 若是它的夥伴節點尚未到達交換點,則該線程將會被掛起,等待它的夥伴節點到達被喚醒,完成數據交換。
  • 若是當前線程被中斷了則拋出異常,或者等待超時了,則拋出超時異常。

參考資料

相關文章
相關標籤/搜索