Semaphore源碼解析

一 概述
   二 源碼總覽
   三 acquire-請求令牌
   四 release-釋放令牌
   五 總結
複製代碼

一 概述

semaphore是信號的意思,在併發包中則表示持有指定數量令牌的信號量。它一般用於多線程同時請求令牌的控制。提供了acquire方法用於獲取令牌,當令牌發放完後則進行阻塞等待,持有令牌的線程完成任務後須要調用release方法歸還令牌。semaphore的使用很簡單,如今經過學習它的源碼來了解它的實現原理是怎樣的。java

二 源碼總覽

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    // 全部機制都經過AbstractQueuedSynchronizer子類Sync來完成的
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            // 採用AQS中的state來統計令牌數
            setState(permits);
        }
        ……
    }
    // 非公平鎖
    static final class NonfairSync extends Sync {……}
    // 公平鎖
    static final class FairSync extends Sync{……}
    // 默認構造方法-默認非公平鎖
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    // 提供選擇公平鎖仍是非公平鎖的構造方法
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    // 請求一個令牌-響應中斷+阻塞
    public void acquire() throws InterruptedException {……}
    // 指定獲取領牌數-響應中斷且阻塞
    public void acquire(int permits) throws InterruptedException {……}
    // 請求一個令牌-不響應中斷+阻塞
    public void acquireUninterruptibly() {……}
    // 嘗試請求一個令牌-非阻塞,失敗當即返回
    public boolean tryAcquire(){……}
    // 嘗試請求一個令牌,阻塞指定時間,超時後返回
    public boolean tryAcquire(long timeout, TimeUnit unit){……}
    // 釋放令牌
    public void release(){……}
}    
複製代碼

整體來講,Semaphore內部有一個繼承於AQS的內部類Sync,利用AQS的共享鎖來實現對共享變量state進行操做,並將state做爲令牌的計數, 並提供了公平和非公平鎖的方式來獲取令牌,總體的設計跟ReentrantLock很像。下面以最經常使用的acquire和release方法爲例,詳細瞭解他們的原理;node

三 acquire-請求令牌

先看acquire的方法體:微信

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
複製代碼

裏面實際調用的是AQS的請求共享鎖方法:多線程

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 先進行一遍嘗試獲取鎖,當返回小於0說明令牌不足了,則須要將當前線程加入到等待隊列中 
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
複製代碼

接着先回調在Semaphore中重寫的tryAcquireShared()方法嘗試獲取鎖:併發

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
    // 循環獲取令牌
    for (;;) {
        // 獲取當前可用的令牌數
        int available = getState();
        // 當前獲取完後剩下的令牌數
        int remaining = available - acquires;
        // 剩下領牌數小於0或者大於等於0並CAS更新成功則直接返回剩餘令牌數
        if (remaining < 0 || compareAndSetState(available, remaining))
            return remaining;
    }
}
複製代碼

簡單嘗試獲取令牌失敗後則再CAS嘗試幾回後加入同步隊列中休眠等待:oop

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // 在同步隊列中增長等待節點
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 獲取當前節點的前驅節點
            final Node p = node.predecessor();
            // 若是前驅節點爲head節點,表示當前節點是同步等待隊列中的第一個,故繼續嘗試一次獲取鎖
            if (p == head) {
                // 嘗試獲取令牌,此時會跳轉到semaphore中(由於重寫了該方法)
                int r = tryAcquireShared(arg);
                // 返回大於0則表示成功獲取到令牌了
                if (r >= 0) {
                    // 將當前節點設爲head節點
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 自旋幾回後爲避免強佔CPU,則對該線程進行休眠處理
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 因中斷請求則取消排隊請求
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

簡單總結下acquire方法流程爲:學習

  1. 先循環嘗試取獲取令牌,若是還有令牌則直接取到並cas更新剩餘令牌數;
  2. 若上一步嘗試沒取到令牌則將當前線程加入到AQS同步隊列中,並檢查當前是否爲第一個等待節點,是則再嘗試請求;
  3. 若上一步屢次嘗試無果則阻塞等待,等待有線程釋放了令牌後再喚醒等待隊列中的線程從新競爭獲取令牌;

四 release-釋放令牌

release方法體:ui

public void release() {
    sync.releaseShared(1);
}
複製代碼

實際調用的是AQS的方法:this

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
複製代碼

先調用在Semaphore中重寫的嘗試釋放令牌方法,而且釋放成功後返回true:spa

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 獲取當前令牌數量
        int current = getState();
        // 計算釋放後的令牌數量
        int next = current + releases;
        // 若是本次釋放的令牌爲負數則拋出異常
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // CAS更新令牌數成功後返回true
        if (compareAndSetState(current, next))
            return true;
    }
}
複製代碼

釋放令牌成功後,喚醒在同步隊列中等待的線程:

private void doReleaseShared() {
    /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
複製代碼

簡單總結下release流程:

  1. 循環嘗試釋放令牌,並cas更新剩餘令牌數;
  2. 令牌釋放成功後喚醒同步等待隊列中的線程;

五 總結

Semaphore利用AQS中的共享鎖來操做共享變量state,並使用state做爲令牌的計數。每一個線程調用required請求獲取令牌,調用release則釋放領牌。當令牌取完時則剩下的線程加入AQS隊列中阻塞等待,當有令牌釋放時會喚醒等待的線程。


更多原創文章請關注微信公衆號
👇👇👇
嘮吧嗑吧

相關文章
相關標籤/搜索