學習筆記六:線程間的協做與通訊之併發工具類

繼續 上一篇 《 學習筆記五:線程間的協做與通訊java

在jdk的併發包裏提供了幾個很是有用的併發工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種併發流程控制的手段,Exchanger工具類提供了在線程間交換數據的一種手段,以下主要介紹工具類的使用。數據庫

一、等待多線程完成的CountDownLatch

CountDownLatch容許一個或多個線程等待其餘線程完成操做。多線程

CountDownLatch類其實是使用計數器的方式去控制的,當咱們初始化CountDownLatch的時候傳入了一個int變量這個時候在類的內部初始化一個int的變量,每當咱們調用countDownt()方法的時候就使得這個變量的值減1,而對於await()方法則去判斷這個int的變量的值是否爲0,是則表示全部的操做都已經完成,不然繼續等待。併發

構造器中的計數值(count)實際上就是閉鎖須要等待的線程數量。計數器的值必須大於等於0,只是等於0的時候,計數器就是零,調用await方法時不會阻塞當前線程;這個值只能被設置一次,並且CountDownLatch沒有提供任何機制去從新設置這個計數值;一個線程調用countDown方法happen-before,另一個線程調用await方法。app

與CountDownLatch的第一次交互是主線程等待其餘線程。主線程必須在啓動其餘線程後當即調用CountDownLatch.await()方法。這樣主線程的操做就會在這個方法上阻塞,直到其餘線程完成各自的任務。ide

示例:boss等待全部員工來開會,當全部人員都到齊以後,boss宣佈開始會議!!!函數

package com.black.example.mutilThread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Created by 10250H on 2018/7/26.
 */
public class CountDownLatchDemo {
    //聲明countDownLatch 變量,初始化線程數量(內部計數器)
    static CountDownLatch countDownLatch = new CountDownLatch(2);

    public static void main(String[] args) {
        new Thread(new MyRunner(countDownLatch, "小李", 2000)).start();
        new Thread(new MyRunner(countDownLatch, "小張", 4000)).start();
        new Thread(new MyRunner(countDownLatch, "小王", 5000)).start();

        try {
            System.out.println("等待員工到來開會。。。。。。。");
            //注意這裏是await。主線程將會一直等待在這裏,當全部線程都執行 countDownLatch.countDown();以後當前線程纔會繼續執行
            countDownLatch.await();
            startMeeting("老闆");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void startMeeting(String name) {
        System.out.println(name + "說:人齊了。會議開始!!");
    }

    static class MyRunner implements Runnable {
        CountDownLatch countDownLatch;
        String name;
        int time;

        public MyRunner(CountDownLatch countDownLatch, String name, int time) {
            this.countDownLatch = countDownLatch;
            this.name = name;
            this.time = time;
        }

        @Override
        public void run() {
            try {
                System.out.println(name + " 開始出發去公司。");
                TimeUnit.SECONDS.sleep(1);
                System.out.println(name + " 終於到會議室!!!");
                countDownLatch.countDown();
                System.out.println(name + " 準備好了!!");
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }

}

運行結果:注意,至因而誰先到會議室,每次運行結果都會不同。由於主線程和子線程的調用時由CPU決定的工具

若是某我的缺席會議,咱們不能讓主線程一直等待,因此可使用另一個帶指定時間的await方法-await(long time,TimeUtil unit)的那個帶指定時間後,就好再也不阻塞當前線程。jion也有相似的方法。學習

二、同步屏障CyclicBarrier

CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier),它要作的事情是,讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續運行。ui

CyclicBarrier默認構造方法時CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴CyclicBarrier我已經到達屏障了,而後當前線程被阻塞。   示例代碼以下:

package com.black.example.mutilThread;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

/**
 * Created by 10250H on 2018/7/26.
 */
public class CyclicBarrierDemo1 {
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    public static void main(String[] args) {
        new Thread(new MyThread(1)).start();
        new Thread(new MyThread(2)).start();
    }

    static class MyThread implements Runnable{
        private int counter;
        public MyThread(int counter){
            this.counter = counter;
        }
        @Override
        public void run() {
            try {
                System.out.println("當前值輸出:"+counter);
                TimeUnit.SECONDS.sleep(1);
                cyclicBarrier.await();
                System.out.println("counter="+counter+",線程="+Thread.currentThread().getName()+",繼續執行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }
}

運行結果:由於主線程和子線程的調用時由CPU決定的,兩個線程都有可能先執行,因此會產生不一樣的結果

若是把new CyclicBarrier(2) 修改成new CyclicBarrier(3),則主線程和子線程會永遠等待,不會繼續執行,由於第三個尚未到達屏障,因此以前到達屏障的兩個線程都不會繼續執行。

CyclicBarrier還提供了高級構造函數CyclicBarrier(int parties,Runnable barrier-Action),用於在現場到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景,示例代碼以下:

package com.black.example.mutilThread;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierDemo2 {
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,new MyThread(3));

    public static void main(String[] args){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("通用計數當前值:1");
                    cyclicBarrier.await();
                    System.out.println("默認執行通用線程2");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("通用計數當前值:2");
                    cyclicBarrier.await();
                    System.out.println("默認執行通用線程2");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    static class MyThread implements Runnable{
        private int counter;
        public MyThread(int counter){
            this.counter = counter;
        }
        @Override
        public void run() {
            try {
                System.out.println("優先執行:"+counter);
                TimeUnit.SECONDS.sleep(1);
                System.out.println("counter="+counter+",線程="+Thread.currentThread().getName()+",繼續執行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

運行結果:當兩個線程都到達屏障後,優先執行對象MyThread 的任務。

CyclicBarrier的應用場景

能夠用於多線程計算數據,最後合併計算結果的場景。例如:用一個Excel保存用戶的全部銀行流水,每一個Sheet保存一個帳戶近一年的每筆銀行流水,如今須要統計用戶的日均銀行流水,先用多線程處理每一個sheet裏的銀行流水,都執行完以後,獲得每一個sheet的日均銀行流水。最後再用barrierAction總結出整個Excel的日均銀行流水,示例代碼以下:

package com.black.example.mutilThread;

import java.util.Map;
import java.util.concurrent.*;

public class BankWaterService implements Runnable {
    //建立4個屏障,處理完以後執行當前類的run方法
    private CyclicBarrier cyclicBarrier = new CyclicBarrier(4,this);
    //假設只有4個sheet,因此啓動4個線程
    private Executor executor = Executors.newFixedThreadPool(4);
    //保存每一個sheet計算出的銀行流水結果
    private ConcurrentHashMap<String,Integer> sheetCountMap = new ConcurrentHashMap<String,Integer>();

    private void count(){
        for (int i=0;i<4;i++){
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    //1:計算當前sheet的銀行流水數據,計算代碼省略....僞代碼以下:
                    sheetCountMap.put(Thread.currentThread().getName(),1);
                    //計算完成,插入屏障
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    @Override
    public void run() {
        int result=0;
        //彙總每一個sheet的計算結果
        for (Map.Entry<String,Integer> entry:sheetCountMap.entrySet()){
            result+=entry.getValue();
        }
        //輸出結果
        sheetCountMap.put("result",result);
        System.out.println("最終結果:"+result);
    }

    public static void main(String[] args) {
        BankWaterService bankWaterService = new BankWaterService();
        bankWaterService.count();
    }
}

運行結果:最終結果:4

 三、CyclicBarrier與CountDownLatch的區別

CountDownLatch計數器只能使用一次,而CyclicBarrier計數器可使用reset()方法重置。

因此CyclicBarrier能夠處理更爲複雜的業務場景,如:計算髮生錯誤,能夠重置計數器,並讓線程從新執行一次。

CyclicBarrier其餘用法及源代碼以下,如:

    isBroken() :阻塞的線程是否被中斷,返回值boolean類型

    getNumberWaiting() :獲取Cyclic-Barrier阻塞的線程數量

 四、控制併發線程數的Semaphore

Semaphore(信號量)用來控制同時訪問特定資源的線程數量,經過協調各個線程,以保證合理的使用公共資源。

應用場景

Semaphore能夠用於流量控制,特別是公用資源有限的應用場景,好比數據庫連接。

假若有個需求,要讀幾萬個文件的數據,由於是IO密集型任務,咱們能夠啓動幾十個線程併發讀取,可是若是讀到內存後,還須要存儲到數據庫中,而數據庫的鏈接數只有10個,這時咱們必須控制只有10個線程同時獲取數據庫連接保存數據,不然會報錯沒法獲取數據庫連接。這時可使用Semaphore來作流量控制,代碼示例以下:

package com.black.example.mutilThread;

import java.util.concurrent.*;

public class SemaphoreDemo {
    private static final int THREAD_COUNT=30;
    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
    private static Semaphore semaphore = new Semaphore(10);//10個併發

    public static void main(String[] args) {
        for (int i=0;i<THREAD_COUNT;i++){
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName()+"-----請求資源");
                        //請求獲取資源,若是有空閒資源則會當即獲取,進入臨界區,不然將會等待,一直等待到獲取到臨界區資源
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName()+"----獲取資源,保存數據!");
                        TimeUnit.SECONDS.sleep(1);
                        semaphore.release();//釋放資源
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

}

運行結果:

pool-1-thread-4-----請求資源
pool-1-thread-2-----請求資源
pool-1-thread-4----獲取資源,保存數據!
pool-1-thread-1-----請求資源
pool-1-thread-5-----請求資源
pool-1-thread-3-----請求資源
pool-1-thread-5----獲取資源,保存數據!
pool-1-thread-8-----請求資源
pool-1-thread-1----獲取資源,保存數據!
pool-1-thread-9-----請求資源
pool-1-thread-2----獲取資源,保存數據!
pool-1-thread-7-----請求資源
pool-1-thread-9----獲取資源,保存數據!
pool-1-thread-10-----請求資源
pool-1-thread-11-----請求資源
pool-1-thread-8----獲取資源,保存數據!
pool-1-thread-6-----請求資源
pool-1-thread-3----獲取資源,保存數據!
pool-1-thread-13-----請求資源
pool-1-thread-11----獲取資源,保存數據!
pool-1-thread-10----獲取資源,保存數據!
pool-1-thread-12-----請求資源
pool-1-thread-7----獲取資源,保存數據!
pool-1-thread-15-----請求資源
pool-1-thread-14-----請求資源
pool-1-thread-16-----請求資源
pool-1-thread-17-----請求資源
pool-1-thread-18-----請求資源
pool-1-thread-19-----請求資源
pool-1-thread-21-----請求資源
pool-1-thread-20-----請求資源
pool-1-thread-22-----請求資源
pool-1-thread-23-----請求資源
pool-1-thread-24-----請求資源
pool-1-thread-25-----請求資源
pool-1-thread-27-----請求資源
pool-1-thread-26-----請求資源
pool-1-thread-28-----請求資源
pool-1-thread-29-----請求資源
pool-1-thread-30-----請求資源
pool-1-thread-6----獲取資源,保存數據!
pool-1-thread-13----獲取資源,保存數據!
pool-1-thread-12----獲取資源,保存數據!
pool-1-thread-15----獲取資源,保存數據!
pool-1-thread-16----獲取資源,保存數據!
pool-1-thread-17----獲取資源,保存數據!
pool-1-thread-14----獲取資源,保存數據!
pool-1-thread-21----獲取資源,保存數據!
pool-1-thread-19----獲取資源,保存數據!
pool-1-thread-18----獲取資源,保存數據!
pool-1-thread-20----獲取資源,保存數據!
pool-1-thread-22----獲取資源,保存數據!
pool-1-thread-23----獲取資源,保存數據!
pool-1-thread-26----獲取資源,保存數據!
pool-1-thread-27----獲取資源,保存數據!
pool-1-thread-25----獲取資源,保存數據!
pool-1-thread-24----獲取資源,保存數據!
pool-1-thread-30----獲取資源,保存數據!
pool-1-thread-29----獲取資源,保存數據!
pool-1-thread-28----獲取資源,保存數據!

Semaphore簡單用法以下:

//構造方法,可用的許可證數量,默認使用非公平鎖的方式建立
public Semaphore(int permits){...}
//嘗試獲取許可證
public boolean tryAcquire(){...}
//嘗試獲取許可證,在指定時間內若獲取不到則返回
public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException{...}
//返回信號量中當前可用的許可證數
public int availablePermits(){...}
//返回正在等待獲取許可證的線程數(估計值)
public final int getQueueLength() {...}
//查詢是否有任何線程等待獲取許可證
public final boolean hasQueuedThreads(){...}
//減小reduction個許可證,這個方法在使用的子類中頗有用跟蹤不可用資源的信號量
protected void reducePermits(int reduction) {...}
//返回全部等待獲取許可證的線程集合
protected Collection<Thread> getQueuedThreads(){...}

 

上一篇:學習筆記五:線程間的協做與通訊 

下一篇:待續....

相關文章
相關標籤/搜索