Java多線程同步工具類之Semaphore

Semaphore信號量一般作爲控制線程併發個數的工具來使用,它能夠用來限制同時併發訪問資源的線程個數。微信

1、Semaphore使用

下面咱們經過一個簡單的例子來看下Semaphore的具體使用,咱們同時執行10個計數線程,並定義一個Semaphore變量用來控制併發值,同一時間只容許兩個線程併發執行;併發

    public static void main(String[] args) {

        Semaphore semaphore = new Semaphore(2);

        // 啓動計數線程
        for (int i = 1; i <= 10; i++) {
            new SemaphoreThread(semaphore).start();
        }
    }

計數線程工具

public class SemaphoreThread extends Thread {

    private Semaphore semaphore;

    public SemaphoreThread(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    public void run() {
        try {
            semaphore.acquire();//獲取執行許可
            Thread.sleep(2000);
            System.out.println(this.getName() + "線程," + "開始進行計數");
            // 模擬計數時長
            Thread.sleep(2000);
            // 一個線程完成,容許下一個線程開始計數
            System.out.println(this.getName() + "線程," + "計數完畢");
            semaphore.release();//歸還許可

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
}

輸出結果源碼分析

Thread-0線程,開始進行計數
Thread-1線程,開始進行計數
Thread-1線程,計數完畢
Thread-0線程,計數完畢
Thread-2線程,開始進行計數
Thread-3線程,開始進行計數
Thread-2線程,計數完畢
Thread-3線程,計數完畢
Thread-4線程,開始進行計數
Thread-5線程,開始進行計數
Thread-5線程,計數完畢
Thread-4線程,計數完畢
Thread-6線程,開始進行計數
Thread-7線程,開始進行計數
Thread-6線程,計數完畢
Thread-7線程,計數完畢
Thread-8線程,開始進行計數
Thread-9線程,開始進行計數
Thread-8線程,計數完畢
Thread-9線程,計數完畢

經過輸出結果能夠看出,Semaphore根據咱們設定的併發值限制了線程同時執行的個數,每次只運行兩個線程進行計數。ui

2、Semaphore源碼分析

接下來咱們對Semaphore具體的內部實現進行分析與總結this

一、Semaphore的構造

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        /**
        一、設置AbstractQueuedSynchronizer中同步狀態的值state,也就是計數器的值。
        二、這個值volatile變量,必須保證線程間的可見性;
        **/
        Sync(int permits) {
            setState(permits);
        }

        //獲取state的值
        final int getPermits() {
            return getState();
        }

        //經過CAS方式減小state值,對應Semaphore的acquire獲取許可
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } //經過CAS方式增長state值,對應Semaphore的release歸還許可 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } //減小許可 final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } //許可置0 final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } }

經過代碼能夠看出Semaphore也是基於AbstractQueuedSynchronizer類來實現的,它會根據你傳入的併發線程數量來構造一個繼承自AbstractQueuedSynchronizer的Syc實現類;spa

二、acquire方法

Semaphore的acquire方法實現獲取執行許可,acquire方法底層調用的實際上是AbstractQueuedSynchronizer的acquireSharedInterruptibly方法,咱們看下具體代碼線程

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //tryAcquireShared由Semaphore的Sync類的nonfairTryAcquireShared方法具體實現
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

從上面咱們已經知道nonfairTryAcquireShared方法內部實際上是一個針對state值減法操做,並經過CAS操做改變同步狀態State的值,直到要獲取的許可線程超過設置的併發值,tryAcquireShared(arg)返回值小於0,執行doAcquireSharedInterruptibly方法開始嘗試獲取鎖,並進入阻塞;code

三、release方法

Semaphore的release方法對應釋放執行許可blog

    public void release() {
        sync.releaseShared(1);
    }
public final boolean releaseShared(int arg) { //tryAcquireShared由Semaphore的Sync類的tryReleaseShared方法具體實現,執行歸還許可操做; if (tryReleaseShared(arg)) { //釋放鎖狀態,喚醒阻塞線程 doReleaseShared(); return true; } return false; }

執行tryReleaseShared方法歸還歸許可,對state值作加法操做,沒有問題的話返回true值,執行doReleaseShared方法釋放鎖,喚醒阻塞線程。

3、總結

線程併發個數控制工具Semaphore類與CountDownLatch相似,都是基於AbstractQueuedSynchronizer類實現的,經過操做同步狀態state值結合共享鎖的模式控制一個或多個線程的執行從而實現具體的功能。以上就是對Semaphore類使用與源碼進行的分析與總結,其中若有不足與不正確的地方還望指出與海涵。

 

關注微信公衆號,查看更多技術文章。

相關文章
相關標籤/搜索