源碼分析Dubbo服務提供者、服務消費者併發度控制機制

微信公衆號:[中間件興趣圈]
做者簡介:《RocketMQ技術內幕》php

本文將詳細分析<dubbo:service executes=""/>與<dubbo:reference actives = ""/>的實現機制,深刻探討Dubbo自身的保護機制。java

源碼分析ExecuteLimitFilter

@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY )nginx

  • 過濾器做用
    服務調用方併發度控制。web

  • 使用場景
    對Dubbo服務提供者實現的一種保護機制,控制每一個服務的最大併發度。微信

  • 阻斷條件
    當服務調用超過容許的併發度後,直接拋出RpcException異常。
    接下來源碼分析ExecuteLimitFilter的實現細節。併發

ExecuteLimitFilter#invokeapp

 1public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
2        URL url = invoker.getUrl();
3        String methodName = invocation.getMethodName();
4        Semaphore executesLimit = null;
5        boolean acquireResult = false;
6        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);      // @1
7        if (max > 0) {
8            RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());             // @2
9            executesLimit = count.getSemaphore(max);                                                              // @3
10            if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {              // @4
11                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads 
12                      greater than <dubbo:service executes=\""
 + max + "\" /> limited.");
13            }
14        }
15        boolean isSuccess = true;
16        try {
17            Result result = invoker.invoke(invocation);                 // @5
18            return result;
19        } catch (Throwable t) {
20            isSuccess = false;
21            if (t instanceof RuntimeException) {
22                throw (RuntimeException) t;
23            } else {
24                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
25            }
26        } finally {
27            if(acquireResult) {                                   // @6
28                executesLimit.release();
29            }
30        }
31    }

代碼@1:從服務提供者列表中獲取參數executes的值,若是該值小於等於0,表示不啓用併發度控制,直接沿着調用鏈進行調用。運維

代碼@2:根據服務提供者url和服務調用方法名,獲取RpcStatus。ide

 1public static RpcStatus getStatus(URL url, String methodName) {
2        String uri = url.toIdentityString();      
3        ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);         
4        if (map == null) {
5            METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());    
6            map = METHOD_STATISTICS.get(uri);
7        }
8        RpcStatus status = map.get(methodName);          /
9        if (status == null) {
10            map.putIfAbsent(methodName, new RpcStatus());
11            status = map.get(methodName);
12        }
13        return status;
14    }

這裏是併發容器ConcurrentHashMap的經典使用,從這裏能夠看出ConcurrentMap<String, ConcurrentMap< String, RpcStatus>> METHOD_STATISTICS的存儲結構爲 {  服務提供者URL惟一字符串:{方法名:RpcStatus} }。源碼分析

代碼@3:根據服務提供者配置的最大併發度,建立該服務該方法對應的信號量對象。

 1public Semaphore getSemaphore(int maxThreadNum) {
2        if(maxThreadNum <= 0) {
3            return null;
4        }
5        if (executesLimit == null || executesPermits != maxThreadNum) {
6            synchronized (this) {
7                if (executesLimit == null || executesPermits != maxThreadNum) {
8                    executesLimit = new Semaphore(maxThreadNum);
9                    executesPermits = maxThreadNum;
10                }
11            }
12        }
13        return executesLimit;
14    }

使用了雙重檢測來建立executesLimit 信號量。
代碼@4:若是獲取不到鎖,並不會阻塞等待,而是直接拋出RpcException,服務端的策略是快速拋出異常,供服務調用方(消費者)根據集羣策略進行執行,例如重試其餘服務提供者。

代碼@5:執行真實的服務調用。

代碼@6:若是成功申請到信號量,在服務調用結束後,釋放信號量。

總結:<dubbo:service executes=""/>的含義是,針對每一個服務每一個方法的最大併發度。若是超過該值,則直接拋出RpcException。

源碼分析ActiveLimitFilter

@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY )

  • 過濾器做用
    消費端調用服務的併發控制。

  • 使用場景
    控制同一個消費端對服務端某一服務的併發調用度,一般該值應該小於< dubbo:service executes=""/>

  • 阻斷條件
    非阻斷,但若是超過容許的併發度會阻塞,超過超時時間後將再也不調用服務,而是直接拋出超時。

ActiveLimitFilter#invoke

 1public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
2        URL url = invoker.getUrl();
3        String methodName = invocation.getMethodName();
4        int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);    // @1
5        RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());           // @2
6        if (max > 0) {                                          
7            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);   // @3
8            long start = System.currentTimeMillis();
9            long remain = timeout;
10            int active = count.getActive();                                                                                                                                          // @4
11            if (active >= max) {                                                                                                                                                          // @5
12                synchronized (count) {                                                                                                                                                                      
13                    while ((active = count.getActive()) >= max) {                                                                                                     
14                        try {
15                            count.wait(remain);                                                                                                                                      
16                        } catch (InterruptedException e) {
17                        }
18                        long elapsed = System.currentTimeMillis() - start;                               
19                        remain = timeout - elapsed;
20                        if (remain <= 0) {                                                                                                                                             // @6
21                            throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
22                                    + invoker.getInterface().getName() + ", method: "
23                                    + invocation.getMethodName() + ", elapsed: " + elapsed
24                                    + ", timeout: " + timeout + ". concurrent invokes: " + active
25                                    + ". max concurrent invoke limit: " + max);
26                        }
27                    }
28                }
29            }
30        }
31        try {
32            long begin = System.currentTimeMillis();
33            RpcStatus.beginCount(url, methodName);        // @7
34            try {
35                Result result = invoker.invoke(invocation);     // @8
36                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);    // @9
37                return result;
38            } catch (RuntimeException t) {
39                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
40                throw t;
41            }
42        } finally {
43            if (max > 0) {
44                synchronized (count) {
45                    count.notify();     // @10
46                }
47            }
48        }
49    }

代碼@1:從Invoker中獲取消息端URL中的配置的actives參數,爲何從Invoker中獲取的Url是消費端的Url呢?這是由於在消費端根據服務提供者URL建立調用Invoker時,會用服務提供者URL,而後合併消費端的配置屬性,其優先級 -D > 消費端 > 服務端。其代碼位於:
RegistryDirectory#toInvokers

1URL url = mergeUrl(providerUrl);

代碼@2:根據服務提供者URL和調用服務提供者方法,獲取RpcStatus。

代碼@3:獲取接口調用的超時時間,默認爲1s。

代碼@4:獲取當前消費者,針對特定服務,特定方法的併發調用度,active值。

代碼@5:若是當前的併發 調用大於等於容許的最大值,則針對該RpcStatus申請鎖,並調用其wait(timeout)進行等待,也就是在接口調用超時時間內,仍是未被喚醒,則直接拋出超時異常。

代碼@6:判斷被喚醒的緣由是由於等待超時,仍是因爲調用結束,釋放了"名額「,若是是超時喚醒,則直接拋出異常。

代碼@7:在一次服務調用前,先將 服務名+方法名對應的RpcStatus的active加一。

代碼@8:執行RPC服務調用。

代碼@9:記錄成功調用或失敗調用,並將active減一。

代碼@10:最終成功執行,若是開啓了actives機制(dubbo:referecnce actives="")時,喚醒等待者。

總結:<dubbo:reference actives=""/> 是控制消費端對單個服務提供者單個服務容許調用的最大併發度。該值的取值不該該大於<dubbo:service executes=""/>的值,而且若是消費者機器的配置,若是性能不盡相同,不建議對該值進行設置。


廣告:做者的新書《RocketMQ技術內幕》已上市

《RocketMQ技術內幕》已出版上市,目前可在主流購物平臺(京東、天貓等)購買,本書從源碼角度深度分析了RocketMQ NameServer、消息發送、消息存儲、消息消費、消息過濾、主從同步HA、事務消息;在實戰篇重點介紹了RocketMQ運維管理界面與當前支持的39個運維命令;並在附錄部分羅列了RocketMQ幾乎全部的配置參數。本書獲得了RocketMQ創始人、阿里巴巴Messaging開源技術負責人、Linux OpenMessaging 主席的高度承認並做序推薦。目前是國內第一本成體系剖析RocketMQ的書籍。
新書7折優惠!7折優惠!7折優惠!


更多文章請關注微信公衆號:

推薦關注微信公衆號:RocketMQ官方微信公衆號

本文分享自微信公衆號 - 中間件興趣圈(dingwpmz_zjj)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索