Semaphore 源碼分析

須要提早了解的知識點: AbstractQueuedSynchronizer 實現原理併發

類介紹

Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源。好比控制用戶的訪問量,同一時刻只容許1000個用戶同時使用系統,若是超過1000個併發,則須要等待。dom

使用場景

好比模擬一個停車場停車信號,假設停車場只有兩個車位,一開始兩個車位都是空的。這時若是同時來了兩輛車,看門人容許它們進入停車場,而後放下車攔。之後來的車必須在入口等待,直到停車場中有車輛離開。這時,若是有一輛車離開停車場,看門人得知後,打開車攔,放入一輛,若是又離開一輛,則又能夠放入一輛,如此往復。ide

public class SemaphoreDemo {
    private static Semaphore s = new Semaphore(2);
    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        pool.submit(new ParkTask("1"));
        pool.submit(new ParkTask("2"));
        pool.submit(new ParkTask("3"));
        pool.submit(new ParkTask("4"));
        pool.submit(new ParkTask("5"));
        pool.submit(new ParkTask("6"));
        pool.shutdown();
    }

    static class ParkTask implements Runnable {
        private String name;
        public ParkTask(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            try {
                s.acquire();
                System.out.println("Thread "+this.name+" start...");
                TimeUnit.SECONDS.sleep(new Random().nextInt(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                s.release();
            }
        }
    }
}

Semaphore 源碼分析

Semaphore 經過使用內部類Sync繼承AQS來實現。
支持公平鎖和非公平鎖。內部使用的AQS的共享鎖。
具體實現可參考 AbstractQueuedSynchronizer 源碼分析源碼分析

Semaphore 的結構以下:ui

Semaphore構造

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

構造方法指定信號量的許可數量,默認採用的是非公平鎖,也只能夠指定爲公平鎖。
permits賦值給AQS中的state變量。this

acquire:可響應中斷的得到信號量

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

得到信號量方法,這兩個方法支持 Interrupt中斷機制,可以使用acquire() 方法每次獲取一個信號量,也可使用acquire(int permits) 方法獲取指定數量的信號量 。spa

acquire:不可響應中斷的獲取信號量

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

這兩個方法不響應Interrupt中斷機制,其它功能同acquire方法機制。.net

tryAcquire 方法,嘗試得到信號量

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

嘗試得到信號量有三個方法。線程

  1. 嘗試獲取信號量,若是獲取成功則返回true,不然立刻返回false,不會阻塞當前線程。
  2. 嘗試獲取信號量,若是在指定的時間內得到信號量,則返回true,不然返回false
  3. 嘗試獲取指定數量的信號量,若是在指定的時間內得到信號量,則返回true,不然返回false。

release 釋放信號量

public void release() {
    sync.releaseShared(1);
}

調用AQS中的releaseShared方法,使得state每次減一來控制信號量。code

availablePermits方法,獲取當前剩餘的信號量數量

public int availablePermits() {
    return sync.getPermits();
}

//=========Sync類========
final int getPermits() {
    return getState();
 }

該方法返回AQS中state變量的值,當前剩餘的信號量個數

drainPermits方法

public int drainPermits() {
    return sync.drainPermits();
}

//=========Sync類========
final int drainPermits() {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}

獲取並返回當即可用的全部許可。Sync類的drainPermits方法,獲取1個信號量後將可用的信號量個數置爲0。例如總共有10個信號量,已經使用了5個,再調用drainPermits方法後,能夠得到一個信號量,剩餘4個信號量就消失了,總共可用的信號量就變成6個了。

reducePermits 方法

protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

//=========Sync類========
final void reducePermits(int reductions) {
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
            return;
    }
}

該方法是protected 方法,減小信號量個數

判斷AQS等待隊列中是否還有Node

public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
}

//=========AbstractQueuedSynchronizer類========
public final boolean hasQueuedThreads() {
   //頭結點不等於尾節點就說明鏈表中還有元素
   return head != tail;
}

getQueuedThreads方法

protected Collection<Thread> getQueuedThreads() {
    return sync.getQueuedThreads();
}

//=========AbstractQueuedSynchronizer類========
public final Collection<Thread> getQueuedThreads() {
    ArrayList<Thread> list = new ArrayList<Thread>();
    for (Node p = tail; p != null; p = p.prev) {
        Thread t = p.thread;
        if (t != null)
            list.add(t);
    }
    return list;
}

該方法獲取AQS中等待隊列中全部未獲取信號量的線程相關的信息(等待獲取信號量的線程相關信息)。

相關文章
相關標籤/搜索