Java多線程(十五):CountDownLatch,Semaphore,Exchanger,CyclicBarrier,Callable和Future

CountDownLatch

CountDownLatch用來使一個線程或多個線程等待到其餘線程完成。CountDownLatch有個初始值count,await方法會阻塞線程,直到經過countDown方法調用使count減小爲0纔會執行await方法後面的代碼。
示例代碼
MyThread50_0是WorkThread,不一樣的線程休眠時間不同。java

public class MyThread50_0 extends Thread
{
    private CountDownLatch cdl;
    private int sleepSecond;

    public MyThread50_0(String name, CountDownLatch cdl, int sleepSecond)
    {
        super(name);
        this.cdl = cdl;
        this.sleepSecond = sleepSecond;
    }

    public void run()
    {
        try
        {
            System.out.println(this.getName() + "啓動了,時間爲" + new Date());
            Thread.sleep(sleepSecond * 1000);
            cdl.countDown();
            System.out.println(this.getName() + "執行完了,時間爲" + new Date());
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

MyThread50_1是DoneThread和main方法併發

public class MyThread50_1 extends Thread {
    private CountDownLatch cdl;

    public MyThread50_1(String name, CountDownLatch cdl)
    {
        super(name);
        this.cdl = cdl;
    }

    public void run()
    {
        try
        {
            System.out.println(this.getName() + "要等待了, 時間爲" + new Date());
            cdl.await();
            System.out.println(this.getName() + "等待完了, 時間爲" + new Date());
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        CountDownLatch cdl = new CountDownLatch(3);
        MyThread50_1 dt0 = new MyThread50_1("DoneThread1", cdl);
        MyThread50_1 dt1 = new MyThread50_1("DoneThread2", cdl);
        dt0.start();
        dt1.start();
        MyThread50_0 wt0 = new MyThread50_0("WorkThread1", cdl, 2);
        MyThread50_0 wt1 = new MyThread50_0("WorkThread2", cdl, 3);
        MyThread50_0 wt2 = new MyThread50_0("WorkThread3", cdl, 4);
        wt0.start();
        wt1.start();
        wt2.start();
    }
}

運行結果以下異步

DoneThread2要等待了, 時間爲Sun Sep 22 21:37:57 CEST 2019
DoneThread1要等待了, 時間爲Sun Sep 22 21:37:57 CEST 2019
WorkThread3啓動了,時間爲Sun Sep 22 21:37:57 CEST 2019
WorkThread2啓動了,時間爲Sun Sep 22 21:37:57 CEST 2019
WorkThread1啓動了,時間爲Sun Sep 22 21:37:57 CEST 2019
WorkThread1執行完了,時間爲Sun Sep 22 21:37:59 CEST 2019
WorkThread2執行完了,時間爲Sun Sep 22 21:38:00 CEST 2019
WorkThread3執行完了,時間爲Sun Sep 22 21:38:01 CEST 2019
DoneThread2等待完了, 時間爲Sun Sep 22 21:38:01 CEST 2019
DoneThread1等待完了, 時間爲Sun Sep 22 21:38:01 CEST 2019

「DoneThreadX要等待了」和「WorkThreadX啓動了」的順序是隨機的。
「WorkThreadX執行完了「的順序按照一、二、3,由於咱們的等待時間二、三、4秒。
待WorkThread3執行完了,纔會執行await()以後的代碼,DoneThreadX執行完了,一樣該順序隨機。
這是一種增強版的等待/通知模型,它能夠實現多個工做線程完成任務後通知多個等待線程開始工做。
咱們以前的等待/通知模型只能實現一個工做線程完成任務後通知一個等待線程或者全部等待線程開始工做。函數

Semaphore

Semaphore用來控制併發數量,Semaphore構造函數傳入permit(許可),一個permit至關於一個不可重入鎖,acquire方法得到permit,relase方法歸還permit。
代碼示例以下ui

public class MyThread51 {
    public static void main(String[] args)
    {
        final Semaphore semaphore = new Semaphore(5);

        Runnable runnable = new Runnable()
        {
            public void run()
            {
                try
                {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "得到了permit,時間爲" + new Date());
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + "釋放了permit,時間爲" + new Date());

                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
                finally
                {
                    semaphore.release();
                }
            }
        };

        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++)
            threads[i] = new Thread(runnable);
        for (int i = 0; i < threads.length; i++)
            threads[i].start();
    }
}

運行結果以下this

Thread-2得到了permit,時間爲Sun Sep 29 21:47:05 CEST 2019
Thread-3得到了permit,時間爲Sun Sep 29 21:47:05 CEST 2019
Thread-4得到了permit,時間爲Sun Sep 29 21:47:05 CEST 2019
Thread-1得到了permit,時間爲Sun Sep 29 21:47:05 CEST 2019
Thread-0得到了permit,時間爲Sun Sep 29 21:47:05 CEST 2019
Thread-3釋放了permit,時間爲Sun Sep 29 21:47:07 CEST 2019
Thread-1釋放了permit,時間爲Sun Sep 29 21:47:07 CEST 2019
Thread-0釋放了permit,時間爲Sun Sep 29 21:47:07 CEST 2019
Thread-2釋放了permit,時間爲Sun Sep 29 21:47:07 CEST 2019
Thread-4釋放了permit,時間爲Sun Sep 29 21:47:07 CEST 2019
Thread-5得到了permit,時間爲Sun Sep 29 21:47:07 CEST 2019
Thread-7得到了permit,時間爲Sun Sep 29 21:47:07 CEST 2019
Thread-6得到了permit,時間爲Sun Sep 29 21:47:07 CEST 2019
Thread-9得到了permit,時間爲Sun Sep 29 21:47:07 CEST 2019
Thread-8得到了permit,時間爲Sun Sep 29 21:47:07 CEST 2019
Thread-5釋放了permit,時間爲Sun Sep 29 21:47:09 CEST 2019
Thread-8釋放了permit,時間爲Sun Sep 29 21:47:09 CEST 2019
Thread-9釋放了permit,時間爲Sun Sep 29 21:47:09 CEST 2019
Thread-6釋放了permit,時間爲Sun Sep 29 21:47:09 CEST 2019
Thread-7釋放了permit,時間爲Sun Sep 29 21:47:09 CEST 2019

2,3,4,1,0先得到了permit,相差兩秒釋放了permit;
5,7,6,9,8得到了permit,相差兩秒釋放了permit;
由於咱們設置的permit是5,全部只能有五個線程得到permit。線程

Exchanger

Exchanger用來交換兩個線程中的數據
示例代碼以下code

public class MyThread52 extends Thread{
    private String str;
    private Exchanger<String> exchanger;
    private int sleepSecond;

    public MyThread52(String str, Exchanger<String> exchanger, int sleepSecond) {
        this.str = str;
        this.exchanger = exchanger;
        this.sleepSecond = sleepSecond;
    }

    public void run() {
        try {
            System.out.println(this.getName() + "啓動, 原數據爲" + str + ", 時間爲" + new Date());
            Thread.sleep(sleepSecond * 1000);
            str = exchanger.exchange(str);
            System.out.println(this.getName() + "交換了數據, 交換後的數據爲" + str + ", 時間爲" + new Date());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<String>();
        MyThread52 et0 = new MyThread52("111", exchanger, 3);
        MyThread52 et1 = new MyThread52("222", exchanger, 2);

        et0.start();
        et1.start();
    }
}

運行結果以下接口

Thread-1啓動, 原數據爲222, 時間爲Sun Sep 29 22:18:36 CEST 2019
Thread-0啓動, 原數據爲111, 時間爲Sun Sep 29 22:18:36 CEST 2019
Thread-0交換了數據, 交換後的數據爲222, 時間爲Sun Sep 29 22:18:39 CEST 2019
Thread-1交換了數據, 交換後的數據爲111, 時間爲Sun Sep 29 22:18:39 CEST 2019

能夠看到,數據發生了交換,時間差爲最長時間3s。get

CyclicBarrier

一組線程等待對方都達到barrier point,再執行接下來的動做,barrier point是循環的,它能夠重用。
示例代碼以下

public class MyThread53 extends Thread{
    private CyclicBarrier cb;
    private int sleepSecond;

    public MyThread53(CyclicBarrier cb, int sleepSecond)
    {
        this.cb = cb;
        this.sleepSecond = sleepSecond;
    }

    public void run()
    {
        try
        {
            System.out.println(this.getName() + "運行了");
            System.out.println(this.getName() + "準備等待了, 時間爲" + new Date());
            Thread.sleep(sleepSecond * 1000);

            cb.await();
            System.out.println(this.getName() + "結束等待了, 時間爲" + new Date());
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }


    public static void main(String[] args)
    {
        Runnable runnable = new Runnable()
        {
            public void run()
            {
                System.out.println("CyclicBarrier的全部線程await()結束了,我運行了, 時間爲" + new Date());
            }
        };
        //須要等待三個線程await()後再執行runnable
        CyclicBarrier cb = new CyclicBarrier(3, runnable);
        MyThread53 cbt0 = new MyThread53(cb, 3);
        MyThread53 cbt1 = new MyThread53(cb, 6);
        MyThread53 cbt2 = new MyThread53(cb, 9);
        cbt0.start();
        cbt1.start();
        cbt2.start();
    }
}

運行結果以下

Thread-0運行了
Thread-1運行了
Thread-2運行了
Thread-1準備等待了, 時間爲Mon Sep 30 23:02:11 CEST 2019
Thread-2準備等待了, 時間爲Mon Sep 30 23:02:11 CEST 2019
Thread-0準備等待了, 時間爲Mon Sep 30 23:02:11 CEST 2019
CyclicBarrier的全部線程await()結束了,我運行了, 時間爲Mon Sep 30 23:02:20 CEST 2019
Thread-2結束等待了, 時間爲Mon Sep 30 23:02:20 CEST 2019
Thread-0結束等待了, 時間爲Mon Sep 30 23:02:20 CEST 2019
Thread-1結束等待了, 時間爲Mon Sep 30 23:02:20 CEST 2019

Runnable線程在Thread-0,Thread-1,Thread-2 await()後運行,Runnable線程和三個線程的執行時間幾乎相同。

Callable和Future

Callable
因爲Runnable接口run()返回值是void類型,執行任務後沒法返回結果。因此咱們須要Callable接口,該接口的call()能夠返回值。
Future
Future表示一個異步計算結果,Future提供了以下方法
get():獲取任務執行結果
cancel():中斷任務
isDone():判斷任務是否執行完成
isCancelled():判斷任務是否被取消

示例代碼以下

public class MyThread54 implements Callable<String> {
    public String call() throws Exception
    {
        System.out.println("進入CallableThread的call()方法, 開始睡覺, 睡覺時間爲" + new Date());
        Thread.sleep(10000);
        return "是ss12";
    }

    public static void main(String[] args) throws Exception
    {
        ExecutorService es = Executors.newCachedThreadPool();
        MyThread54 ct = new MyThread54();
        Future<String> f = es.submit(ct);
        es.shutdown();

        Thread.sleep(5000);
        System.out.println("主線程等待5秒, 當前時間爲" + new Date());

        String str = f.get();
        System.out.println("Future已拿到數據, str = " + str + ", 當前時間爲" + new Date());
    }
}

運行結果以下

進入CallableThread的call()方法, 開始睡覺, 睡覺時間爲Sun Nov 03 11:00:22 CET 2019
主線程等待5秒, 當前時間爲Sun Nov 03 11:00:27 CET 2019
Future已拿到數據, str = 是ss12, 當前時間爲Sun Nov 03 11:00:32 CET 2019

能夠看到,Future在10s後拿到了返回結果。

相關文章
相關標籤/搜索