CountDownLatch
- CountDownLatch 容許一個或多個線程等待其餘線程完成操做。
- CountDownLatch 能夠替代 join 的做用,並提供了更豐富的用法。
- CountDownLatch 的 countDown 方法,N 會減1;CountDownLatch 的 await 方法會阻塞當前線程,直到 N 變成零。
- CountDownLatch 不可能從新初始化或者修改 CountDownLatch 對象的內部計數器的值。
- CountDownLatch 內部由 AQS 共享鎖實現。
public class CountDownLatchTest {
private static final CountDownLatch DOWN_LATCH = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
System.out.println(1);
DOWN_LATCH.countDown();
System.out.println(2);
DOWN_LATCH.countDown();
}).start();
DOWN_LATCH.await();
System.out.println("3");
}
}
CyclicBarrier
- CyclicBarrier 設置一個屏障(也能夠叫同步點),攔截阻塞一組線程,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續運行。
- CyclicBarrier 默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴 CyclicBarrier 我已經到達了屏障,而後當前線程被阻塞。
- CyclicBarrier 還提供一個更高級的構造函數 CyclicBarrier(int parties,Runnable barrierAction),用於在線程到達屏障時,優先執行 barrierAction,方便處理更復雜的業務場景。
- getNumberWaiting 方法能夠得到 CyclicBarrier 阻塞的線程數量;isBroken()方法用來了解阻塞的線程是否被中斷。
- CyclicBarrier 的計數器可使用 reset() 方法重置(CountDownLatch 的計數器只能使用一次)。因此 CyclicBarrier 能處理更爲複雜的業務場景。例如,若是計算髮生錯誤,能夠重置計數器,並讓線程從新執行一次。
- CyclicBarrier 能夠用於多線程計算數據,最後合併計算結果的場景。
- CyclicBarrier 內部採用重入鎖 ReentrantLock 實現。
public class BankWaterService implements Runnable {
// 建立4個屏障,處理完以後執行當前類的run方法
private CyclicBarrier barrier = new CyclicBarrier(4, this);
// 假設有4個計算任務,因此只啓動4個線程
private Executor executor = Executors.newFixedThreadPool(4);
// 保存每一個任務的計算結果
private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
private AtomicInteger atomicInteger = new AtomicInteger(1);
private void count() {
for (int i = 0; i < 4; i++) {
Thread thread = new Thread(() -> {
// 當前任務的計算結果,計算過程忽略
sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
// 計算完成,插入一個屏障
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, "線程" + atomicInteger.getAndIncrement());
executor.execute(thread);
}
}
@Override
public void run() {
int result = 0;
// 彙總每一個任務計算出的結果
for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
result += sheet.getValue();
}
//將結果輸出
sheetBankWaterCount.put("result", result);
System.out.println(result);
}
public static void main(String[] args) {
BankWaterService bankWaterCount = new BankWaterService();
bankWaterCount.count();
}
}
Semaphore
- Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源。
- Semaphore 能夠用於作流量控制,特別是公用資源有限的應用場景,好比數據庫鏈接。
- Semaphore的構造方法 Semaphore(int permits) 接受一個整型的數字,表示可用的許可證數量。
- 首先線程使用 Semaphore 的 acquire() 方法獲取一個許可證,使用完以後調用 release() 方法歸還許可證。還能夠用 tryAcquire() 方法嘗試獲取許可證。
- intavailablePermits():返回此信號量中當前可用的許可證數。
- intgetQueueLength():返回正在等待獲取許可證的線程數。
- booleanhasQueuedThreads():是否有線程正在等待獲取許可證。
- Semaphore 內部使用 AQS 共享鎖實現。
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService EXECUTOR = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore SEMAPHORE = new Semaphore(10);
private static AtomicInteger ATOMICINTEGER = new AtomicInteger(1);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
EXECUTOR.execute(() -> {
try {
SEMAPHORE.acquire();
System.out.println("save data" + ATOMICINTEGER.getAndIncrement());
SEMAPHORE.release();
} catch (InterruptedException e) {
}
});
}
EXECUTOR.shutdown();
}
}
Exchanger
- Exchanger(交換者)是一個用於線程間協做的工具類 —— 用於線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程能夠交換彼此的數據。這兩個線程經過 exchange 方法交換數據,若是第一個線程先執行 exchange() 方法,它會一直等待第二個線程也執行 exchange 方法。
- 可簡單地將 Exchanger 對象理解爲一個包含兩個格子的容器,經過 exchanger 方法能夠向兩個格子中填充信息。當兩個格子中的均被填充時,該對象會自動將兩個格子的信息交換,而後返回給線程,從而實現兩個線程的信息交換。
- Exchanger 可用於遺傳算法。(遺傳算法:須要選出兩我的做爲交配對象,這時候會交換兩人的數據,並使用交叉規則得出交配結果)
- Exchanger 可用於校對工做,好比一份數據須要兩我的同時進行校對,都校對無誤後,才能進行後續處理。這時,就可使用 Exchanger 比較兩份校對結果。
- Exchanger 內部採用無鎖 CAS 實現,Exchange 使用了內部對象 Node 的兩個屬性 — item 、match,分佈存儲兩個線程的值。
public class ExchangerTest {
private static final Exchanger<String> exchange = new Exchanger<>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(() -> {
try {
String result = exchange.exchange("數據A");
System.out.println("A的exchange結果:" + result);
} catch (InterruptedException e) {
}
});
threadPool.execute(() -> {
try {
String result = exchange.exchange("數據B");
System.out.println("B的exchange結果:" + result);
} catch (InterruptedException e) {
}
});
threadPool.shutdown();
}
}