須要提早了解的知識點: AbstractQueuedSynchronizer 實現原理併發
Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源。好比控制用戶的訪問量,同一時刻只容許1000個用戶同時使用系統,若是超過1000個併發,則須要等待。dom
好比模擬一個停車場停車信號,假設停車場只有兩個車位,一開始兩個車位都是空的。這時若是同時來了兩輛車,看門人容許它們進入停車場,而後放下車攔。之後來的車必須在入口等待,直到停車場中有車輛離開。這時,若是有一輛車離開停車場,看門人得知後,打開車攔,放入一輛,若是又離開一輛,則又能夠放入一輛,如此往復。ide
public class SemaphoreDemo { private static Semaphore s = new Semaphore(2); public static void main(String[] args) { ExecutorService pool = Executors.newCachedThreadPool(); pool.submit(new ParkTask("1")); pool.submit(new ParkTask("2")); pool.submit(new ParkTask("3")); pool.submit(new ParkTask("4")); pool.submit(new ParkTask("5")); pool.submit(new ParkTask("6")); pool.shutdown(); } static class ParkTask implements Runnable { private String name; public ParkTask(String name) { this.name = name; } @Override public void run() { try { s.acquire(); System.out.println("Thread "+this.name+" start..."); TimeUnit.SECONDS.sleep(new Random().nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); } finally { s.release(); } } } }
Semaphore 經過使用內部類Sync繼承AQS來實現。
支持公平鎖和非公平鎖。內部使用的AQS的共享鎖。
具體實現可參考 AbstractQueuedSynchronizer 源碼分析源碼分析
Semaphore 的結構以下:ui
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
構造方法指定信號量的許可數量,默認採用的是非公平鎖,也只能夠指定爲公平鎖。
permits賦值給AQS中的state變量。this
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
得到信號量方法,這兩個方法支持 Interrupt中斷機制,可以使用acquire() 方法每次獲取一個信號量,也可使用acquire(int permits) 方法獲取指定數量的信號量 。spa
public void acquireUninterruptibly() { sync.acquireShared(1); } public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); }
這兩個方法不響應Interrupt中斷機制,其它功能同acquire方法機制。.net
public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }
嘗試得到信號量有三個方法。線程
public void release() { sync.releaseShared(1); }
調用AQS中的releaseShared方法,使得state每次減一來控制信號量。code
public int availablePermits() { return sync.getPermits(); } //=========Sync類======== final int getPermits() { return getState(); }
該方法返回AQS中state變量的值,當前剩餘的信號量個數
public int drainPermits() { return sync.drainPermits(); } //=========Sync類======== final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } }
獲取並返回當即可用的全部許可。Sync類的drainPermits方法,獲取1個信號量後將可用的信號量個數置爲0。例如總共有10個信號量,已經使用了5個,再調用drainPermits方法後,能夠得到一個信號量,剩餘4個信號量就消失了,總共可用的信號量就變成6個了。
protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); } //=========Sync類======== final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } }
該方法是protected 方法,減小信號量個數
public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } //=========AbstractQueuedSynchronizer類======== public final boolean hasQueuedThreads() { //頭結點不等於尾節點就說明鏈表中還有元素 return head != tail; }
protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } //=========AbstractQueuedSynchronizer類======== public final Collection<Thread> getQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { Thread t = p.thread; if (t != null) list.add(t); } return list; }
該方法獲取AQS中等待隊列中全部未獲取信號量的線程相關的信息(等待獲取信號量的線程相關信息)。