多線程編程學習九(併發工具類).

CountDownLatch

  1. CountDownLatch 容許一個或多個線程等待其餘線程完成操做。
  2. CountDownLatch 能夠替代 join 的做用,並提供了更豐富的用法。
  3. CountDownLatch 的 countDown 方法,N 會減1;CountDownLatch 的 await 方法會阻塞當前線程,直到 N 變成零。
  4. CountDownLatch 不可能從新初始化或者修改 CountDownLatch 對象的內部計數器的值。
  5. CountDownLatch 內部由 AQS 共享鎖實現。
public class CountDownLatchTest {

    private static final CountDownLatch DOWN_LATCH = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            System.out.println(1);
            DOWN_LATCH.countDown();
            System.out.println(2);
            DOWN_LATCH.countDown();

        }).start();
        DOWN_LATCH.await();
        System.out.println("3");
    }
}

CyclicBarrier

  1. CyclicBarrier 設置一個屏障(也能夠叫同步點),攔截阻塞一組線程,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續運行。
  2. CyclicBarrier 默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴 CyclicBarrier 我已經到達了屏障,而後當前線程被阻塞。
  3. CyclicBarrier 還提供一個更高級的構造函數 CyclicBarrier(int parties,Runnable barrierAction),用於在線程到達屏障時,優先執行 barrierAction,方便處理更復雜的業務場景。
  4. getNumberWaiting 方法能夠得到 CyclicBarrier 阻塞的線程數量;isBroken()方法用來了解阻塞的線程是否被中斷。
  5. CyclicBarrier 的計數器可使用 reset() 方法重置(CountDownLatch 的計數器只能使用一次)。因此 CyclicBarrier 能處理更爲複雜的業務場景。例如,若是計算髮生錯誤,能夠重置計數器,並讓線程從新執行一次。
  6. CyclicBarrier 能夠用於多線程計算數據,最後合併計算結果的場景。
  7. CyclicBarrier 內部採用重入鎖 ReentrantLock 實現。
public class BankWaterService implements Runnable {
 
    // 建立4個屏障,處理完以後執行當前類的run方法
    private CyclicBarrier barrier = new CyclicBarrier(4, this);
    // 假設有4個計算任務,因此只啓動4個線程
    private Executor executor = Executors.newFixedThreadPool(4);
    // 保存每一個任務的計算結果
    private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();

    private AtomicInteger atomicInteger = new AtomicInteger(1);

    private void count() {
        for (int i = 0; i < 4; i++) {
            Thread thread = new Thread(() -> {
                // 當前任務的計算結果,計算過程忽略
                sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                // 計算完成,插入一個屏障
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }

            }, "線程" + atomicInteger.getAndIncrement());
            executor.execute(thread);
        }
    }

    @Override
    public void run() {
        int result = 0;
        // 彙總每一個任務計算出的結果
        for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
            result += sheet.getValue();
        }
        //將結果輸出
        sheetBankWaterCount.put("result", result);
        System.out.println(result);
    }

    public static void main(String[] args) {
        BankWaterService bankWaterCount = new BankWaterService();
        bankWaterCount.count();
    }
}

Semaphore

  1. Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源。
  2. Semaphore 能夠用於作流量控制,特別是公用資源有限的應用場景,好比數據庫鏈接。
  3. Semaphore的構造方法 Semaphore(int permits) 接受一個整型的數字,表示可用的許可證數量。
  4. 首先線程使用 Semaphore 的 acquire() 方法獲取一個許可證,使用完以後調用 release() 方法歸還許可證。還能夠用 tryAcquire() 方法嘗試獲取許可證。
  5. intavailablePermits():返回此信號量中當前可用的許可證數。
  6. intgetQueueLength():返回正在等待獲取許可證的線程數。
  7. booleanhasQueuedThreads():是否有線程正在等待獲取許可證。
  8. Semaphore 內部使用 AQS 共享鎖實現。
public class SemaphoreTest {

    private static final int THREAD_COUNT = 30;
    private static ExecutorService EXECUTOR = Executors.newFixedThreadPool(THREAD_COUNT);
    private static Semaphore SEMAPHORE = new Semaphore(10);
    private static AtomicInteger ATOMICINTEGER = new AtomicInteger(1);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            EXECUTOR.execute(() -> {
                try {
                    SEMAPHORE.acquire();
                    System.out.println("save data" + ATOMICINTEGER.getAndIncrement());
                    SEMAPHORE.release();
                } catch (InterruptedException e) {
                }

            });
        }
        EXECUTOR.shutdown();
    }
}

Exchanger

  1. Exchanger(交換者)是一個用於線程間協做的工具類 —— 用於線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程能夠交換彼此的數據。這兩個線程經過 exchange 方法交換數據,若是第一個線程先執行 exchange() 方法,它會一直等待第二個線程也執行 exchange 方法。
  2. 可簡單地將 Exchanger 對象理解爲一個包含兩個格子的容器,經過 exchanger 方法能夠向兩個格子中填充信息。當兩個格子中的均被填充時,該對象會自動將兩個格子的信息交換,而後返回給線程,從而實現兩個線程的信息交換。
  3. Exchanger 可用於遺傳算法。(遺傳算法:須要選出兩我的做爲交配對象,這時候會交換兩人的數據,並使用交叉規則得出交配結果)
  4. Exchanger 可用於校對工做,好比一份數據須要兩我的同時進行校對,都校對無誤後,才能進行後續處理。這時,就可使用 Exchanger 比較兩份校對結果。
  5. Exchanger 內部採用無鎖 CAS 實現,Exchange 使用了內部對象 Node 的兩個屬性 — item 、match,分佈存儲兩個線程的值。
public class ExchangerTest {

    private static final Exchanger<String> exchange = new Exchanger<>();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadPool.execute(() -> {
            try {
                String result = exchange.exchange("數據A");
                System.out.println("A的exchange結果:" + result);
            } catch (InterruptedException e) {
            }

        });
        threadPool.execute(() -> {
            try {
                String result = exchange.exchange("數據B");
                System.out.println("B的exchange結果:" + result);
            } catch (InterruptedException e) {
            }
        });
        threadPool.shutdown();
    }
}
相關文章
相關標籤/搜索