java同步系列之Semaphore源碼解析

java同步系列之Semaphore源碼解析

問題

(1)Semaphore是什麼?java

(2)Semaphore具備哪些特性?分佈式

(3)Semaphore一般使用在什麼場景中?ide

(4)Semaphore的許可次數是否能夠動態增減?源碼分析

(5)Semaphore如何實現限流?學習

簡介

Semaphore,信號量,它保存了一系列的許可(permits),每次調用acquire()都將消耗一個許可,每次調用release()都將歸還一個許可。ui

特性

Semaphore一般用於限制同一時間對共享資源的訪問次數上,也就是常說的限流。線程

下面咱們一塊兒來學習Java中Semaphore是如何實現的。blog

類結構


10e195175cff56584db5362c7dc7a35f.jpeg


Semaphore中包含了一個實現了AQS的同步器Sync,以及它的兩個子類FairSync和NonFairSync,這說明Semaphore也是區分公平模式和非公平模式的。隊列

源碼分析

基於以前對於ReentrantLock和ReentrantReadWriteLock的分析,這篇文章相對來講比較簡單,以前講過的一些方法將直接略過,有興趣的能夠拉到文章底部查看以前的文章。資源

內部類Sync

// java.util.concurrent.Semaphore.Syncabstract static class Sync extends AbstractQueuedSynchronizer {
 private static final long serialVersionUID = 1192457210091910933L;
 // 構造方法,傳入許可次數,放入state中
 Sync(int permits) {
        setState(permits);
 }
 // 獲取許可次數
 final int getPermits() {
 return getState();
 }
 // 非公平模式嘗試獲取許可
 final int nonfairTryAcquireShared(int acquires) {
 for (;;) {
 // 看看還有幾個許可
 int available = getState();
 // 減去此次須要獲取的許可還剩下幾個許可
 int remaining = available - acquires;
 // 若是剩餘許可小於0了則直接返回
 // 若是剩餘許可不小於0,則嘗試原子更新state的值,成功了返回剩餘許可
 if (remaining < 0 ||
                compareAndSetState(available, remaining))
 return remaining;
 }
 }
 // 釋放許可
 protected final boolean tryReleaseShared(int releases) {
 for (;;) {
 // 看看還有幾個許可
 int current = getState();
 // 加上此次釋放的許可
 int next = current + releases;
 // 檢測溢出
 if (next < current) // overflow
 throw new Error("Maximum permit count exceeded");
 // 若是原子更新state的值成功,就說明釋放許可成功,則返回true
 if (compareAndSetState(current, next))
 return true;
 }
 }
 // 減小許可
 final void reducePermits(int reductions) {
 for (;;) {
 // 看看還有幾個許可
 int current = getState();
 // 減去將要減小的許可
 int next = current - reductions;
 // 檢測舉出
 if (next > current) // underflow
 throw new Error("Permit count underflow");
 // 原子更新state的值,成功了返回true
 if (compareAndSetState(current, next))
 return;
 }
 }
 // 銷燬許可
 final int drainPermits() {
 for (;;) {
 // 看看還有幾個許可
 int current = getState();
 // 若是爲0,直接返回
 // 若是不爲0,把state原子更新爲0
 if (current == 0 || compareAndSetState(current, 0))
 return current;
 }
 }}


經過Sync的幾個實現方法,咱們獲取到如下幾點信息:

(1)許但是在構造方法時傳入的;

(2)許可存放在狀態變量state中;

(3)嘗試獲取一個許可的時候,則state的值減1;

(4)當state的值爲0的時候,則沒法再獲取許可;

(5)釋放一個許可的時候,則state的值加1;

(6)許可的個數能夠動態改變;

內部類NonfairSync

// java.util.concurrent.Semaphore.NonfairSyncstatic final class NonfairSync extends Sync {
 private static final long serialVersionUID = -2694183684443567898L;
 // 構造方法,調用父類的構造方法
 NonfairSync(int permits) {
 super(permits);
 }
 // 嘗試獲取許可,調用父類的nonfairTryAcquireShared()方法
 protected int tryAcquireShared(int acquires) {
 return nonfairTryAcquireShared(acquires);
 } }


非公平模式下,直接調用父類的nonfairTryAcquireShared()嘗試獲取許可。

內部類FairSync

// java.util.concurrent.Semaphore.FairSyncstatic final class FairSync extends Sync {
 private static final long serialVersionUID = 2014338818796000944L;
 // 構造方法,調用父類的構造方法
 FairSync(int permits) {
 super(permits);
 }
 // 嘗試獲取許可
 protected int tryAcquireShared(int acquires) {
 for (;;) {
 // 公平模式須要檢測是否前面有排隊的
 // 若是有排隊的直接返回失敗
 if (hasQueuedPredecessors())
 return -1;
 // 沒有排隊的再嘗試更新state的值
 int available = getState();
 int remaining = available - acquires;
 if (remaining < 0 ||
                compareAndSetState(available, remaining))
 return remaining;
 }
 }}


公平模式下,先檢測前面是否有排隊的,若是有排隊的則獲取許可失敗,進入隊列排隊,不然嘗試原子更新state的值。

構造方法

// 構造方法,建立時要傳入許可次數,默認使用非公平模式public Semaphore(int permits) {
    sync = new NonfairSync(permits);}// 構造方法,須要傳入許可次數,及是否公平模式public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);}


建立Semaphore時須要傳入許可次數。

Semaphore默認也是非公平模式,可是你能夠調用第二個構造方法聲明其爲公平模式。

下面的方法在學習過前面的內容看來都比較簡單,彤哥這裏只列舉Semaphore支持的一些功能了。

如下的方法都是針對非公平模式來描述。

acquire()方法

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);}


獲取一個許可,默認使用的是可中斷方式,若是嘗試獲取許可失敗,會進入AQS的隊列中排隊。

acquireUninterruptibly()方法

public void acquireUninterruptibly() {
    sync.acquireShared(1);}


獲取一個許可,非中斷方式,若是嘗試獲取許可失敗,會進入AQS的隊列中排隊。

tryAcquire()方法

public boolean tryAcquire() {
 return sync.nonfairTryAcquireShared(1) >= 0;}


嘗試獲取一個許可,使用Sync的非公平模式嘗試獲取許可方法,不管是否獲取到許可都返回,只嘗試一次,不會進入隊列排隊。

tryAcquire(long timeout, TimeUnit unit)方法

public boolean tryAcquire(long timeout, TimeUnit unit)
 throws InterruptedException {
 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}


嘗試獲取一個許可,先嚐試一次獲取許可,若是失敗則會等待timeout時間,這段時間內都沒有獲取到許可,則返回false,不然返回true;

release()方法

public void release() {
    sync.releaseShared(1);}


釋放一個許可,釋放一個許可時state的值會加1,而且會喚醒下一個等待獲取許可的線程。

acquire(int permits)方法

public void acquire(int permits) throws InterruptedException {
 if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);}


一次獲取多個許可,可中斷方式。

acquireUninterruptibly(int permits)方法

public void acquireUninterruptibly(int permits) {
 if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);}


一次獲取多個許可,非中斷方式。

tryAcquire(int permits)方法

public boolean tryAcquire(int permits) {
 if (permits < 0) throw new IllegalArgumentException();
 return sync.nonfairTryAcquireShared(permits) >= 0;}


一次嘗試獲取多個許可,只嘗試一次。

tryAcquire(int permits, long timeout, TimeUnit unit)方法

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
 throws InterruptedException {
 if (permits < 0) throw new IllegalArgumentException();
 return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));}


嘗試獲取多個許可,並會等待timeout時間,這段時間沒獲取到許可則返回false,不然返回true。

release(int permits)方法

public void release(int permits) {
 if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);}


一次釋放多個許可,state的值會相應增長permits的數量。

availablePermits()方法

public int availablePermits() {
 return sync.getPermits();}


獲取可用的許可次數。

drainPermits()方法

public int drainPermits() {
 return sync.drainPermits();}


銷燬當前可用的許可次數,對於已經獲取的許可沒有影響,會把當前剩餘的許可所有銷燬。

reducePermits(int reduction)方法

protected void reducePermits(int reduction) {
 if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);}


減小許可的次數。

總結

(1)Semaphore,也叫信號量,一般用於控制同一時刻對共享資源的訪問上,也就是限流場景;

(2)Semaphore的內部實現是基於AQS的共享鎖來實現的;

(3)Semaphore初始化的時候須要指定許可的次數,許可的次數是存儲在state中;

(4)獲取一個許可時,則state值減1;

(5)釋放一個許可時,則state值加1;

(6)能夠動態減小n個許可;

(7)能夠動態增長n個許可嗎?

彩蛋

(1)如何動態增長n個許可?

答:調用release(int permits)便可。咱們知道釋放許可的時候state的值會相應增長,再回頭看看釋放許可的源碼,發現與ReentrantLock的釋放鎖仍是有點區別的,Semaphore釋放許可的時候並不會檢查當前線程有沒有獲取過許可,因此能夠調用釋放許可的方法動態增長一些許可。

(2)如何實現限流?

答:限流,即在流量忽然增大的時候,上層要可以限制住忽然的大流量對下游服務的衝擊,在分佈式系統中限流通常作在網關層,固然在個別功能中也能夠本身簡單地來限流,好比秒殺場景,假如只有10個商品須要秒殺,那麼,服務自己能夠限制同時只進來100個請求,其它請求所有做廢,這樣服務的壓力也不會太大。

使用Semaphore就能夠直接針對這個功能來限流,如下是代碼實現:

public class SemaphoreTest {
 public static final Semaphore SEMAPHORE = new Semaphore(100);
 public static final AtomicInteger failCount = new AtomicInteger(0);
 public static final AtomicInteger successCount = new AtomicInteger(0);

 public static void main(String[] args) {
 for (int i = 0; i < 1000; i++) {
 new Thread(()->seckill()).start();
 }
 }

 public static boolean seckill() {
 if (!SEMAPHORE.tryAcquire()) {
 System.out.println("no permits, count="+failCount.incrementAndGet());
 return false;
 }

 try {
 // 處理業務邏輯
 Thread.sleep(2000);
 System.out.println("seckill success, count="+successCount.incrementAndGet());
 } catch (InterruptedException e) {
 // todo 處理異常
            e.printStackTrace();
 } finally {
            SEMAPHORE.release();
 }
 return true;
 }}
相關文章
相關標籤/搜索