1965年,荷蘭計算機科學家Dijkstra提出的信號量機制成爲一種高效的進程同步機制。這以後的15年,信號量一直都是併發編程領域的終結者。1980年,管程被提出,成爲繼信號量以後的在併發編程領域的第二個選擇。目前幾乎全部的語言都支持信號量機制,Java也不例外。Java中提供了Semaphore
併發工具類來支持信號量機制。下面咱們就來了解Java實現的信號量機制。
首先介紹信號量模型,而後介紹如何使用,最後使用信號量來實現一個限流器。html
信號量模型圖(圖來自參考[1]):java
信號量模型總結爲:一個計數器、一個等待隊列和三個對外調用的方法。
計數器和等待隊列時對外透明的,全部咱們只能經過三個對外方法來訪問計數器和等待隊列。
init()
:設置計數器的初始值。
down()
:計數器的值減一。若是此時計數器的值小於0,則當前線程插入等待隊列並阻塞,不然當前線程能夠繼續執行。
up()
:計數器的值加一。若是此時計數器的值小於或者等於0,則喚醒等待隊列中的一個線程,並將其從等待隊列中移除。數據庫
這三個方法都是原子性的,由實現信號量模型的方法保證。在Java SDK中,信號量模型是由java.util.concurrent.Semaphore
實現。編程
信號量模型代碼化大體相似以下:安全
class Semaphore{ int count; // 計數器 Queue queue; // 等待隊列 // 初始化操做 Semaphore(int c){ this.count=c; } void down(){ this.count--; // 計數器值減一 if(this.count < 0){ // 將當前線程插入等待隊列 // 阻塞當前線程 } } void up(){ this.count++; // 計數器值加一 if(this.count <= 0) { // 移除等待隊列中的某個線程T // 喚醒線程T } } }
在信號量模型中,down()
和up()
這兩個操做也被成爲P操做(荷蘭語proberen,測試)和V操做(荷荷蘭語verhogen,增長)。在我學的操做系統教材中(C語言實現),P操做對應wait(),V操做對應singal()。雖然叫法不一樣,可是語義都是相同的。在Java SDK併發包中,down()
和up()
分別對應於Semaphore中的acquire()
和release()
。併發
信號量有時也被稱爲紅綠燈,咱們想一想紅綠燈時怎麼控制交通的,就知道該如何使用信號量。車輛路過十字路時,須要先檢查是否爲綠燈,若是是則通行,不然就等待。想一想和加鎖機制有點類似,都是同樣的操做,先檢查是否符合條件(「嘗試獲取」),符合(「獲取到」)則線程繼續運行,不然阻塞線程。app
下面使用累加器的例子來講明如何使用信號量。編程語言
count+=1
操做是個臨界區,只容許一個線程執行,即要保證互斥。因而咱們在進入臨界區以前,使用down()即Java中的acquire(),在退出以後使用up()即Java中的release()。ide
static int count; //初始化信號量 static final Semaphore s = new Semaphore(1); // 構造函數參數爲1,表示只容許一個線程進行臨界區。可實現一個互斥鎖的功能。 //用信號量保證互斥 static void addOne() { s.acquire(); // 獲取一個許可(可看做加鎖機制中加鎖) try { count+=1; } finally { s.release(); // 歸還許可(可看作加鎖機制中解鎖) } }
完整代碼以下:函數
package com.sakura.concrrent; import java.util.concurrent.Semaphore; public class SemaphoreTest { static int count; static final Semaphore s = new Semaphore(1); static void addOne() throws InterruptedException { //只會有一個線程將信號量中的計數器減爲1,而另一個線程只能將信號量中計數器減爲-1,致使被阻塞 s.acquire(); try { count +=1; System.out.println("Now thread is " + Thread.currentThread() + " and count is " + count); }finally { //進入臨界區的線程在執行完臨界區代碼後將信號量中計數器的值加1而後,此時信號量中計數器的值爲0,則從阻塞隊列中喚醒被阻塞的進程 s.release(); } } public static void main(String[] args) { // 建立兩個線程運行 MyThread thread1 = new MyThread(); MyThread thread2 = new MyThread(); thread1.start(); thread2.start(); System.out.println("main thread"); } } class MyThread extends Thread{ @Override public void run() { super.run(); for(int i=0; i<10; i++) { try { SemaphoreTest.addOne(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
運行結果:
若是Semaphore的構造函數參數(許可數量,內置計數器的值)修改一下:
static final Semaphore s = new Semaphore(2);
則計數器值的爲2,那麼就容許有兩個線程進入臨界區,咱們的count值就會出現問題
當設置信號量的計數器爲1時,可實現一個簡單的互斥鎖功能。可是,咱們前面剛介紹過Java SDK中的Lock
,Semaphore的用途顯然不會與Lock一致,否則就重複造輪子了。Semaphore最重要的一個功能即是:能夠容許多個線程訪問一個臨界區。(上述例子咱們就設置了計數器的值爲2,可發現thread1和thread2均可進入臨界區。)
咱們會在什麼地方碰見這種需求呢?
各類池化資源,例如鏈接池、對象池、線程池等等。例如,數據庫鏈接池,在同一時刻,必定是容許多個線程同時使用鏈接池,固然,每一個鏈接在被釋放以前,是不容許其餘線程使用的。
咱們設計以下能夠容許N個線程使用的對象池,咱們將信號量的計數器值設爲N,就可讓N個線程同時進行臨界區,多餘的就會被阻塞。(代碼來自參考[1])
class ObjPool<T, R> { final List<T> pool; //使用List保存實例對象 // 用信號量實現限流器 final Semaphore sem; // 構造函數 ObjPool(int size, T t){ pool = new Vector<T>(){}; for(int i=0; i<size; i++){ pool.add(t); } sem = new Semaphore(size); } // 獲取對象池的對象,調用 func R exec(Function<T,R> func) { T t = null; sem.acquire(); //容許N個進程同時進入臨界區 try { //咱們須要注意,由於多個進行能夠進入臨界區,因此Vector的remove方法是線程安全的 t = pool.remove(0); return func.apply(t); //獲取對象池匯中的一個對象後,調用func函數 } finally { pool.add(t); //離開臨界區以前,將以前獲取的對象放回到池中 sem.release(); //使得計數器加1,若是信號量中計數器小於等於0,那麼說明有線程在等待,此時就會自動喚醒等待線程 } } } // 建立對象池 ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2); // 經過對象池獲取 t,以後執行 pool.exec(t -> { System.out.println(t); return t.toString(); });
記得學習操做系統時,信號量類型分爲了好幾種整型信號量、記錄型信號量、AND信號量以及「信號量集」(具體瞭解可戳參考[2])。我認爲Java SDK中Semaphore應該是記錄型信號量的實現。不禁想起,編程語言是對OS層面操做的一種抽象描述。這句話須要品須要細細品。
參考: [1] 極客時間專欄王寶令《Java併發編程實戰》 [2] 靜水深流.操做系統之信號量機制總結.https://www.cnblogs.com/IamJiangXiaoKun/p/9464336.html