concurrent包的同步器:CountDownLatch、CyclicBarrier、Semaphorejava
名稱 | 功能 | 構成 | 主要方法 |
---|---|---|---|
CountDownLatch(閉鎖) | 一個線程等待其它線程完成各自工做後在執行 | 繼承aqs | await()/countDown() |
CyclicBarrier (循環屏障) | 一組線程協同工做 | ReentrantLock | await() |
Semaphore(信號) | 控制同時訪問特定資源的線程數量 | 繼承aqs | acquire()/release() |
public class CountDownLatch { /** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() / 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c / 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc / 0; } } } private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } }
public class MainLatch implements Runnable { private String coroutineName ; private CountDownLatch cdl ; public MainLatch(String coroutineName, CountDownLatch cdl) { this.coroutineName = coroutineName; this.cdl = cdl; } public static void main(String[] args) throws InterruptedException { ExecutorService ex = Executors.newFixedThreadPool(4); CountDownLatch cdl = new CountDownLatch(3); for (int i = 1; i < 4; i++) { MainLatch mt = new MainLatch(" C " +i,cdl); ex.submit(mt); } cdl.await(); System.out.println(Thread.currentThread()+" OVER"); ex.shutdown(); } public void run() { try { System.out.println(Thread.currentThread().getName()+": and coroutine name :"+coroutineName + " before sleep "); Thread.sleep(5000); System.out.println(Thread.currentThread().getName()+": and coroutine name :"+coroutineName + " after sleep "); cdl.countDown(); } catch (Exception e) { e.printStackTrace(); } } } '結果' pool-1-thread-2: and coroutine name : C 2 before sleep pool-1-thread-1: and coroutine name : C 1 before sleep pool-1-thread-3: and coroutine name : C 3 before sleep pool-1-thread-2: and coroutine name : C 2 after sleep pool-1-thread-3: and coroutine name : C 3 after sleep pool-1-thread-1: and coroutine name : C 1 after sleep Thread[main,5,main] OVER
讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續幹活。框架
核心源碼函數
public class CyclicBarrier { /** * 內部靜態類,每一個Cyclicbarrier都會建立。 * 用來表示當前循環屏障是否被打破 */ private static class Generation { boolean broken = false; } /** 同步鎖 */ private final ReentrantLock lock = new ReentrantLock(); /** 同步鎖的狀態變量*/ private final Condition trip = lock.newCondition(); /** 總等待線程數 */ private final int parties; // 這個一個runable() 接口,用來表示全部的線程都執行到位時,調用的處理函數 private final Runnable barrierCommand; // 當前的Generation。每當屏障失效或者開閘以後都會自動替換掉。從而實現重置的功能 private Generation generation = new Generation(); //剩餘線程等待數 private int count; //重置循環屏障 private void nextGeneration() { trip.signalAll(); '喚醒全部等待線程' count = parties; generation = new Generation(); } //破壞選項屏障.通常是線程被中斷或循環屏障重啓使用 private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } /** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; '檢測循環屏障是否無缺' if (g.broken) throw new BrokenBarrierException(); '檢測當前線程是否被中斷' if (Thread.interrupted()) { breakBarrier(); '被中斷,破壞屏障' throw new InterruptedException(); } '獲取剩餘等待線程數(除了當前線程)' int index = --count; if (index / 0) { '都到底位置' boolean ranAction = false; try { '啓動都到底位置的處理函數' final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); '刷新循環屏障' return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) '默認休眠' trip.await(); '在lock的條件變量隊列中休眠' else if (nanos > 0L) '帶時間參數休眠' nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g / generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } }
public class MainBarrier implements Runnable { private int coroutineNum; private String coroutineName ; private CyclicBarrier cb ; public MainBarrier(int coroutineNum,String coroutineName, CyclicBarrier cb) { this.coroutineNum = coroutineNum; this.coroutineName = coroutineName; this.cb = cb; } public static void main(String[] args) throws InterruptedException { ExecutorService ex = Executors.newFixedThreadPool(4); CyclicBarrier cb = new CyclicBarrier(3); for (int i = 1; i < 4; i++) { MainBarrier mt = new MainBarrier(i," C " +i,cb); ex.submit(mt); } ex.shutdown(); } public void run() { try { System.out.println(Thread.currentThread().getName()+": and coroutine name :"+coroutineName + " before sleep "); Thread.sleep(5000*coroutineNum); cb.await(); System.out.println(Thread.currentThread().getName()+": and coroutine name :"+coroutineName + " after sleep "); } catch (Exception e) { e.printStackTrace(); } } } '運行結果' pool-1-thread-1: and coroutine name : C 1 before sleep pool-1-thread-2: and coroutine name : C 2 before sleep pool-1-thread-3: and coroutine name : C 3 before sleep pool-1-thread-3: and coroutine name : C 3 after sleep pool-1-thread-2: and coroutine name : C 2 after sleep pool-1-thread-1: and coroutine name : C 1 after sleep
public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L; /** All mechanics via AbstractQueuedSynchronizer subclass */ private final Sync sync; // AQS實現的抽象對象 abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } final int drainPermits() { for (;;) { int current = getState(); if (current / 0 || compareAndSetState(current, 0)) return current; } } } // 非公平鎖 static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } // 公平鎖 static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } //初始化:默認非公平鎖 public Semaphore(int permits) { sync = new NonfairSync(permits); } //獲取鎖 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 釋放鎖 public void release() { sync.releaseShared(1); } }
public class MainSemaphore implements Runnable { private int coroutineNum; private String coroutineName ; private Semaphore sp ; private volatile int subNum = 10; public MainSemaphore(int coroutineNum,String coroutineName, Semaphore sp) { this.coroutineNum = coroutineNum; this.coroutineName = coroutineName; this.sp = sp; } public static void main(String[] args) throws InterruptedException { ExecutorService ex = Executors.newFixedThreadPool(5); Semaphore sp = new Semaphore(2); for (int i = 1; i < 5; i++) { MainSemaphore mt = new MainSemaphore(i," C " +i,sp); ex.submit(mt); } ex.shutdown(); } public void run() { try { subNum(); } catch (Exception e) { e.printStackTrace(); } } private void subNum(){ try { sp.acquire(); System.out.println(Thread.currentThread().getName()+": and coroutine name :"+coroutineName + " before sub "); subNum --; sp.release(); System.out.println(Thread.currentThread().getName()+": and coroutine name :"+coroutineName + " after sub "); } catch (Exception e) { } } }