JUC鎖框架——Semaphore

Semaphore簡單介紹

Semaphore是計數信號量。Semaphore管理一系列許可證。每一個acquire方法阻塞,直到有一個許可證能夠得到而後拿走一個許可證;每一個release方法增長一個許可證,這可能會釋放一個阻塞的acquire方法。然而,其實並無實際的許可證這個對象,Semaphore只是維持了一個可得到許可證的數量。java

Semaphore的簡單示例

Semaphore常常用於限制獲取某種資源的線程數量。下面舉個例子,好比說操場上有5個跑道,一個跑道一次只能有一個學生在上面跑步,一旦全部跑道在使用,那麼後面的學生就須要等待,直到有一個學生不跑了,下面是這個例子:ui

public class Playground {
    private String[] tracks = {"跑道1","跑道2","跑道3","跑道4","跑道5"};//一共有5個跑道
    private volatile boolean[] used = new boolean[5];//標記跑道是否被佔用

    private Semaphore semaphore = new Semaphore(5, true);

    //獲取一個跑道
    public String getTrack() throws InterruptedException {
        semaphore.acquire(1);
        return getNextAvailableTrack();
    }

    //返回一個跑道
    public void releaseTrack(String track) {
        if (makeAsUnused(track))
            semaphore.release(1);
    }

    //遍歷,找到一個沒人用的跑道
    private String getNextAvailableTrack() {
        for (int i = 0; i < used.length; i++) {
            if (!used[i]) {
                used[i] = true;
                return tracks[i];
            }
        }
        return null;
    }

    //釋放跑道,將使用標誌設置爲false
    private boolean makeAsUnused(String track) {
        for (int i = 0; i < used.length; i++) {
            if (tracks[i].equals(track)) {
                if (used[i]) {
                    used[i] = false;
                    return true;
                } else {
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Executor executor = Executors.newCachedThreadPool();
        Playground playground = new Playground();
        Runnable runnable = ()->{
            try {
                String track = playground.getTrack();//獲取跑道
                if (track != null) {
                    System.out.println("學生" + Thread.currentThread().getId() + "在" + track.toString() + "上跑步");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println("學生" + Thread.currentThread().getId() + "釋放" + track.toString());
                    playground.releaseTrack(track);//釋放跑道
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        for (int i = 0; i < 100; i++) {
            executor.execute(runnable);
        }
    }
}

Semaphore的源碼解析

semaphore的構造方法

public Semaphore(int permits) {
    sync = new NonfairSync(permits);//提供許可數量,默認爲非公平模式
}

public Semaphore(int permits, boolean fair) {
    //提供許可數量,指定是否爲公平模式
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Sync類

Semaphore內部基於AQS的共享模式,因此實現都委託給了Sync類。.net

NonfairSync(int permits) {
    super(permits);
}
Sync(int permits) {
    setState(permits);//AQS的state表示許可證的數量
}

Semaphore的acquire獲取許可方法

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //若是線程被中斷了,拋出異常
    if (Thread.interrupted())
        throw new InterruptedException();
    //獲取許可失敗,將線程加入到等待隊列中
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

NonfairSync中tryAcquireShared方法的實現

AQS子類若是要使用共享模式的話,須要實現tryAcquireShared方法,下面看NonfairSync的該方法實現:線程

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

該方法調用了父類中的nonfairTyAcquireShared方法,以下:code

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        //獲取剩餘許可數量
        int available = getState();
        //計算給完此次許可數量後的個數
        int remaining = available - acquires;
        //若是許可不夠(獲取許可失敗)或者能夠將許可數量重置(獲取許可成功)的話,返回。
        //只有在許可不夠時返回值纔會小於0,其他返回的都是剩餘許可數量,這也就解釋了,一旦許可不夠,後面的線程將會阻塞。
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

FairSync中tryAcquireShared方法的實現

protected int tryAcquireShared(int acquires) {
    for (;;) {
        //若是前面有線程再等待,直接返回-1
        if (hasQueuedPredecessors())
            return -1;
        //後面與非公平同樣
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

FairSync與NonFairSync的區別就在於會首先判斷當前隊列中有沒有線程在等待,若是有,就老老實實進入到等待隊列;而不像NonfairSync同樣首先試一把,說不定就剛好得到了一個許可,這樣就能夠插隊了。對象

Semaphore的release()釋放許可方法

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

releaseShared方法在AQS中,以下:blog

public final boolean releaseShared(int arg) {
    //若是改變許可數量成功
    if (tryReleaseShared(arg)) {
        doReleaseShared();//一旦CAS改變許可數量成功,就調用該方法釋放阻塞的線程。
        return true;
    }
    return false;
}

AQS子類實現共享模式的類須要實現tryReleaseShared類來判斷是否釋放成功,實現以下:隊列

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        //獲取當前許可數量
        int current = getState();
        //計算回收後的數量
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //CAS改變許可數量成功,返回true
        if (compareAndSetState(current, next))
            return true;
    }
}

Semaphore的reducePermits減少許可數量方法

protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

能夠看到,委託給了Sync,Sync的reducePermits方法以下:資源

final void reducePermits(int reductions) {
    for (;;) {
        //獲得當前剩餘許可數量
        int current = getState();
        //獲得減完以後的許可數量
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        //若是CAS改變成功
        //CAS改變AQS中的state變量,由於該變量表明許可證的數量。
        if (compareAndSetState(current, next))
            return;
    }
}

Semaphore的drainPermits獲取剩餘許可數量

Semaphore還能夠一次將剩餘的許可數量所有取走,該方法是drain方法,以下:rem

public int drainPermits() {
    return sync.drainPermits();
}

Sync的實現以下:

final int drainPermits() {
     for (;;) {
         int current = getState();
         if (current == 0 || compareAndSetState(current, 0))//CAS將許可數量置爲0。
             return current;
     }
 }

總結

Semaphore是信號量,用於管理一組資源。其內部是基於AQS的共享模式,AQS的狀態表示許可證的數量,在許可證數量不夠時,線程將會被掛起;而一旦有一個線程釋放一個資源,那麼就有可能從新喚醒等待隊列中的線程繼續執行。

參考地址:

相關文章
相關標籤/搜索