最近能夠進行個稅申報了,尚未申報的同窗能夠趕忙去試試哦。不過我反正是從上午到下午一直都沒有成功的進行申報,一進行申報
就返回「當前訪問人數過多,請稍後再試」。爲何有些人就可以申報成功,有些人就直接返回失敗。這很明顯申報處理資源是有限的,
只能等別人處理完了在來處理你的,你若是運氣好可能重試幾回就輪到你了,若是運氣很差可能重試一天也可能輪不到你。
我反正已是放棄了,等到夜深人靜的時候再來試試。做爲一個程序員咱們確定知道這是個稅申請app的限流操做,若是還有不懂什麼
是限流操做的能夠參考下這個文章《高併發系統三大利器之限流》。
好比個稅申報系統每臺機器只最多分別只能處理1000
個請求,再多的請求就會把機器打掛。若是是多餘的請求就把這些請求拒絕掉。直接給你返回一句舒適提示:「當前訪問人數過多,請稍後再試」,若是要實現這個功能你們想一想能夠經過哪些方法算法來實現。java
學習semaphore
以前咱們必需要先了解下什麼是共享鎖。在上一篇文章《Java高併發編程基礎之AQS》咱們介紹了公平鎖於非公平鎖的區別。node
獨佔鎖:也有人把它叫作「獨享鎖」,它是是獨佔的,排他的,只能被一個線程可持有,
當獨佔鎖已經被某個線程持有時,其餘線程只能等待它被釋放後,才能去爭鎖,而且同一時刻只有一個線程能爭鎖成功。git
在《Java併發編程藝術》(微信搜【java金融】回覆電子書能夠免費獲取PDF版本)這一書中是這麼說的:程序員
Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源。不少年以來,我都以爲從字面上很難理解Semaphore所表達的含義,只能把它比做是控制流量的紅綠燈,好比XX馬路要限制流量,只容許同時有一百輛車在這條路上行使,其餘的都必須在路口等待,因此前一百輛車會看到綠燈,能夠開進這條馬路,後面的車會看到紅燈,不能駛入XX馬路,可是若是前一百輛中有五輛車已經離開了XX馬路,那麼後面就容許有5輛車駛入馬路,這個例子裏說的車就是線程,駛入馬路就表示線程在執行,離開馬路就表示線程執行完成,看見紅燈就表示線程被阻塞,不能執行。github
Semaphore
機制是提供給線程搶佔式獲取許可,因此他能夠實現公平或者非公平,相似於ReentrantLock
。
說了這麼多咱們來個實際的例子看一看,好比咱們去停車場停車,停車場總共只有5
個車位,可是如今有8
輛汽車來停車,剩下的3
輛汽車要麼等其餘汽車開走後進行停車,或者去找別的停車位?面試
/** * @author: 公衆號【Java金融】 */ public class SemaphoreTest { public static void main(String[] args) throws InterruptedException { // 初始化五個車位 Semaphore semaphore = new Semaphore(5); // 等全部車子 final CountDownLatch latch = new CountDownLatch(8); for (int i = 0; i < 8; i++) { int finalI = i; if (i == 5) { Thread.sleep(1000); new Thread(() -> { stopCarNotWait(semaphore, finalI); latch.countDown(); }).start(); continue; } new Thread(() -> { stopCarWait(semaphore, finalI); latch.countDown(); }).start(); } latch.await(); log("總共還剩:" + semaphore.availablePermits() + "個車位"); } private static void stopCarWait(Semaphore semaphore, int finalI) { String format = String.format("車牌號%d", finalI); try { semaphore.acquire(1); log(format + "找到車位了,去停車了"); Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } finally { semaphore.release(1); log(format + "開走了"); } } private static void stopCarNotWait(Semaphore semaphore, int finalI) { String format = String.format("車牌號%d", finalI); try { if (semaphore.tryAcquire()) { log(format + "找到車位了,去停車了"); Thread.sleep(10000); log(format + "開走了"); semaphore.release(); } else { log(format + "沒有停車位了,不在這裏等了去其餘地方停車去了"); } } catch (Exception e) { e.printStackTrace(); } } public static void log(String content) { // 格式化 DateTimeFormatter fmTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // 當前時間 LocalDateTime now = LocalDateTime.now(); System.out.println(now.format(fmTime) + " "+content); } }
2021-03-01 18:54:57 車牌號0找到車位了,去停車了 2021-03-01 18:54:57 車牌號3找到車位了,去停車了 2021-03-01 18:54:57 車牌號2找到車位了,去停車了 2021-03-01 18:54:57 車牌號1找到車位了,去停車了 2021-03-01 18:54:57 車牌號4找到車位了,去停車了 2021-03-01 18:54:58 車牌號5沒有停車位了,不在這裏等了去其餘地方停車去了 2021-03-01 18:55:07 車牌號7找到車位了,去停車了 2021-03-01 18:55:07 車牌號6找到車位了,去停車了 2021-03-01 18:55:07 車牌號2開走了 2021-03-01 18:55:07 車牌號0開走了 2021-03-01 18:55:07 車牌號3開走了 2021-03-01 18:55:07 車牌號4開走了 2021-03-01 18:55:07 車牌號1開走了 2021-03-01 18:55:17 車牌號7開走了 2021-03-01 18:55:17 車牌號6開走了 2021-03-01 18:55:17 總共還剩:5個車位
從輸出結果咱們能夠看到車牌號5
這輛車看見沒有車位了,就不在這個地方傻傻的等了,而是去其餘地方了,可是車牌號6
和車牌號7
分別須要等到車庫開出兩輛車空出兩個車位後才停進去。這就體現了Semaphore
的acquire
方法若是沒有獲取到憑證它就會阻塞,而tryAcquire
方法若是沒有獲取到憑證不會阻塞的。算法
在Dubbo
中能夠給Provider
配置線程池大小來控制系統提供服務的最大並行度,默認是200
。apache
<dubbo:provider threads="200"/>
好比我如今這個訂單系統有三個接口,分別爲創單、取消訂單、修改訂單。這三個接口加起來的併發是200可是創單接口是核心接口,我想讓它多分點線程來執行
讓它能夠有最大150
個線程,取消訂單和修改訂單分別最大25
個線程執行就能夠了。dubbo
提供了executes
這一屬性來實現這個功能編程
<dubbo:service interface="cn.javajr.service.CreateOrderService" executes="150"/> <dubbo:service interface="cn.javajr.service.CancelOrderService" executes="25"/> <dubbo:service interface="cn.javajr.service.EditOrderService" executes="25"/>
咱們能夠看看dubbo
內部是如何來executes
的,具體實現是在ExecuteLimitFilter
這個類咱們能夠segmentfault
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); Semaphore executesLimit = null; boolean acquireResult = false; int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0); if (max > 0) { RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName()); // 若是當前使用的線程數量已經大於等於設置的閾值,那麼直接拋出異常 // if (count.getActive() >= max) { // throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service // using threads greater than <dubbo:service executes=\"" + max + "\" /> limited."); /** * http://manzhizhen.iteye.com/blog/2386408 * use semaphore for concurrency control (to limit thread number) */ executesLimit = count.getSemaphore(max); if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) { throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited."); } } long begin = System.currentTimeMillis(); boolean isSuccess = true; // 計數器+1 RpcStatus.beginCount(url, methodName); try { Result result = invoker.invoke(invocation); return result; } catch (Throwable t) { isSuccess = false; if (t instanceof RuntimeException) { throw (RuntimeException) t; } else { throw new RpcException("unexpected exception when ExecuteLimitFilter", t); } } finally { // 計數器-1 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess); if(acquireResult) { executesLimit.release(); } } }
從上述代碼咱們也能夠看出早期這個是沒有采用Semaphore
來實現的,而是直接採用被註釋的 if (count.getActive() >= max)
這個來來實現的,因爲這個count.getActive() >= max 和這個計數加1不是原子性的,因此會有問題,具體bug號能夠看https://github.com/apache/dubbo/pull/582後面才採用上述代碼用Semaphore
來修復非原子性問題。具體更詳細的分析能夠參見代碼的連接。不過如今最新版本(2.7.9)我看是採用採用自旋加上和CAS
來實現的。
上面就是對Semaphore
一個簡單的使用以及dubbo
中用到的例子,說句實話Semaphore在工做中用的仍是比較少的,不過面試又有可能會被問到,因此仍是有必要來一塊兒學習一下它。咱們前面《Java高併發編程基礎之AQS》經過ReentrantLock 一塊兒學習了下AQS,其實Semaphore一樣也是經過AQS來是實現的,咱們能夠一塊兒來對照下獨佔鎖的方法,基本上都是有方法一一相對應的。
這裏有兩點稍微須要注意的地方:
在共享鎖模式下,當一個節點獲取到了共享鎖,咱們在獲取成功後就能夠喚醒後繼節點了,而不須要等到該節點釋放鎖的時候,這是由於共享鎖能夠被多個線程同時持有,一個鎖獲取到了,則後繼的節點均可以直接來獲取。所以,在共享鎖模式下,在獲取鎖和釋放鎖結束時,都會喚醒後繼節點。
咱們一樣仍是經過非公平鎖的模式來老獲取憑證
咱們能夠看下acquire的核心方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } // 主要看下這個方法,這個方法返回的值也就是tryAcquireShared返回的值,由於tryAcquireShared->nonfairTryAcquireShared final int nonfairTryAcquireShared(int acquires) { //自旋 for (;;) { //Semaphore用AQS的state變量的值表明可用許可數 int available = getState(); //可用許可數減去本次須要獲取的許可數即爲剩餘許可數 int remaining = available - acquires; //若是剩餘許可數小於0或者CAS將當前可用許可數設置爲剩餘許可數成功,則返回成功許可數 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; }
tryAcquireShared
獲取返回許可書小於0時說明獲取許可失敗須要進入doAcquireSharedInterruptibly
這個方法去休眠。tryAcquireShared
獲取返回許可書小於0時說明獲取許可成功直接結束。 ```java private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 獨佔鎖的acquireQueued調用的是addWaiter(Node.EXCLUSIVE), // 而共享鎖調用的是addWaiter(Node.SHARED),代表了該節點處於共享模式 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
這個方法是否是跟咱們上篇文章講的AQS
的獨佔鎖的acquireQueued
很像,不過獨佔鎖它是直接調用了用了setHead(node)
方法,而共享鎖調用的是setHeadAndPropagate(node, r)
這個方法除了調用setHead
裏面還調用了doReleaseShared
(喚醒後繼節點)
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
其餘的方法基本上是和ReentrantLock
來實現的獨佔鎖差很少,我相信你們對源碼分析感興趣的應該也很少,其餘更多細節問題仍是須要本身親自動手去看源碼的。
Semaphore
初始化設置許可證爲1 時,它也能夠看成互斥鎖使用。其中0、1就至關於它的狀態,當=1時表示其餘線程能夠獲取,當=0時,排他,即其餘線程必需要等待。Semaphore
是JUC
包中的一個很簡單的工具類,用來實現多線程下對於資源的同一時刻的訪問線程數限制Semaphore
中存在一個【許可】的概念,即訪問資源以前,先要得到許可,若是當前許可數量爲0
,那麼線程阻塞,直到得到許可Semaphore
內部使用AQS
實現,由抽象內部類Sync
繼承了AQS
。由於Semaphore
天生就是共享的場景,因此其內部實際上相似於共享鎖的實現semaphore
來進行限流的話會產生突刺現象。
指在必定時間內的一小段時間內就用完了全部資源,後大部分時間中無資源可用。
好比在限流方法中的計算器算法,設置1s內的最大請求數爲100,在前100ms已經永遠了100個請求,則後面900ms將沒法處理請求,這就是突刺現象結束