我們來聊聊併發工具類Semaphore

什麼是Semaphore

Semaphore是計數信號量。Semaphore管理一系列許可證。每一個acquire方法阻塞,直到有一個許可證能夠得到而後拿走一個許可證;每一個release方法增長一個許可證,這可能會釋放一個阻塞的acquire方法。然而,其實並無實際的許可證這個對象,Semaphore只是維持了一個可得到許可證的數量。java

應用場景

Semaphore能夠用於作流量控制,特別公用資源有限的應用場景,好比數據庫鏈接。假若有一個需求,要讀取幾萬個文件的數據,由於都是IO密集型任務,咱們能夠啓動幾十個線程併發的讀取,可是若是讀到內存後,還須要存儲到數據庫中,而數據庫的鏈接數只有10個,這時咱們必須控制只有十個線程同時獲取數據庫鏈接保存數據,不然會報錯沒法獲取數據庫鏈接。這個時候,咱們就可使用Semaphore來作流控,代碼以下:數據庫

package org.java.base.thread;併發

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;ide

public class SemaphoreTest {ui

private static final int THREAD_COUNT = 30;spa

private static ExecutorService threadPool = Executors
.newFixedThreadPool(THREAD_COUNT);.net

private static Semaphore s = new Semaphore(10);線程

public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println(「save data」);
s.release();
} catch (InterruptedException e) {
}
}
});
}code

threadPool.shutdown();
}
}對象

在代碼中,雖然有30個線程在執行,可是隻容許10個併發的執行。Semaphore的構造方法Semaphore(int permits) 接受一個整型的數字,表示可用的許可證數量。Semaphore(10)表示容許10個線程獲取許可證,也就是最大併發數是10。Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()獲取一個許可證,使用完以後調用release()歸還許可證。還能夠用tryAcquire()方法嘗試獲取許可證。

其餘方法

Semaphore還提供一些其餘方法:

  • int availablePermits() :返回此信號量中當前可用的許可證數。
  • int getQueueLength():返回正在等待獲取許可證的線程數。
  • boolean hasQueuedThreads() :是否有線程正在等待獲取許可證。
  • void reducePermits(int reduction) :減小reduction個許可證。是個protected方法。
  • Collection getQueuedThreads() :返回全部等待獲取許可證的線程集合。是個protected方法。

源碼解析

Semaphore有兩種模式,公平模式和非公平模式。公平模式就是調用acquire的順序就是獲取許可證的順序,遵循FIFO;而非公平模式是搶佔式的,也就是有可能一個新的獲取線程剛好在一個許可證釋放時獲得了這個許可證,而前面還有等待的線程。

構造方法

Semaphore有兩個構造方法,以下:

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

從上面能夠看到兩個構造方法,都必須提供許可的數量,第二個構造方法能夠指定是公平模式仍是非公平模式,默認非公平模式。
Semaphore內部基於AQS的共享模式,因此實現都委託給了Sync類。
這裏就看一下NonfairSync的構造方法:

NonfairSync(int permits) {
            super(permits);
        }

能夠看到直接調用了父類的構造方法,Sync的構造方法以下:

Sync(int permits) {
            setState(permits);
        }

能夠看到調用了setState方法,也就是說AQS中的資源就是許可證的數量。

獲取許可

先從獲取一個許可看起,而且先看非公平模式下的實現。首先看acquire方法,acquire方法有幾個重載,但主要是下面這個方法

public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

從上面能夠看到,調用了Sync的acquireSharedInterruptibly方法,該方法在父類AQS中,以下:

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //若是線程被中斷了,拋出異常
        if (Thread.interrupted())
            throw new InterruptedException();
        //獲取許可失敗,將線程加入到等待隊列中
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

AQS子類若是要使用共享模式的話,須要實現tryAcquireShared方法,下面看NonfairSync的該方法實現:

protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }

該方法調用了父類中的nonfairTyAcquireShared方法,以下:

final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                //獲取剩餘許可數量
                int available = getState();
                //計算給完此次許可數量後的個數
                int remaining = available - acquires;
                //若是許可不夠或者能夠將許可數量重置的話,返回
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

從上面能夠看到,只有在許可不夠時返回值纔會小於0,其他返回的都是剩餘許可數量,這也就解釋了,一旦許可不夠,後面的線程將會阻塞。看完了非公平的獲取,再看下公平的獲取,代碼以下:

protected int tryAcquireShared(int acquires) {
            for (;;) {
                //若是前面有線程再等待,直接返回-1
                if (hasQueuedPredecessors())
                    return -1;
                //後面與非公平同樣
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

從上面能夠看到,FairSync與NonFairSync的區別就在於會首先判斷當前隊列中有沒有線程在等待,若是有,就老老實實進入到等待隊列;而不像NonfairSync同樣首先試一把,說不定就剛好得到了一個許可,這樣就能夠插隊了。
看完了獲取許可後,再看一下釋放許可。

釋放許可

釋放許可也有幾個重載方法,但都會調用下面這個帶參數的方法,

public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

releaseShared方法在AQS中,以下:

public final boolean releaseShared(int arg) {
        //若是改變許可數量成功
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

AQS子類實現共享模式的類須要實現tryReleaseShared類來判斷是否釋放成功,實現以下:

protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                //獲取當前許可數量
                int current = getState();
                //計算回收後的數量
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                //CAS改變許可數量成功,返回true
                if (compareAndSetState(current, next))
                    return true;
            }
        }

從上面能夠看到,一旦CAS改變許可數量成功,那麼就會調用doReleaseShared()方法釋放阻塞的線程。

減少許可數量

Semaphore還有減少許可數量的方法,該方法能夠用於用於當資源用完不能再用時,這時就能夠減少許可證。代碼以下:

protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

能夠看到,委託給了Sync,Sync的reducePermits方法以下:

final void reducePermits(int reductions) {
            for (;;) {
                //獲得當前剩餘許可數量
                int current = getState();
                //獲得減完以後的許可數量
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                //若是CAS改變成功
                if (compareAndSetState(current, next))
                    return;
            }
        }

從上面能夠看到,就是CAS改變AQS中的state變量,由於該變量表明許可證的數量。

獲取剩餘許可數量

Semaphore還能夠一次將剩餘的許可數量所有取走,該方法是drain方法,以下:

public int drainPermits() {
        return sync.drainPermits();
    }

Sync的實現以下:

final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }

能夠看到,就是CAS將許可數量置爲0。

總結

Semaphore是信號量,用於管理一組資源。其內部是基於AQS的共享模式,AQS的狀態表示許可證的數量,在許可證數量不夠時,線程將會被掛起;而一旦有一個線程釋放一個資源,那麼就有可能從新喚醒等待隊列中的線程繼續執行。

相關文章
相關標籤/搜索