信號量(Semaphore)是一種控制多線程(進程)訪問共享資源的同步機制,是由荷蘭的Dijkstra大佬在1962年先後提出來的。java
信號量機制包含如下幾個核心概念:編程
從上圖不難看出信號量的兩個核心操做,P和V:多線程
S < 0
,則阻塞當前線程S <= 0
,則喚醒一個阻塞的線程信號量通常被用來控制多線程對共享資源的訪問,容許最多S個線程同時訪問臨界區,多於S個的線程會被P操做阻塞,直到有線程執行完臨界區代碼後,調用V操做喚醒。因此PV操做必須是成對出現的。併發
那麼信號量能夠用來幹什麼呢?編程語言
S = 1
,這樣就只能有一個線程能訪問臨界區。很明顯這是一個不可重入的鎖。學習這些經典理論的時候,最好的辦法仍是用本身熟悉的編程語言實現一遍。Java併發包提供了一個信號量的java.util.concurrent.Semaphore
,是用AbstractQueuedSynchronizer
的共享模式實現的,之後會單獨分析關於AQS相關的原理,這裏再也不展開描述,其核心思想是CAS。
下面是我用Java實現的一個簡單的信號量,這裏使用synchronized
來替代互斥鎖學習
public class Semaphore {
/** * 信號量S */
private int s;
public Semaphore(int s) {
this.s = s;
}
/** * P原語,原子操做 * <p> * S減decr,若是S小於0,阻塞當前線程 */
public synchronized void p(int decr) {
s -= decr;
if (s < 0) {
try {
wait();
} catch (InterruptedException e) {
// ...
}
}
}
/** * V原語,原子操做 * <p> * S加incr,若是S小於等於0,喚醒一個等待中的線程 */
public synchronized void v(int incr) {
s += incr;
if (s <= 0) {
notify();
}
}
}
複製代碼
public class Limiter implements Executor {
private Semaphore semaphore;
public Limiter(int limit) {
semaphore = new Semaphore(limit);
}
public void execute(Runnable runnable) {
if (runnable != null) {
new Thread(() -> {
semaphore.p(1);
runnable.run();
semaphore.v(1);
}).start();
}
}
}
複製代碼
public class SemaphoreLock {
private Semaphore semaphore = new Semaphore(1);
public void lock() {
semaphore.p(1);
}
public void unlock() {
semaphore.v(1);
}
}
複製代碼
實現阻塞隊列須要兩個信號量和一個鎖(鎖也能夠用信號量代替)this
public class SemaphoreBlockingQueue<T> {
private Semaphore notFull;
private Semaphore notEmpty;
private SemaphoreLock lock = new SemaphoreLock();
private Object[] table;
private int size;
public SemaphoreBlockingQueue(int cap) {
if (cap < 1) {
throw new IllegalArgumentException("capacity must be > 0");
}
notEmpty = new Semaphore(0);
notFull = new Semaphore(cap);
table = new Object[cap];
}
public void add(T t) {
// 若是隊列是滿的就會阻塞
notFull.p(1);
// lock保證隊列的原子添加
lock.lock();
table[size++] = t;
lock.unlock();
// 喚醒一個阻塞在notEmpty的線程
notEmpty.v(1);
}
@SuppressWarnings("unchecked")
public T poll() {
T element;
// 若是隊列是空就會阻塞
notEmpty.p(1);
// lock保證隊列的原子刪除
lock.lock();
element = (T) table[--size];
lock.unlock();
// 喚醒一個阻塞在notFull的線程
notFull.v(1);
return element;
}
}
複製代碼