本文接着分析Semaphore的實現原理node
Semaphore是一個計數信號量。Semaphore(信號)能夠理解爲一種許可,拿到許可的線程才能夠繼續執行。Semaphore的計數器其實記錄的就是許可的數量,當許可數量爲0時,acquire方法就會阻塞。這個系統和停車位系統很是類似,當停車場還有空位的時候,任何新來的車輛均可以進,當停車場滿的時候,新來的車輛必需要等到有空車位產生的時候才能夠開進停車場。這裏的停車位就至關於Semaphore的許可數量。函數
public static void main(String[] args) throws InterruptedException{ Semaphore semaphore = new Semaphore(3); for (int i = 1; i <= 5; i++) { final int threadNum = i; new Thread(() -> { try { semaphore.acquire(); System.out.println("thread" + threadNum + ":entered"); Thread.sleep(1000 * threadNum); System.out.println("thread" + threadNum + ":gone"); semaphore.release(); } catch (InterruptedException e) { System.out.println("thread" + threadNum + ":exception"); } }).start(); } }
看一下輸出結果:oop
thread2:entered thread3:entered thread1:entered thread1:gone thread4:entered thread2:gone thread5:entered thread3:gone thread4:gone thread5:gone Process finished with exit code 0
首先咱們new了一個信號量,給了3個許可,而後在新建5個線程搶佔信號量。一開始1,2,3號線程能夠拿到許可,而後4號線程來了,發現沒有許可了,4號線程阻塞,直到1號線程調用了release後有了許可後,4號線程被喚醒,以此類推。。。ui
Semaphore和Reentrant同樣分別實現了公平鎖和非公平鎖,一樣咱們看非公平鎖。上Sync內部類代碼:線程
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; //構造函數,接收許可的個數 Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } //共享模式下,非公平鎖搶佔 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); //可用許可數量減去申請許可數量 int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) //返回remaining,小於0表示許可數量不夠,大於0表示許可數量足夠 return remaining; } } //共享模式下釋放許可 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } //減小許可數量 final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } //清空許可數量 final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } }
瞭解了Semaphore對state的定義後,咱們看一下acquire方法,該方法直接調用了sync的acquireSharedInterruptibly:code
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //線程中斷標誌爲true,拋出中斷異常 if (Thread.interrupted()) throw new InterruptedException(); //首先嚐試獲取須要的許可數量 if (tryAcquireShared(arg) < 0) //當獲取失敗 doAcquireSharedInterruptibly(arg); }
doAcquireSharedInterruptibly在CountdownLatch裏分析過:當獲取許可失敗時,往等待隊列添加當前線程的node,若是隊列沒有初始化則初始化。而後在一個loop裏從隊列head後第一個node開始嘗試獲取許可,爲了避免讓CPU空轉,當head後第一個node嘗試獲取許可失敗的時候,阻塞當前線程,第一個node後的弄的同樣都阻塞,等待被喚醒。隊列
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
當調用了release方法後,意味着有新的許可被釋放,調用sync的releaseShared,接着調用Semaphore的內部類Sync實現的tryReleaseShared嘗試釋放許可。釋放成功後調用AQS的doReleaseShared,在CountdownLatch中也見過這個方法。在以前head後第一個node線程阻塞以前,已經將head狀態設置爲SIGNAL,因此會喚醒第一個node的線程,該線程繼續執行以前的loop,嘗試獲取許可成功,而且當還有剩餘的許可存在時向後傳播喚醒信號,喚醒後繼node的線程,獲取剩餘的許可。rem
Semaphore和CountdownLatch同樣使用了AQS的共享模式;
Semaphore在有許可釋放時喚醒第一個node的線程獲取許可,以後會根據是否還存在許可來決定是否繼續日後傳播喚醒線程的信號。
CountdownLatch在state爲0的時候依次日後傳播喚醒信號,一直傳播到低,直到全部線程被喚醒。get