通俗易懂的JUC源碼剖析-Semaphore

前言

Semaphore意爲信號量,它用來限制某段時間內的最大併發資源數。例如數據庫鏈接池,停車位等。下面經過停車位的栗子來講明Semaphore的使用方式。java

import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
    private static Semaphore semaphore = new Semaphore(10);
    public static void main(String[] args) {
        for (int i = 1; i <= 50; i++) {
            new Thread(() -> {
                try {
                    if (semaphore.availablePermits() == 0) {
                        System.out.println("車位已滿,須要等待...");
                    }
                    semaphore.acquire();
                    System.out.println("有空餘車位,駛進停車場");
                    // 模擬在停車場smoke or something
                    Thread.sleep(3000);
                    System.out.println("老婆喊我回家吃飯,駛出停車場");
                    semaphore.release();
                } catch (InterruptedException e) {
                    // ignored
                }
            }).start();
        }
    }
}

實現原理

看一眼Semaphore的類結構,內部類繼承了AQS,同時提供了公平和非公平策略。
image.png
咱們能夠在構造函數中指定是公平仍是非公平,默認是非公平策略。數據庫

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

再來看重要方法(以NonfairSync爲例分析):
acquire()編程

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

能夠看到,調用了AQS的模板方法,acquireSharedInterruptibly裏面會調用子類重寫的tryAcquireShared,來看看相關邏輯:併發

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 調用子類方法嘗試獲取共享資源失敗,則在隊列中阻塞獲取    
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
    // CAS + 自璇
    for (;;) {
        // 獲取當前剩餘資源數
        int available = getState();
        // 計算獲取acquires個資源後,剩餘資源數
        int remaining = available - acquires;
        // 若是不夠用或者夠用而且CAS設置剩餘數成功,則返回
        // 不然循環重試CAS操做
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
 }
}

release()函數

public void release() {
    sync.releaseShared(1);
}

一樣,release調用了AQS的模板方法,releaseShared裏面會調用子類重寫的tryReleaseShared方法,來看看子類具體實現邏輯:ui

protected final boolean tryReleaseShared(int releases) {
    // CAS + 自璇
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
        }
}

代碼邏輯也很簡單,不作贅述。spa

FairSync公平式的獲取,就是在tryAcquireShared時先判斷隊列中有無在等待的元素,有的話就返回-1,進入同步隊列阻塞獲取。相關代碼以下:code

protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

參考資料:
《Java併發編程之美》繼承

相關文章
相關標籤/搜索