Semaphore(信號量)是JUC包中比較經常使用到的一個類,它是AQS共享模式的一個應用,能夠容許多個線程同時對共享資源進行操做,而且能夠有效的控制併發數,利用它能夠很好的實現流量控制。Semaphore提供了一個許可證的概念,能夠把這個許可證看做公共汽車車票,只有成功獲取車票的人才可以上車,而且車票是有必定數量的,不可能毫無限制的發下去,這樣就會致使公交車超載。因此當車票發完的時候(公交車以滿載),其餘人就只能等下一趟車了。若是中途有人下車,那麼他的位置將會空閒出來,所以若是這時其餘人想要上車的話就又能夠得到車票了。利用Semaphore能夠實現各類池,咱們在本篇末尾將會動手寫一個簡易的數據庫鏈接池。首先咱們來看一下Semaphore的構造器。數據庫
1 //構造器1 2 public Semaphore(int permits) { 3 sync = new NonfairSync(permits); 4 } 5 6 //構造器2 7 public Semaphore(int permits, boolean fair) { 8 sync = fair ? new FairSync(permits) : new NonfairSync(permits); 9 }
Semaphore提供了兩個帶參構造器,沒有提供無參構造器。這兩個構造器都必須傳入一個初始的許可證數量,使用構造器1構造出來的信號量在獲取許可證時會採用非公平方式獲取,使用構造器2能夠經過參數指定獲取許可證的方式(公平or非公平)。Semaphore主要對外提供了兩類API,獲取許可證和釋放許可證,默認的是獲取和釋放一個許可證,也能夠傳入參數來同時獲取和釋放多個許可證。在本篇中咱們只講每次獲取和釋放一個許可證的狀況。數組
1.獲取許可證併發
1 //獲取一個許可證(響應中斷) 2 public void acquire() throws InterruptedException { 3 sync.acquireSharedInterruptibly(1); 4 } 5 6 //獲取一個許可證(不響應中斷) 7 public void acquireUninterruptibly() { 8 sync.acquireShared(1); 9 } 10 11 //嘗試獲取許可證(非公平獲取) 12 public boolean tryAcquire() { 13 return sync.nonfairTryAcquireShared(1) >= 0; 14 } 15 16 //嘗試獲取許可證(定時獲取) 17 public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { 18 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); 19 }
上面的API是Semaphore提供的默認獲取許可證操做。每次只獲取一個許可證,這也是現實生活中較常遇到的狀況。除了直接獲取還提供了嘗試獲取,直接獲取操做在失敗以後可能會阻塞線程,而嘗試獲取則不會。另外還需注意的是tryAcquire方法是使用非公平方式嘗試獲取的。在平時咱們比較經常使用到的是acquire方法去獲取許可證。下面咱們就來看看它是怎樣獲取的。能夠看到acquire方法裏面直接就是調用sync.acquireSharedInterruptibly(1),這個方法是AQS裏面的方法,咱們在講AQS源碼系列文章的時候曾經講過,如今咱們再來回顧一下。ide
1 //以可中斷模式獲取鎖(共享模式) 2 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { 3 //首先判斷線程是否中斷, 若是是則拋出異常 4 if (Thread.interrupted()) { 5 throw new InterruptedException(); 6 } 7 //1.嘗試去獲取鎖 8 if (tryAcquireShared(arg) < 0) { 9 //2. 若是獲取失敗則進人該方法 10 doAcquireSharedInterruptibly(arg); 11 } 12 }
acquireSharedInterruptibly方法首先就是去調用tryAcquireShared方法去嘗試獲取,tryAcquireShared在AQS裏面是抽象方法,FairSync和NonfairSync這兩個派生類實現了該方法的邏輯。FairSync實現的是公平獲取的邏輯,而NonfairSync實現的非公平獲取的邏輯。測試
1 abstract static class Sync extends AbstractQueuedSynchronizer { 2 //非公平方式嘗試獲取 3 final int nonfairTryAcquireShared(int acquires) { 4 for (;;) { 5 //獲取可用許可證 6 int available = getState(); 7 //獲取剩餘許可證 8 int remaining = available - acquires; 9 //1.若是remaining小於0則直接返回remaining 10 //2.若是remaining大於0則先更新同步狀態再返回remaining 11 if (remaining < 0 || compareAndSetState(available, remaining)) { 12 return remaining; 13 } 14 } 15 } 16 } 17 18 //非公平同步器 19 static final class NonfairSync extends Sync { 20 private static final long serialVersionUID = -2694183684443567898L; 21 22 NonfairSync(int permits) { 23 super(permits); 24 } 25 26 //嘗試獲取許可證 27 protected int tryAcquireShared(int acquires) { 28 return nonfairTryAcquireShared(acquires); 29 } 30 } 31 32 //公平同步器 33 static final class FairSync extends Sync { 34 private static final long serialVersionUID = 2014338818796000944L; 35 36 FairSync(int permits) { 37 super(permits); 38 } 39 40 //嘗試獲取許可證 41 protected int tryAcquireShared(int acquires) { 42 for (;;) { 43 //判斷同步隊列前面有沒有人排隊 44 if (hasQueuedPredecessors()) { 45 //若是有的話就直接返回-1,表示嘗試獲取失敗 46 return -1; 47 } 48 //獲取可用許可證 49 int available = getState(); 50 //獲取剩餘許可證 51 int remaining = available - acquires; 52 //1.若是remaining小於0則直接返回remaining 53 //2.若是remaining大於0則先更新同步狀態再返回remaining 54 if (remaining < 0 || compareAndSetState(available, remaining)) { 55 return remaining; 56 } 57 } 58 } 59 }
這裏須要注意的是NonfairSync的tryAcquireShared方法直接調用的是nonfairTryAcquireShared方法,這個方法是在父類Sync裏面的。非公平獲取鎖的邏輯是先取出當前同步狀態(同步狀態表示許可證個數),將當前同步狀態減去參入的參數,若是結果不小於0的話證實還有可用的許可證,那麼就直接使用CAS操做更新同步狀態的值,最後無論結果是否小於0都會返回該結果值。這裏咱們要了解tryAcquireShared方法返回值的含義,返回負數表示獲取失敗,零表示當前線程獲取成功但後續線程不能再獲取,正數表示當前線程獲取成功而且後續線程也可以獲取。咱們再來看acquireSharedInterruptibly方法的代碼。ui
1 //以可中斷模式獲取鎖(共享模式) 2 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { 3 //首先判斷線程是否中斷, 若是是則拋出異常 4 if (Thread.interrupted()) { 5 throw new InterruptedException(); 6 } 7 //1.嘗試去獲取鎖 8 //負數:表示獲取失敗 9 //零值:表示當前線程獲取成功, 可是後繼線程不能再獲取了 10 //正數:表示當前線程獲取成功, 而且後繼線程一樣能夠獲取成功 11 if (tryAcquireShared(arg) < 0) { 12 //2. 若是獲取失敗則進人該方法 13 doAcquireSharedInterruptibly(arg); 14 } 15 }
若是返回的remaining小於0的話就表明獲取失敗,所以tryAcquireShared(arg) < 0就爲true,因此接下來就會調用doAcquireSharedInterruptibly方法,這個方法咱們在講AQS的時候講過,它會將當前線程包裝成結點放入同步隊列尾部,而且有可能掛起線程。這也是當remaining小於0時線程會排隊阻塞的緣由。而若是返回的remaining>=0的話就表明當前線程獲取成功,所以tryAcquireShared(arg) < 0就爲flase,因此就不會再去調用doAcquireSharedInterruptibly方法阻塞當前線程了。以上是非公平獲取的整個邏輯,而公平獲取時僅僅是在此以前先去調用hasQueuedPredecessors方法判斷同步隊列是否有人在排隊,若是有的話就直接return -1表示獲取失敗,不然才繼續執行下面和非公平獲取同樣的步驟。this
2.釋放許可證spa
1 //釋放一個許可證 2 public void release() { 3 sync.releaseShared(1); 4 }
調用release方法是釋放一個許可證,它的操做很簡單,就調用了AQS的releaseShared方法,咱們來看看這個方法。線程
1 //釋放鎖的操做(共享模式) 2 public final boolean releaseShared(int arg) { 3 //1.嘗試去釋放鎖 4 if (tryReleaseShared(arg)) { 5 //2.若是釋放成功就喚醒其餘線程 6 doReleaseShared(); 7 return true; 8 } 9 return false; 10 }
AQS的releaseShared方法首先調用tryReleaseShared方法嘗試釋放鎖,這個方法的實現邏輯在子類Sync裏面。code
1 abstract static class Sync extends AbstractQueuedSynchronizer { 2 ... 3 //嘗試釋放操做 4 protected final boolean tryReleaseShared(int releases) { 5 for (;;) { 6 //獲取當前同步狀態 7 int current = getState(); 8 //將當前同步狀態加上傳入的參數 9 int next = current + releases; 10 //若是相加結果小於當前同步狀態的話就報錯 11 if (next < current) { 12 throw new Error("Maximum permit count exceeded"); 13 } 14 //以CAS方式更新同步狀態的值, 更新成功則返回true, 不然繼續循環 15 if (compareAndSetState(current, next)) { 16 return true; 17 } 18 } 19 } 20 ... 21 }
能夠看到tryReleaseShared方法裏面採用for循環進行自旋,首先獲取同步狀態,將同步狀態加上傳入的參數,而後以CAS方式更新同步狀態,更新成功就返回true並跳出方法,不然就繼續循環直到成功爲止,這就是Semaphore釋放許可證的流程。
3.動手寫個鏈接池
Semaphore代碼並無很複雜,經常使用的操做就是獲取和釋放一個許可證,這些操做的實現邏輯也都比較簡單,但這並不妨礙Semaphore的普遍應用。下面咱們就來利用Semaphore實現一個簡單的數據庫鏈接池,經過這個例子但願讀者們能更加深刻的掌握Semaphore的運用。
1 public class ConnectPool { 2 3 //鏈接池大小 4 private int size; 5 //數據庫鏈接集合 6 private Connect[] connects; 7 //鏈接狀態標誌 8 private boolean[] connectFlag; 9 //剩餘可用鏈接數 10 private volatile int available; 11 //信號量 12 private Semaphore semaphore; 13 14 //構造器 15 public ConnectPool(int size) { 16 this.size = size; 17 this.available = size; 18 semaphore = new Semaphore(size, true); 19 connects = new Connect[size]; 20 connectFlag = new boolean[size]; 21 initConnects(); 22 } 23 24 //初始化鏈接 25 private void initConnects() { 26 //生成指定數量的數據庫鏈接 27 for(int i = 0; i < this.size; i++) { 28 connects[i] = new Connect(); 29 } 30 } 31 32 //獲取數據庫鏈接 33 private synchronized Connect getConnect(){ 34 for(int i = 0; i < connectFlag.length; i++) { 35 //遍歷集合找到未使用的鏈接 36 if(!connectFlag[i]) { 37 //將鏈接設置爲使用中 38 connectFlag[i] = true; 39 //可用鏈接數減1 40 available--; 41 System.out.println("【"+Thread.currentThread().getName()+"】以獲取鏈接 剩餘鏈接數:" + available); 42 //返回鏈接引用 43 return connects[i]; 44 } 45 } 46 return null; 47 } 48 49 //獲取一個鏈接 50 public Connect openConnect() throws InterruptedException { 51 //獲取許可證 52 semaphore.acquire(); 53 //獲取數據庫鏈接 54 return getConnect(); 55 } 56 57 //釋放一個鏈接 58 public synchronized void release(Connect connect) { 59 for(int i = 0; i < this.size; i++) { 60 if(connect == connects[i]){ 61 //將鏈接設置爲未使用 62 connectFlag[i] = false; 63 //可用鏈接數加1 64 available++; 65 System.out.println("【"+Thread.currentThread().getName()+"】以釋放鏈接 剩餘鏈接數:" + available); 66 //釋放許可證 67 semaphore.release(); 68 } 69 } 70 } 71 72 //剩餘可用鏈接數 73 public int available() { 74 return available; 75 } 76 77 }
測試代碼:
1 public class TestThread extends Thread { 2 3 private static ConnectPool pool = new ConnectPool(3); 4 5 @Override 6 public void run() { 7 try { 8 Connect connect = pool.openConnect(); 9 Thread.sleep(100); //休息一下 10 pool.release(connect); 11 } catch (InterruptedException e) { 12 e.printStackTrace(); 13 } 14 } 15 16 public static void main(String[] args) { 17 for(int i = 0; i < 10; i++) { 18 new TestThread().start(); 19 } 20 } 21 22 }
測試結果:
咱們使用一個數組來存放數據庫鏈接的引用,在初始化鏈接池的時候會調用initConnects方法建立指定數量的數據庫鏈接,並將它們的引用存放到數組中,此外還有一個相同大小的數組來記錄鏈接是否可用。每當外部線程請求獲取一個鏈接時,首先調用semaphore.acquire()方法獲取一個許可證,而後將鏈接狀態設置爲使用中,最後返回該鏈接的引用。許可證的數量由構造時傳入的參數決定,每調用一次semaphore.acquire()方法許可證數量減1,當數量減爲0時說明已經沒有鏈接可使用了,這時若是其餘線程再來獲取就會被阻塞。每當線程釋放一個鏈接的時候會調用semaphore.release()將許可證釋放,此時許可證的總量又會增長,表明可用的鏈接數增長了,那麼以前被阻塞的線程將會醒來繼續獲取鏈接,這時再次獲取就可以成功獲取鏈接了。測試示例中初始化了一個3個鏈接的鏈接池,咱們從測試結果中能夠看到,每當線程獲取一個鏈接剩餘的鏈接數將會減1,等到減爲0時其餘線程就不能再獲取了,此時必須等待一個線程將鏈接釋放以後才能繼續獲取。能夠看到剩餘鏈接數老是在0到3之間變更,說明咱們此次的測試是成功的。