j.u.c系列(10)---之併發工具類:Semaphore

寫在前面node

   Semaphore是一個計數信號量,它的本質是一個"共享鎖"。數據結構

  信號量維護了一個信號量許可集。線程能夠經過調用acquire()來獲取信號量的許可;當信號量中有可用的許可時,線程能獲取該許可;不然線程必須等待,直到有可用的許可爲止。 線程能夠經過release()來釋放它所持有的信號量許可。函數

// 建立具備給定的許可數和非公平的公平設置的 Semaphore。
Semaphore(int permits)
// 建立具備給定的許可數和給定的公平設置的 Semaphore。
Semaphore(int permits, boolean fair)

// 今後信號量獲取一個許可,在提供一個許可前一直將線程阻塞,不然線程被中斷。
void acquire()
// 今後信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞,或者線程已被中斷。
void acquire(int permits)
// 今後信號量中獲取許可,在有可用的許可前將其阻塞。
void acquireUninterruptibly()
// 今後信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞。
void acquireUninterruptibly(int permits)
// 返回此信號量中當前可用的許可數。
int availablePermits()
// 獲取並返回當即可用的全部許可。
int drainPermits()
// 返回一個 collection,包含可能等待獲取的線程。
protected Collection<Thread> getQueuedThreads()
// 返回正在等待獲取的線程的估計數目。
int getQueueLength()
// 查詢是否有線程正在等待獲取。
boolean hasQueuedThreads()
// 若是此信號量的公平設置爲 true,則返回 true。
boolean isFair()
// 根據指定的縮減量減少可用許可的數目。
protected void reducePermits(int reduction)
// 釋放一個許可,將其返回給信號量。
void release()
// 釋放給定數目的許可,將其返回到信號量。
void release(int permits)
// 返回標識此信號量的字符串,以及信號量的狀態。
String toString()
// 僅在調用時此信號量存在一個可用許可,才從信號量獲取許可。
boolean tryAcquire()
// 僅在調用時此信號量中有給定數目的許可時,才今後信號量中獲取這些許可。
boolean tryAcquire(int permits)
// 若是在給定的等待時間內此信號量有可用的全部許可,而且當前線程未被中斷,則今後信號量獲取給定數目的許可。
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
// 若是在給定的等待時間內,此信號量有可用的許可而且當前線程未被中斷,則今後信號量獲取一個許可。
boolean tryAcquire(long timeout, TimeUnit unit)

Semaphore數據結構

 

  從圖中能夠看出:
  (01) 和"ReentrantLock"同樣,Semaphore也包含了sync對象,sync是Sync類型;並且,Sync是一個繼承於AQS的抽象類。
  (02) Sync包括兩個子類:"公平信號量"FairSync 和 "非公平信號量"NonfairSync。sync是"FairSync的實例",或者"NonfairSync的實例";默認狀況下,sync是NonfairSync(即,默認是非公平信號量)。oop

Semaphore源碼分析

  Semaphore是經過共享鎖實現的。根據共享鎖的獲取原則,Semaphore分爲"公平信號量"和"非公平信號量"。源碼分析

  "公平信號量"和"非公平信號量"的區別ui

  "公平信號量"和"非公平信號量"的釋放信號量的機制是同樣的!不一樣的是它們獲取信號量的機制:線程在嘗試獲取信號量許可時,對於公平信號量而言,若是當前線程不在CLH隊列的頭部,則排隊等候;而對於非公平信號量而言,不管當前線程是否是在CLH隊列的頭部,它都會直接獲取信號量。該差別具體的體如今,它們的tryAcquireShared()函數的實現不一樣。this

  "公平信號量"類spa

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
    FairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

 

"非公平信號量"類線程

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    NonfairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

 

 

1. 信號量構造函數code

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

 

2. 1公平信號量獲取和釋放

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

信號量中的acquire()獲取函數,其實是調用的AQS中的acquireSharedInterruptibly()。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 若是線程是中斷狀態,則拋出異常。
    if (Thread.interrupted())
        throw new InterruptedException();
    // 不然,嘗試獲取「共享鎖」;獲取成功則直接返回,獲取失敗,則經過doAcquireSharedInterruptibly()獲取。
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

Semaphore中」公平鎖「對應的tryAcquireShared()實現以下:

protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 判斷「當前線程」是否是CLH隊列中的第一個線程線程,
        // 如果的話,則返回-1。
        if (hasQueuedPredecessors())
            return -1;
        // 設置「能夠得到的信號量的許可數」
        int available = getState();
        // 設置「得到acquires個信號量許可以後,剩餘的信號量許可數」
        int remaining = available - acquires;
        // 若是「剩餘的信號量許可數>=0」,則設置「能夠得到的信號量許可數」爲remaining。
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

說明:tryAcquireShared()的做用是嘗試獲取acquires個信號量許可數。對於Semaphore而言,state表示的是「當前可得到的信號量許可數」。

private void doAcquireSharedInterruptibly(long arg)
    throws InterruptedException {
    // 建立」當前線程「的Node節點,且Node中記錄的鎖是」共享鎖「類型;並將該節點添加到CLH隊列末尾。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 獲取上一個節點。
            // 若是上一節點是CLH隊列的表頭,則」嘗試獲取共享鎖「。
            final Node p = node.predecessor();
            if (p == head) {
                long r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 當前線程一直等待,直到獲取到共享鎖。
            // 若是線程在等待過程當中被中斷過,則再次中斷該線程(還原以前的中斷狀態)。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

說明:tryAcquireShared()的做用是嘗試獲取ac說明:doAcquireSharedInterruptibly()會使當前線程一直等待,直到當前線程獲取到共享鎖(或被中斷)才返回。
(01) addWaiter(Node.SHARED)的做用是,建立」當前線程「的Node節點,且Node中記錄的鎖的類型是」共享鎖「(Node.SHARED);並將該節點添加到CLH隊列末尾。
(02) node.predecessor()的做用是,獲取上一個節點。若是上一節點是CLH隊列的表頭,則」嘗試獲取共享鎖「。
(03) shouldParkAfterFailedAcquire()的做用和它的名稱同樣,若是在嘗試獲取鎖失敗以後,線程應該等待,則返回true;不然,返回false。
(04) 當shouldParkAfterFailedAcquire()返回ture時,則調用parkAndCheckInterrupt(),當前線程會進入等待狀態,直到獲取到共享鎖才繼續運行。
doAcquireSharedInterruptibly()中的shouldParkAfterFailedAcquire()。quires個信號量許可數。對於Semaphore而言,state表示的是「當前可得到的信號量許可數」。

2.2 公平信號量的釋放

public void release() {
    sync.releaseShared(1);
}
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

信號量的releases()釋放函數,其實是調用的AQS中的releaseShared()。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

說明:releaseShared()的目的是讓當前線程釋放它所持有的共享鎖。它首先會經過tryReleaseShared()去嘗試釋放共享鎖。嘗試成功,則直接返回;嘗試失敗,則經過doReleaseShared()去釋放共享鎖。

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 獲取「能夠得到的信號量的許可數」
        int current = getState();
        // 獲取「釋放releases個信號量許可以後,剩餘的信號量許可數」
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // 設置「能夠得到的信號量的許可數」爲next。
        if (compareAndSetState(current, next))
            return true;
    }
}

若是tryReleaseShared()嘗試釋放共享鎖失敗,則會調用doReleaseShared()去釋放共享鎖。doReleaseShared()的源碼以下:

private void doReleaseShared() {
    for (;;) {
        // 獲取CLH隊列的頭節點
        Node h = head;
        // 若是頭節點不爲null,而且頭節點不等於tail節點。
        if (h != null && h != tail) {
            // 獲取頭節點對應的線程的狀態
            int ws = h.waitStatus;
            // 若是頭節點對應的線程是SIGNAL狀態,則意味着「頭節點的下一個節點所對應的線程」須要被unpark喚醒。
            if (ws == Node.SIGNAL) {
                // 設置「頭節點對應的線程狀態」爲空狀態。失敗的話,則繼續循環。
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 喚醒「頭節點的下一個節點所對應的線程」。
                unparkSuccessor(h);
            }
            // 若是頭節點對應的線程是空狀態,則設置「文件點對應的線程所擁有的共享鎖」爲其它線程獲取鎖的空狀態。
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 若是頭節點發生變化,則繼續循環。不然,退出循環。
        if (h == head)                   // loop if head changed
            break;
    }
}

說明:doReleaseShared()會釋放「共享鎖」。它會從前日後的遍歷CLH隊列,依次「喚醒」而後「執行」隊列中每一個節點對應的線程;最終的目的是讓這些線程釋放它們所持有的信號量。

3 非公平信號量獲取

Semaphore中的非公平信號量是NonFairSync。在Semaphore中,「非公平信號量許可的釋放(release)」與「公平信號量許可的釋放(release)」是同樣的。
不一樣的是它們獲取「信號量許可」的機制不一樣,下面是非公平信號量獲取信號量許可的代碼。

非公平信號量的tryAcquireShared()實現以下:

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // 設置「能夠得到的信號量的許可數」
        int available = getState();
        // 設置「得到acquires個信號量許可以後,剩餘的信號量許可數」
        int remaining = available - acquires;
        // 若是「剩餘的信號量許可數>=0」,則設置「能夠得到的信號量許可數」爲remaining。
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

 

說明:非公平信號量的tryAcquireShared()調用AQS中的nonfairTryAcquireShared()。而在nonfairTryAcquireShared()的for循環中,它都會直接判斷「當前剩餘的信號量許可數」是否足夠;足夠的話,則直接「設置能夠得到的信號量許可數」,進而再獲取信號量。而公平信號量的tryAcquireShared()中,在獲取信號量以前會經過if (hasQueuedPredecessors())來判斷「當前線程是否是在CLH隊列的頭部」,是的話,則返回-1。 

 

Semaphore示例

public class SemaphoreTest1 { 
    private static final int SEM_MAX = 10;
    public static void main(String[] args) { 
        Semaphore sem = new Semaphore(SEM_MAX);
        new MyThread(sem, 5).start();
        new MyThread(sem, 4).start();
        new MyThread(sem, 7).start();
    }
}
class MyThread extends Thread {
    private volatile Semaphore sem;    // 信號量
    private int count;        // 申請信號量的大小 

    MyThread(Semaphore sem, int count) {
        this.sem = sem;
        this.count = count;
    }
    public void run() {
        try {
            // 從信號量中獲取count個許可
            sem.acquire(count);

            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + " acquire count="+count);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 釋放給定數目的許可,將其返回到信號量。
            sem.release(count);
            System.out.println(Thread.currentThread().getName() + " release " + count + "");
        }
    }
}

 

Thread-1 acquire count=4
Thread-0 acquire count=5
Thread-1 release 4
Thread-0 release 5
Thread-2 acquire count=7
Thread-2 release 7

結果說明:信號量sem的許可總數是10個;共3個線程,分別須要獲取的信號量許可數是5,4,7。前面兩個線程獲取到信號量的許可後,sem中剩餘的可用的許可數是1;所以,最後一個線程必須等前兩個線程釋放了它們所持有的信號量許可以後,才能獲取到7個信號量許可。

相關文章
相關標籤/搜索