微信公衆號:[中間件興趣圈]
做者簡介:《RocketMQ技術內幕》php
本文將詳細分析<dubbo:service executes=""/>與<dubbo:reference actives = ""/>的實現機制,深刻探討Dubbo自身的保護機制。java
源碼分析ExecuteLimitFilter
@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY )nginx
過濾器做用
服務調用方併發度控制。web使用場景
對Dubbo服務提供者實現的一種保護機制,控制每一個服務的最大併發度。微信阻斷條件
當服務調用超過容許的併發度後,直接拋出RpcException異常。
接下來源碼分析ExecuteLimitFilter的實現細節。併發
ExecuteLimitFilter#invokeapp
1public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
2 URL url = invoker.getUrl();
3 String methodName = invocation.getMethodName();
4 Semaphore executesLimit = null;
5 boolean acquireResult = false;
6 int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0); // @1
7 if (max > 0) {
8 RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName()); // @2
9 executesLimit = count.getSemaphore(max); // @3
10 if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) { // @4
11 throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads
12 greater than <dubbo:service executes=\"" + max + "\" /> limited.");
13 }
14 }
15 boolean isSuccess = true;
16 try {
17 Result result = invoker.invoke(invocation); // @5
18 return result;
19 } catch (Throwable t) {
20 isSuccess = false;
21 if (t instanceof RuntimeException) {
22 throw (RuntimeException) t;
23 } else {
24 throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
25 }
26 } finally {
27 if(acquireResult) { // @6
28 executesLimit.release();
29 }
30 }
31 }
代碼@1:從服務提供者列表中獲取參數executes的值,若是該值小於等於0,表示不啓用併發度控制,直接沿着調用鏈進行調用。運維
代碼@2:根據服務提供者url和服務調用方法名,獲取RpcStatus。ide
1public static RpcStatus getStatus(URL url, String methodName) {
2 String uri = url.toIdentityString();
3 ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
4 if (map == null) {
5 METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
6 map = METHOD_STATISTICS.get(uri);
7 }
8 RpcStatus status = map.get(methodName); /
9 if (status == null) {
10 map.putIfAbsent(methodName, new RpcStatus());
11 status = map.get(methodName);
12 }
13 return status;
14 }
這裏是併發容器ConcurrentHashMap的經典使用,從這裏能夠看出ConcurrentMap<String, ConcurrentMap< String, RpcStatus>> METHOD_STATISTICS的存儲結構爲 { 服務提供者URL惟一字符串:{方法名:RpcStatus} }。源碼分析
代碼@3:根據服務提供者配置的最大併發度,建立該服務該方法對應的信號量對象。
1public Semaphore getSemaphore(int maxThreadNum) {
2 if(maxThreadNum <= 0) {
3 return null;
4 }
5 if (executesLimit == null || executesPermits != maxThreadNum) {
6 synchronized (this) {
7 if (executesLimit == null || executesPermits != maxThreadNum) {
8 executesLimit = new Semaphore(maxThreadNum);
9 executesPermits = maxThreadNum;
10 }
11 }
12 }
13 return executesLimit;
14 }
使用了雙重檢測來建立executesLimit 信號量。
代碼@4:若是獲取不到鎖,並不會阻塞等待,而是直接拋出RpcException,服務端的策略是快速拋出異常,供服務調用方(消費者)根據集羣策略進行執行,例如重試其餘服務提供者。
代碼@5:執行真實的服務調用。
代碼@6:若是成功申請到信號量,在服務調用結束後,釋放信號量。
總結:<dubbo:service executes=""/>的含義是,針對每一個服務每一個方法的最大併發度。若是超過該值,則直接拋出RpcException。
源碼分析ActiveLimitFilter
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY )
過濾器做用
消費端調用服務的併發控制。使用場景
控制同一個消費端對服務端某一服務的併發調用度,一般該值應該小於< dubbo:service executes=""/>阻斷條件
非阻斷,但若是超過容許的併發度會阻塞,超過超時時間後將再也不調用服務,而是直接拋出超時。
ActiveLimitFilter#invoke
1public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
2 URL url = invoker.getUrl();
3 String methodName = invocation.getMethodName();
4 int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0); // @1
5 RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); // @2
6 if (max > 0) {
7 long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0); // @3
8 long start = System.currentTimeMillis();
9 long remain = timeout;
10 int active = count.getActive(); // @4
11 if (active >= max) { // @5
12 synchronized (count) {
13 while ((active = count.getActive()) >= max) {
14 try {
15 count.wait(remain);
16 } catch (InterruptedException e) {
17 }
18 long elapsed = System.currentTimeMillis() - start;
19 remain = timeout - elapsed;
20 if (remain <= 0) { // @6
21 throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
22 + invoker.getInterface().getName() + ", method: "
23 + invocation.getMethodName() + ", elapsed: " + elapsed
24 + ", timeout: " + timeout + ". concurrent invokes: " + active
25 + ". max concurrent invoke limit: " + max);
26 }
27 }
28 }
29 }
30 }
31 try {
32 long begin = System.currentTimeMillis();
33 RpcStatus.beginCount(url, methodName); // @7
34 try {
35 Result result = invoker.invoke(invocation); // @8
36 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true); // @9
37 return result;
38 } catch (RuntimeException t) {
39 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
40 throw t;
41 }
42 } finally {
43 if (max > 0) {
44 synchronized (count) {
45 count.notify(); // @10
46 }
47 }
48 }
49 }
代碼@1:從Invoker中獲取消息端URL中的配置的actives參數,爲何從Invoker中獲取的Url是消費端的Url呢?這是由於在消費端根據服務提供者URL建立調用Invoker時,會用服務提供者URL,而後合併消費端的配置屬性,其優先級 -D > 消費端 > 服務端。其代碼位於:
RegistryDirectory#toInvokers
1URL url = mergeUrl(providerUrl);
代碼@2:根據服務提供者URL和調用服務提供者方法,獲取RpcStatus。
代碼@3:獲取接口調用的超時時間,默認爲1s。
代碼@4:獲取當前消費者,針對特定服務,特定方法的併發調用度,active值。
代碼@5:若是當前的併發 調用大於等於容許的最大值,則針對該RpcStatus申請鎖,並調用其wait(timeout)進行等待,也就是在接口調用超時時間內,仍是未被喚醒,則直接拋出超時異常。
代碼@6:判斷被喚醒的緣由是由於等待超時,仍是因爲調用結束,釋放了"名額「,若是是超時喚醒,則直接拋出異常。
代碼@7:在一次服務調用前,先將 服務名+方法名對應的RpcStatus的active加一。
代碼@8:執行RPC服務調用。
代碼@9:記錄成功調用或失敗調用,並將active減一。
代碼@10:最終成功執行,若是開啓了actives機制(dubbo:referecnce actives="")時,喚醒等待者。
總結:<dubbo:reference actives=""/> 是控制消費端對單個服務提供者單個服務容許調用的最大併發度。該值的取值不該該大於<dubbo:service executes=""/>的值,而且若是消費者機器的配置,若是性能不盡相同,不建議對該值進行設置。
廣告:做者的新書《RocketMQ技術內幕》已上市
《RocketMQ技術內幕》已出版上市,目前可在主流購物平臺(京東、天貓等)購買,本書從源碼角度深度分析了RocketMQ NameServer、消息發送、消息存儲、消息消費、消息過濾、主從同步HA、事務消息;在實戰篇重點介紹了RocketMQ運維管理界面與當前支持的39個運維命令;並在附錄部分羅列了RocketMQ幾乎全部的配置參數。本書獲得了RocketMQ創始人、阿里巴巴Messaging開源技術負責人、Linux OpenMessaging 主席的高度承認並做序推薦。目前是國內第一本成體系剖析RocketMQ的書籍。
新書7折優惠!7折優惠!7折優惠!
更多文章請關注微信公衆號:
推薦關注微信公衆號:RocketMQ官方微信公衆號
本文分享自微信公衆號 - 中間件興趣圈(dingwpmz_zjj)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。