J.U.C 之Semaphore

簡介

信號量 Semaphore 是一個控制訪問多個共享資源的計數器,和 CountDownLatch 同樣,其本質上是一個「共享鎖」。java

一個計數信號量。從概念上講,信號量維護了一個許可集。node

若有必要,在許可可用前會阻塞每個 acquire,而後再獲取該許可。
每一個 release 添加一個許可,從而可能釋放一個正在阻塞的獲取者。
複製代碼

可是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數,並採起相應的行動。安全

Semaphore 一般用於限制能夠訪問某些資源(物理或邏輯的)的線程數目。併發

下面咱們就一個停車場的簡單例子來闡述 Semaphore :dom

  1. 爲了簡單起見咱們假設停車場僅有 5 個停車位。一開始停車場沒有車輛全部車位所有空着,而後前後到來三輛車,停車場車位夠,安排進去停車,而後又來三輛,這個時候因爲只有兩個停車位,全部只能停兩輛,其他一輛必須在外面候着,直到停車場有空車位。固然,之後每來一輛都須要在外面候着。當停車場有車開出去,裏面有空位了,則安排一輛車進去(至因而哪輛,要看選擇的機制是公平仍是非公平)。ide

  2. 從程序角度看,停車場就至關於信號量 Semaphore ,其中許可數爲 5 ,車輛就相對線程。當來一輛車時,許可數就會減 1 。當停車場沒有車位了(許可數 == 0 ),其餘來的車輛須要在外面等候着。若是有一輛車開出停車場,許可數 + 1,而後放進來一輛車。函數

  3. 信號量 Semaphore 是一個非負整數( >=1 )。當一個線程想要訪問某個共享資源時,它必需要先獲取 Semaphore。當 Semaphore > 0 時,獲取該資源並使 Semaphore – 1 。若是S emaphore 值 = 0,則表示所有的共享資源已經被其餘線程所有佔用,線程必需要等待其餘線程釋放資源。當線程釋放資源時,Semaphore 則 +1 。ui

實現分析

java.util.concurrent.Semaphore 結構以下圖:this

從上圖能夠看出,Semaphore 內部包含公平鎖(FairSync)和非公平鎖(NonfairSync),繼承內部類 Sync ,其中 Sync 繼承 AQS(再一次闡述 AQS 的重要性)。spa

Semaphore 提供了兩個構造函數:

Semaphore(int permits) :建立具備給定的許可數和非公平的公平設置的 Semaphore 。
Semaphore(int permits, boolean fair) :建立具備給定的許可數和給定的公平設置的 Semaphore 。
複製代碼

實現以下:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
複製代碼

Semaphore 默認選擇非公平鎖。

當信號量 Semaphore = 1 時,它能夠看成互斥鎖使用。其中 0、1 就至關於它的狀態:

  1. 當 =1 時表示,其餘線程能夠獲取;
  2. 當 =0 時,排他,即其餘線程必需要等待。

Semaphore 的代碼實現結構,和 ReentrantLock 相似。

信號量獲取

Semaphore 提供了 #acquire() 方法,來獲取一個許可。

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
複製代碼

內部調用 AQS 的 #acquireSharedInterruptibly(int arg) 方法,該方法以共享模式獲取同步狀態。代碼以下:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
複製代碼

在 #acquireSharedInterruptibly(int arg) 方法中,會調用 #tryAcquireShared(int arg) 方法。而 #tryAcquireShared(int arg) 方法,由子類來實現。對於 Semaphore 而言,若是咱們選擇非公平模式,則調用 NonfairSync 的#tryAcquireShared(int arg) 方法,不然調用 FairSync 的 #tryAcquireShared(int arg) 方法。若 #tryAcquireShared(int arg) 方法返回 < 0 時,則會阻塞等待,從而實現 Semaphore 信號量不足時的阻塞,代碼以下:

// AQS.java
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            /** * 對於 Semaphore 而言,若是 tryAcquireShared 返回小於 0 時,則會阻塞等待。 */
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

另外,這也是爲何 Semaphore 在使用 AQS 時,state 表明的是,剩餘可獲取的許可數,而不是已經使用的許可數。咱們假設 state 表明的是已經使用的許可數,那麼 #tryAcquireShared(int arg) 返回的結果 = 原始許可數 - state ,這個操做在併發狀況下,會存在線程不安全的問題。因此,state 表明的是,剩餘可獲取的許可數,而不是已經使用的許可數。

公平狀況的 FairSync 的方法實現,代碼以下:

// FairSync.java
@Override
protected int tryAcquireShared(int acquires) {
    for (;;) {
        //判斷該線程是否位於CLH隊列的列頭,從而實現公平鎖
        if (hasQueuedPredecessors())
            return -1;
        //獲取當前的信號量許可
        int available = getState();

        //設置「得到acquires個信號量許可以後,剩餘的信號量許可數」
        int remaining = available - acquires;

        //CAS設置信號量
        if (remaining < 0 ||
                compareAndSetState(available, remaining))
            return remaining;
    }
}
複製代碼

經過 #hasQueuedPredecessors() 方法,判斷該線程是否位於 CLH 隊列的列頭,從而實現公平鎖。 非公平狀況的 NonfairSync 的方法實現,代碼以下:

// NonfairSync.java
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

// Sync.java
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
複製代碼

對於非公平而言,由於它不須要判斷當前線程是否位於 CLH 同步隊列列頭,因此相對而言會簡單些。

信號量釋放

獲取了許可,當用完以後就須要釋放,Semaphore 提供 #release() 方法,來釋放許可。代碼以下:

public void release() {
    sync.releaseShared(1);
}
複製代碼

內部調用 AQS 的 #releaseShared(int arg) 方法,釋放同步狀態。

// AQS.java
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
複製代碼

releaseShared(int arg) 方法,會調用 Semaphore 內部類 Sync 的 #tryReleaseShared(int arg) 方法,釋放同步狀態。

// Sync.java
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        //信號量的許可數 = 當前信號許可數 + 待釋放的信號許可數
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //設置可獲取的信號許可數爲next
        if (compareAndSetState(current, next))
            return true;
    }
}
複製代碼

如該方法返回 true 時,表明釋放同步狀態成功,從而在 #releaseShared(int args) 方法中,調用 #doReleaseShared() 方法,可喚醒阻塞等待 Semaphore 的許可的線程。

應用示例

咱們已停車爲示例:

public class SemaphoreTest {

    static class Parking {
    
        //信號量
        private Semaphore semaphore;

        Parking(int count) {
            semaphore = new Semaphore(count);
        }

        public void park() {
            try {
                //獲取信號量
                semaphore.acquire();
                long time = (long) (Math.random() * 10);
                System.out.println(Thread.currentThread().getName() + "進入停車場,停車" + time + "秒..." );
                Thread.sleep(time);
                System.out.println(Thread.currentThread().getName() + "開出停車場...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }
    }


    static class Car extends Thread {
        Parking parking ;

        Car(Parking parking){
            this.parking = parking;
        }

        @Override
        public void run() {
            parking.park();     //進入停車場
        }
    }

    public static void main(String[] args){
        Parking parking = new Parking(3);

        for(int i = 0 ; i < 5 ; i++){
            new Car(parking).start();
        }
    }
}
複製代碼

運行結果以下:

相關文章
相關標籤/搜索