多線程之美3一Java併發工具類

1、簡介

1.一、 Semaphore

信號量,見文知義,經常使用於併發控制中的限流做用,我理解是限定數量的共享鎖機制。該共享資源最多同時可以讓n個線程訪問,超過n個線程就阻塞等待,若有資源空閒, 喚醒其餘等待線程(喚醒又分公平與非公平,默認非公平)好比一條四車道大橋,每次僅能併發經過4輛汽車,而在高峯期時100輛車涌入,此次須要一個信號燈來限制車輛,每次最多放行4輛車,在車輛經過後再放行。在併發環境下,每輛車就是如一個線程,4車道大橋就若有限的資源,須要控制線程的數量,在這種業務場景下,靠鎖同步的機制(如synchronized)力有不逮,java併發包中提供Semaphore類能夠幫助解決此類場景。java

應用場景: 
一、資源控制:控制數據庫鏈接數,若有多個IO操做,每一個須要操做數據庫寫入,數據庫鏈接數有限,控制鏈接數據庫數量。
例:100個線程執行IO,只有10個mysql鏈接,最多同時能夠有10個線程獲取到鏈接,不然會報錯沒法獲取鏈接,這時可用信號量控制。

二、可當同步鎖使用,設置信號量通道等於1。

1.二、CountDownLatch

容許一個或多個線程等待其餘線程完成操做後再執行。其內部維護一個計數器,設置初始值給state,每調用 countDown()方法一次,state數量減1,調用await()方法的線程被阻塞,須要等待state減小爲0時纔可被喚醒繼續執行。mysql

應用場景: 
一、一個任務要統計公司一星期的財務流水總額,每次須要讀取5張Excel表統計流水彙總,如何快速地統計出來?
可使用CountDownLatch,先開始5個線程併發地分別統計每張表的流水額度總和,當5個線程統計結束,再彙總總額。

二、開發對外接口,要求響應快,而該接口內部邏輯複雜,涉及多個服務的調用,並依賴這些獨立服務響應結果進行下一步操做,這時能夠考慮CountDownLatch或者 CyclicBarrier,併發調用多個服務,獲取這些結果後才進行下一步,縮短處理時間。

1.三、CyclicBarrier

循環屏障,柵欄類; CyclicBarrier 可以讓多個線程相互等待,當全部線程都達到後再喚醒全部線程繼續執行。如導遊設定目標點,全部遊客到這集合,先到的遊客等待其餘遊客,當全部的遊客都到了後,你們再一塊兒出發。sql

CyclicBarrier與CountDownLatch區別:
1)  CyclicBarrier 在應用場景中,多個線程之間相互等待,線程之間在業務上可能會更有依賴性 ; CountDownLatch是每一個執行 coutDown方法的線程之間能夠沒有依賴性,而是執行await方法的線程更依賴這些執行coutDown的線程。

2)CountDownLatch計數器只能使用一次,Semaphore能夠屢次使用,能夠重置使用。

3)CyclicBarrier 能夠處理更復雜的業務場景,如線程都達到屏障後,能夠在構造函數中 CyclicBarrier(int parties, Runnable barrierAction) 傳入線程barrierAction,當達到屏障觸發條件時,能夠比其餘等待線程優先執行,處理業務。

4)CyclicBarrier還有不少方法,如查看當前到達屏障被阻塞的線程數量 getNumberWaiting()。

5)CountDownLatch 是每一個線程執行完減1操做,當計數器爲0時,才喚醒等待線程,阻塞線程只有1個或者多個;CyclicBarrier是讓全部線程到達屏障處就被阻塞了,當全部線程都到達時,喚醒全部被阻塞線程繼續執行,阻塞線程有多個,線程之間是相互等待。

2、信號量Semaphore

2.一、構造方法

Semaphore(int permits) //默認非公平
//是否公平嘗試獲取許可證  
Semaphore(int permits, boolean fair)

2.二、主要方法

acquire() //從信號量那去獲取一個許可,若是沒有剩餘的就被阻塞
acquire(int permits)//也可一次獲取多個許可
release() //釋放一個許可,將其返還給信號量,給其餘線程使用。 
release(int permits) //一次釋放多個許可

2.三、示例代碼

package Semaphore;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
 * @author zdd
 * 2019/11/27 6:43
 * Description: 信號量測試
 */
public class SemaphoreService {
    private  static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    private Semaphore semaphore = new Semaphore(4);

    public void doSomeThing() throws InterruptedException {
        //1,獲取一個許可
        semaphore.acquire();

        System.out.println(Thread.currentThread().getName() + "--start--" +getFormatDate());
        // 停頓1s
        TimeUnit.SECONDS.sleep(1);
        System.out.println(Thread.currentThread().getName() + "--end--" +getFormatDate());
        //2,用完釋放一個許可,可供其餘線程使用
        semaphore.release();
    }
    public static void main(String[] args) {
        SemaphoreService semaphoreService  = new SemaphoreService();
        for (int i = 0; i < 10; i++) {
            WorkThread workThread = new WorkThread(i+"",semaphoreService);
            workThread.start();
        }
    }

    public String getFormatDate() {
      return sdf.format(new Date());
    }
   // 工做線程類 - 靜態內部類
   static  class WorkThread extends Thread {

        SemaphoreService semaphoreService;
       //構造參數傳入線程名稱及SemaphoreService
        public WorkThread(String name, SemaphoreService semaphoreService) {
            this.semaphoreService = semaphoreService;
            setName(name);
        }
        @Override
        public void run() {
            try {
                semaphoreService.doSomeThing();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

1.線程最多4個同時執行數據庫

//1,設置信號量數爲4,執行結果以下圖
private Semaphore semaphore = new Semaphore(4);

2.線程依次執行,實現同步併發

//1,設置信號量數爲1,執行結果以下圖
private Semaphore semaphore = new Semaphore(1);

3、計數器CountDownLatch

3.一、主要方法

countDown() 計數器減1
await() 阻塞等待,直到計數器爲0喚醒繼續執行
await(long timeout, TimeUnit unit) 阻塞等待,在等待設定時間計時器還沒到減爲0,也不會再繼續等待了。

3.二、示例代碼1

package countDownLatch;

import javax.swing.plaf.IconUIResource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author zdd
 * 2019/11/29 7:19 下午
 * Description: 班長召集士兵集合拉練案例
 */
public class CountDownLatchSoldierTest {

    private static final Integer THREAD_COUNT = 10;
    //等待士兵10人
    private static  CountDownLatch countDownLatch = new CountDownLatch(10);

    public static void main(String[] args) throws InterruptedException {
        //1,開10個線程模擬士兵簽到,採用線程池建立
        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
        for (int i = 0; i < 10; i++) {
            //2,提交10個線程執行
            executorService.submit(()->{
                try{
                    System.out.println(Thread.currentThread().getName()+ ": 士兵報告");
                }finally {
                    //3,計數器減1
                    countDownLatch.countDown();
                }
            });
        }
        //3,主線程作班長,負責等待全部士兵到齊,開始拉練
        countDownLatch.await();
        System.out.println("班長: 集合完畢,開始5千米越野! ");

        //4,關閉線程池
        executorService.shutdown();
    }
}

3.三、示例代碼2

計算器未減到0,主線程持續等待。ide

package countDownLatch;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * zdd
 * 2019/11/5 10:50 
 * Description:測試計數器使用
 */
public class CountDownLatchTest {

    static  int number = 2;
    //1,設置計數器的值爲 2
    static CountDownLatch countDownLatch = new CountDownLatch(number);


    public static void  main(String[] args) throws InterruptedException {
        //2,開啓一個線程,傳入計數器
        new Thread(()-> {
             try {
                //睡2s,模擬執行業務
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                System.out.println("調用countDown前, 當前count 值爲:"+countDownLatch.getCount());
                //3,調用遞減 1次
                countDownLatch.countDown();
                System.out.println("調用countDown後, 當前count 值爲:"+countDownLatch.getCount());
            }
        },"worker").start();

        //狀況一、主線程開始執行 ,等待1s以後,若是還沒到條件,也不等了哦
       // countDownLatch.await(1,TimeUnit.SECONDS);
       //狀況二、阻塞等待,若是計數未到0,一直阻塞等待
        countDownLatch.await();
        System.out.println("主線程繼續執行");
    }
}

主線程阻塞等待,執行結果以下:
函數

4、柵欄類CyclicBarrier

4.一、構造方法

CyclicBarrier(int parties) //parties爲須要等待線程的數量
//barrierAction,在全部其餘線程到達後,優先執行的線程。可根據業務添加
CyclicBarrier(int parties, Runnable barrierAction)

4.二、示例代碼

package CyclicBarrier;

import java.util.Map;
import java.util.concurrent.*;
/**
 * @author zdd
 * 2019/11/28 9:20
 * Description: 開啓4個線程分別統計sheet表,使用柵欄類實現同步統計計算,最後計算總和;此處也可用  
 * CountDownLatch實現
 */
public class BankAccountService {
    private ConcurrentHashMap<String,Integer> concurrentHashMap = new ConcurrentHashMap<>();
  
    // 線程數
    private final static int THREAD_COUNT = 4;
    private CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
        @Override
        public void run() {
            Integer sumcount = 0;
            //彙總每一個sheet計算結果
            for (Map.Entry<String, Integer> entry: concurrentHashMap.entrySet()) {
                sumcount+=entry.getValue();
            }
            System.out.println("優先執行,  求和計算完畢,總和爲:"+ sumcount);
        }
    });

    public void  count() {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        for (int i = 0; i < THREAD_COUNT; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+"到達屏障!");
                    concurrentHashMap.put(Thread.currentThread().getName(),1);

                    try {
                        cyclicBarrier.await();
                        System.out.println(Thread.currentThread().getName()+"繼續執行!");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        executor.shutdown();
    }

    public static void main(String[] args) {
        BankAccountService bankAccountService = new BankAccountService();
        bankAccountService.count();
    }
}

運行結果以下:測試

// 將線程數改成2,阻塞等待其餘線程
 private final static int THREAD_COUNT = 2;

相關文章
相關標籤/搜索