關注微信公衆號 JavaStormjava
Semaphore
如今廣泛翻譯成 "信號量",從概念上講信號量維護着一組 "憑證",獲取到憑證的線程才能訪問資源,使用完成後釋放, 咱們可使用信號量來限制訪問特定資源的併發線程數。數據庫
就像現實生活中的停車場車位,當有空位的時候才能放車子進入,否則就只能等待,出來的車子則釋放憑證。安全
能夠簡單的歸納爲:一個計數器、一個等待隊列、三個方法。 在信號量模型裏,計數器和等待隊列對外是透明的,只能經過信號量模型提供的三個方法訪問它們,init()、acquire()、release()
。微信
這裏提到的 init()、acquire()、release()
三個方法都是原子性的,而且這個原子性是由信號量模型的實現方保證的。在 Java SDK 裏面,信號量模型是由 java.util.concurrent.Semaphore
實現的,Semaphore 這個類可以保證這三個方法都是原子操做。併發
經過一個簡化版的信號模型代碼便於理解:app
public class Semaphore {
//計數器
private int count;
//保存線程的等待隊列
private Queue queue;
/** * 初始化計數器 * @param count */
public Semaphore(int count) {
this.count = count;
}
/** * 獲取憑證 */
public void acquire(){
this.count--;
if(this.count<0){
// 將當前線程插入等待隊列
// 阻塞當前線程
}
}
/** * 釋放憑證 */
public void release(){
this.count++;
if(this.count >= 0) {
// 移除等待隊列中的某個線程 T
// 喚醒線程 T
}
}
}
複製代碼
經過上文咱們瞭解到信號量模型原理,接下來則看如何在實際場景中使用。這裏咱們仍是用累加器的例子來講明信號量的使用吧。在累加器的例子裏面,count++
操做是個臨界區,只容許一個線程執行,也就是說要保證互斥。函數
public class TestSemaPhore {
private static int count;
//初始化信號量爲 1
private static final Semaphore semaphore = new Semaphore(1);
public static void addOne() throws InterruptedException {
//使用信號量保證互斥,只有一個線程進入
semaphore.acquire();
try {
count++;
} finally {
semaphore.release();
}
}
public static int getCount() {
return count;
}
public static void main(String[] args) throws InterruptedException {
//模擬十個線程同時訪問
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
addOne();
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
countDownLatch.countDown();
TimeUnit.SECONDS.sleep(3);
int count = getCount();
System.out.println(count);
}
}
複製代碼
咱們來分析下信號量如何保證互斥的。ui
假設兩個線程 T1 和 T2 同時訪問 addOne()
,當他們都調用semaphore.acquire();
的時候,因爲這是一個原子操做,因此只有一個線程能把信號量計數器減爲 0,另一個線程 T2 則是將計數器減爲 -1。對應線程 T1 計數器的值爲 0 ,知足大於等於 0,因此線程 T1 會繼續執行;對於線程 T2,信號量計數器的值爲 -1,小於 0 ,按照咱們以前的信號量模型 acquire()
描述,線程 T2 將被阻塞進入等待隊列。因此此刻只有線程 T1 進入臨界區執行 count++
。this
當前信號量計數器的值爲 -1 ,當線程 T1 執行 semaphore.release()
操做執行完後 計數器 +1 則變成了 0,知足小於等於 0,按照模型的定義,此刻等待隊列中的 T2 將會被喚醒,因而 T2 在 T1 執行完臨界區代碼後纔得到進入代碼領截取的機會,從而保證了互斥性。spa
上面的例子咱們利用信號量實現了一個簡單的互斥鎖,你會不會以爲奇怪,既然 Java SDK 裏面提供了 Lock,爲啥還要提供一個 Semaphore ?其實 Semaphore 還有一個功能是 Lock 不容易實現的,那就是:Semaphore 能夠容許多個線程訪問一個臨界區。
常見的就是池化資源,好比鏈接池、對象池、線程池等。好比熟悉的數據庫鏈接池,在同一時刻容許多個線程同時使用鏈接,固然在每一個鏈接被釋放以前,是容許其餘線程使用的。
如今咱們假設有一個場景,對象池需求,就是一次性建立 N 哥對象,以後全部的線程都複用這 N 個對象,在對象被釋放前,是不容許其餘線程使用。
/** * 對象池 * */
public class ObjectPool {
//使用 阻塞隊列保存對象池
private final ArrayBlockingQueue<InputSaleMapDO> pool;
//信號量
private final Semaphore sem;
/** * 初始化對象池 * * @param size 池大小 */
public ObjectPool(int size) {
pool = new ArrayBlockingQueue<>(size);
sem = new Semaphore(size);
for (int i = 0; i < size; i++) {
InputSaleMapDO inputSaleMapDO = new InputSaleMapDO();
inputSaleMapDO.setId((long) i);
pool.add(inputSaleMapDO);
}
}
//利用對象池的對象調用 function
public Long run(Function<InputSaleMapDO, Long> function) throws InterruptedException {
InputSaleMapDO obj = null;
sem.acquire();
try {
obj = pool.poll();
return function.apply(obj);
} finally {
pool.add(obj);
sem.release();
}
}
public static void main(String[] args) throws InterruptedException {
ObjectPool objectPool = new ObjectPool(2);
//模擬十個線程同時訪問
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
objectPool.run(f -> {
System.out.println(f);
return f.getId();
});
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
countDownLatch.countDown();
TimeUnit.SECONDS.sleep(30);
}
}
複製代碼
初始化線程池大小 2 ,咱們模擬 10 個線程,每次只能兩個線程分配對象 InputSaleMapDO。
執行完回調函數以後,它們就會釋放對象(這個釋放工做是經過 pool.add(obj) 實現的),同時調用 release() 方法來更新信號量的計數器。若是此時信號量裏計數器的值小於等於 0,那麼說明有線程在等待,此時會自動喚醒等待的線程。
上面的例子中 保存對象池使用了 ArrayBlockingQueue ,是一個線程安全的容器,那麼是否能夠換成 ArrayList?歡迎後臺給出答案。還有假設是停車場的車位做爲對象池,車主停車是否是也可使用 Semaphore 實現?