併發編程基礎-信號量機制

信號量(Semaphore)是一種控制多線程(進程)訪問共享資源的同步機制,是由荷蘭的Dijkstra大佬在1962年先後提出來的。java

信號量的原理

信號量機制包含如下幾個核心概念:編程

  1. 信號量S,整型變量,須要初始化值大於0
  2. P原語,荷蘭語Prolaag(probeer te verlagen),表示減小信號量,該操做必須是原子的
  3. V原語,荷蘭語Verhogen,表示增長信號量,該操做必須是原子的

信號量

從上圖不難看出信號量的兩個核心操做,P和V:多線程

  1. P操做,原子減小S,而後若是S < 0,則阻塞當前線程
  2. V操做,原子增長S,而後若是S <= 0,則喚醒一個阻塞的線程

信號量通常被用來控制多線程對共享資源的訪問,容許最多S個線程同時訪問臨界區,多於S個的線程會被P操做阻塞,直到有線程執行完臨界區代碼後,調用V操做喚醒。因此PV操做必須是成對出現的。併發

那麼信號量能夠用來幹什麼呢?編程語言

  1. 信號量彷佛天生就是爲限流而生的,咱們能夠很容易用信號量實現一個限流器
  2. 信號量能夠用來實現互斥鎖,初始化信號量S = 1,這樣就只能有一個線程能訪問臨界區。很明顯這是一個不可重入的鎖。
  3. 信號量甚至可以實現條件變量,好比阻塞隊列

動手實現一個信號量

學習這些經典理論的時候,最好的辦法仍是用本身熟悉的編程語言實現一遍。Java併發包提供了一個信號量的java.util.concurrent.Semaphore,是用AbstractQueuedSynchronizer的共享模式實現的,之後會單獨分析關於AQS相關的原理,這裏再也不展開描述,其核心思想是CAS。
下面是我用Java實現的一個簡單的信號量,這裏使用synchronized來替代互斥鎖學習

信號量實現

public class Semaphore {
    /** * 信號量S */
    private int s;

    public Semaphore(int s) {
        this.s = s;
    }

    /** * P原語,原子操做 * <p> * S減decr,若是S小於0,阻塞當前線程 */
    public synchronized void p(int decr) {
        s -= decr;
        if (s < 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                // ...
            }
        }
    }

    /** * V原語,原子操做 * <p> * S加incr,若是S小於等於0,喚醒一個等待中的線程 */
    public synchronized void v(int incr) {
        s += incr;
        if (s <= 0) {
            notify();
        }
    }
}
複製代碼

用信號量限流

public class Limiter implements Executor {

    private Semaphore semaphore;

    public Limiter(int limit) {
        semaphore = new Semaphore(limit);
    }

    public void execute(Runnable runnable) {
        if (runnable != null) {
            new Thread(() -> {
                semaphore.p(1);
                runnable.run();
                semaphore.v(1);
            }).start();
        }
    }
}
複製代碼

用信號量實現互斥鎖

public class SemaphoreLock {

    private Semaphore semaphore = new Semaphore(1);

    public void lock() {
        semaphore.p(1);
    }

    public void unlock() {
        semaphore.v(1);
    }
}
複製代碼

用信號量實現阻塞隊列

實現阻塞隊列須要兩個信號量和一個鎖(鎖也能夠用信號量代替)this

public class SemaphoreBlockingQueue<T> {

    private Semaphore notFull;
    private Semaphore notEmpty;
    private SemaphoreLock lock = new SemaphoreLock();

    private Object[] table;
    private int size;

    public SemaphoreBlockingQueue(int cap) {
        if (cap < 1) {
            throw new IllegalArgumentException("capacity must be > 0");
        }
        notEmpty = new Semaphore(0);
        notFull = new Semaphore(cap);
        table = new Object[cap];
    }

    public void add(T t) {
        // 若是隊列是滿的就會阻塞
        notFull.p(1);
        // lock保證隊列的原子添加
        lock.lock();
        table[size++] = t;
        lock.unlock();
        // 喚醒一個阻塞在notEmpty的線程
        notEmpty.v(1);
    }

    @SuppressWarnings("unchecked")
    public T poll() {
        T element;
        // 若是隊列是空就會阻塞
        notEmpty.p(1);
        // lock保證隊列的原子刪除
        lock.lock();
        element = (T) table[--size];
        lock.unlock();
        // 喚醒一個阻塞在notFull的線程
        notFull.v(1);
        return element;
    }
}
複製代碼
相關文章
相關標籤/搜索