線程間的同步與通訊(8)——Semaphore源碼分析

前言

系列文章目錄 java

Semaphore(信號量)也是經常使用的併發工具之一,它經常用於流量控制。一般狀況下,公共的資源經常是有限的,例如數據庫的鏈接數。使用Semaphore能夠幫助咱們有效的管理這些有限資源的使用。數據庫

Semaphore的結構和ReentrantLock以及CountDownLatch很像,內部採用了公平鎖與非公平鎖兩種實現,若是你已經看過了ReentrantLock源碼分析CountDownLatch源碼分析,弄懂它將絕不費力。segmentfault

核心屬性

CountDownLatch相似,Semaphore主要是經過AQS的共享鎖機制實現的,所以它的核心屬性只有一個sync,它繼承自AQS:數組

private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    Sync(int permits) {
        setState(permits);
    }

    final int getPermits() {
        return getState();
    }

    final int nonfairTryAcquireShared(int acquires) {
        //省略
    }

    protected final boolean tryReleaseShared(int releases) {
        //
    }

    final void reducePermits(int reductions) {
        //省略
    }

    final int drainPermits() {
        //省略
    }
}

這裏的permits和CountDownLatch的count很像,它們最終都將成爲AQS中的state屬性的初始值。併發

構造函數

Semaphore有兩個構造函數:框架

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

默認的構造函數使用的是非公平鎖,另外一個構造函數經過傳入的fair參數來決定使用公平鎖仍是非公平鎖,這一點和ReentrantLock用的是一樣的套路,都是一樣的代碼框架。函數

公平鎖和非公平鎖的定義以下:工具

static final class FairSync extends Sync {
    
   FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

static final class NonfairSync extends Sync {
    
   NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

獲取信號量

獲取信號量的方法有4個:源碼分析

acquire方法 本質調用
acquire() sync.acquireSharedInterruptibly(1)
acquire(int permits) sync.acquireSharedInterruptibly(permits)
acquireUninterruptibly() sync.acquireShared(1)
acquireUninterruptibly(int permits) sync.acquireShared(permits);

可見,acquire()方法就至關於acquire(1)acquireUninterruptibly同理,只不過一種響應中斷,一種不響應中斷,關於AQS的那四個方法咱們在前面的文章中都已經分析過了,除了其中的tryAcquireShared(arg)由子類實現外,其餘的都由AQS實現。ui

值得注意的是,在逐行分析AQS源碼(3)——共享鎖的獲取與釋放中咱們特別提到過tryAcquireShared返回值的含義:

  • 若是該值小於0,則表明當前線程獲取共享鎖失敗
  • 若是該值大於0,則表明當前線程獲取共享鎖成功,而且接下來其餘線程嘗試獲取共享鎖的行爲極可能成功
  • 若是該值等於0,則表明當前線程獲取共享鎖成功,可是接下來其餘線程嘗試獲取共享鎖的行爲會失敗

這裏的返回值其實表明的是剩餘的信號量的值,若是爲負值則說明信號量不夠了。

接下來咱們就看看子類對於tryAcquireShared(arg)方法的實現:

非公平鎖實現

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 || 
            compareAndSetState(available, remaining))
            return remaining;
    }
}

與通常的tryAcquire邏輯不一樣,Semaphore的tryAcquire邏輯是一個自旋操做,由於Semaphore是共享鎖,同一時刻可能有多個線程來修改這個值,因此咱們必須使用自旋 + CAS來避免線程衝突。

該方法退出的惟一條件是成功的修改了state值,並返回state的剩餘值。若是剩下的信號量不夠了,則就不須要進行CAS操做,直接返回剩餘值。因此其實tryAcquireShared返回的不是當前剩餘的信號量的值,而是若是扣去acquires以後,當前將要剩餘的信號量的值,若是這個「將要」剩餘的值比0小,則是不會發生扣除操做的。這就比如我要買10個包子,包子鋪如今只剩3個了,則將會返回剩餘3 - 10 = -7個包子,可是事實上包子店並無將包子賣出去,實際剩餘的包子仍是3個;此時若是有另外一我的來只要買1個包子,則將會返回剩餘3 - 1 = 2個包子,而且包子店會將一個包子賣出,實際剩餘的包子數也是2個。

非公平鎖的這種獲取信號量的邏輯其實和CountDownLatch的countDown方法很像:

// CountDownLatch
public void countDown() {
    sync.releaseShared(1);
}

countDown()releaseShared(1)方法中將調用tryReleaseShared

// CountDownLatch
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

對比CountDownLatch的tryReleaseShared方法和Semaphore的tryAcquireShared方法可知,它們的核心邏輯都是減小state的值,只不過CountDownLatch借用了共享鎖的殼,對它而言,減小state的值是一種釋放共享鎖的行爲,由於它的目的是將state值降爲0;而在Semaphore中,減小state的值是一種獲取共享鎖的行爲,減小成功了,則獲取成功。

公平鎖實現

protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

經過對比能夠看出,它和nonfairTryAcquireShared的惟一的差異在於:

if (hasQueuedPredecessors())
    return -1;

即在獲取共享鎖以前,先用hasQueuedPredecessors方法判斷有沒有人排在本身前面。關於hasQueuedPredecessors方法,咱們在前面的文章中已經分析過了,它就是判斷當前節點是否有前驅節點,有的話直接返回獲取失敗,由於要讓前驅節點先去獲取鎖。(畢竟公平鎖講究先來後到嘛)

釋放信號量

釋放信號量的方法有2個:

public void release() {
    sync.releaseShared(1);
}
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

可見,release() 至關於調用了 release(1),它們最終都調用了tryReleaseShared(int releases)方法:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

與獲取信號量的邏輯相反,釋放信號量的邏輯是將獲得的信號量再歸還回去,所以是增長state值的操做,代碼自己很容易理解,這裏再也不贅述。

工具方法

除了以上獲取和釋放信號量所用到的方法,Semaphore還定義了一些其餘方法來幫助咱們操做信號量:

tryAcquire

注意,這個tryAcquire不是給acquire方法使用的!!!咱們上面分析信號量的獲取時說過,獲取信號量的acquire方法調用的是AQS的acquireSharedacquireSharedInterruptibly ,而這兩個方法會調用子類的tryAcquireShared方法,子類必須實現這個方法。

而這裏的tryAcquire方法並無定義在AQS的子類中,即既不在NonfairSync,也不在FairSync中,對於使用共享鎖的AQS的子類,也不須要定義這個方法。事實上它直接定義在Semaphore中的。

因此,在看這個方法時,腦海中必定要有一個意識,雖然它和AQS的獨佔鎖的獲取邏輯中的tryAcquire重名了,但實際上它和AQS的獨佔鎖是沒有關係的,不要被它的名字繞暈了。

那麼,這個tryAcquiretryAcquireShared方法有什麼不一樣呢?只要有兩點:

  1. 返回值不一樣:tryAcquire返回boolean類型,tryAcquireShared返回int
  2. tryAcquire必定是採用非公平鎖模式,而tryAcquireShared有公平和非公平兩種實現。

理清楚以上幾點以後,咱們再來看tryAcquire方法的源碼,它有四種重載形式:
兩種不帶超時機制的形式:

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}
public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

兩種帶超時機制的形式:

public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

其中,不帶超時機制的tryAcquire方法實際上調用的就是nonfairTryAcquireShared(int acquires)方法,它和非公平鎖的tryAcquireShared同樣,只是tryAcquireShared是直接return nonfairTryAcquireShared(acquires),而tryAcquirereturn sync.nonfairTryAcquireShared(1) >= 0;,即直接返回獲取鎖的操做是否成功。

而帶超時機制的tryAcquire方法提供了一種超時等待的方式,這是前面介紹的公平鎖和非公平鎖的獲取鎖邏輯中所沒有的,它本質上調用了AQS的tryAcquireSharedNanos(int arg, long nanosTimeout)方法:

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

這個方法咱們在介紹CountDownLatch源碼分析await(long timeout, TimeUnit unit)方法時已經分析過了,屬於老套路了,這裏就不展開了。

reducePermits

reducePermits方法用來減小信號量的總數,這在debug中是頗有用的,它與前面介紹的acquire方法的不一樣點在於,即便當前信號量的值不足,它也不會致使調用它的線程阻塞等待。只要須要減小的信號量的數量reductions大於0,操做最終就會成功,也就是說,即便當前的reductions大於現有的信號量的值也不要緊,因此該方法可能會致使剩餘信號量爲負值。

protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}
final void reducePermits(int reductions) {
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
            return;
    }
}

咱們將它和nonfairTryAcquireShared對比一下:

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

能夠看出,二者在CAS前的判斷條件並不相同,reducePermits只要剩餘值不比當前值大就能夠,而nonfairTryAcquireShared必需要保證剩餘值不小於0纔會執行CAS操做。

drainPermits

相比reducePermits,drainPermits就更簡單了,它直接將剩下的信號量一次性消耗光,而且返回所消耗的信號量,這個方法在debug中也是頗有用的:

public int drainPermits() {
    return sync.drainPermits();
}
final int drainPermits() {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}

實戰

以上咱們分析了信號量的源碼,接下來咱們來分析一下官方給的一個使用的例子:

class Pool {
    private static final int MAX_AVAILABLE = 100;
    // 初始化一個信號量,設置爲公平鎖模式,總資源數爲100個
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    public Object getItem() throws InterruptedException {
        // 獲取一個信號量
        available.acquire();
        return getNextAvailableItem();
    }

    public void putItem(Object x) {
        if (markAsUnused(x))
            available.release();
    }

    // Not a particularly efficient data structure; just for demo

    protected Object[] items = ...whatever kinds of items being managed
    protected boolean[] used = new boolean[MAX_AVAILABLE];

    protected synchronized Object getNextAvailableItem() {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (!used[i]) {
                used[i] = true;
                return items[i];
            }
        }
        return null; // not reached
    }

    protected synchronized boolean markAsUnused(Object item) {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (item == items[i]) {
                if (used[i]) {
                    used[i] = false;
                    return true;
                } else
                    return false;
            }
        }
        return false;
    }

}

這個例子很簡單,咱們用items數組表明可用的資源,用used數組來標記已經使用的資源的,used[i]的值爲true,則表明items[i]這個資源已經被使用了。

(1) 獲取一個可用資源
咱們調用getItem()來獲取資源,在該方法中會先調用available.acquire()方法請求一個信號量,注意,這裏若是當前信號量數不夠時,是會阻塞等待的;當咱們成功地獲取了一個信號量以後,將會調用getNextAvailableItem方法,返回一個可用的資源。

(2) 釋放一個資源
咱們調用putItem(Object x)來釋放資源,在該方法中會先調用markAsUnused(Object item)將須要釋放的資源標記成可用狀態(即將used數組中對應的位置標記成false), 若是釋放成功,咱們就調用available.release()來釋放一個信號量。

總結

Semaphore是一個有效的流量控制工具,它基於AQS共享鎖實現。咱們經常用它來控制對有限資源的訪問。每次使用資源前,先申請一個信號量,若是資源數不夠,就會阻塞等待;每次釋放資源後,就釋放一個信號量。

(完)

系列文章目錄

相關文章
相關標籤/搜索