Dubbo源碼之客戶端併發控制——ActiveLimitFilter

上篇解釋了Dubbo源碼中降級及容錯處理
Dubbo服務調用——Cluster組件(服務降級,容錯)java

這篇文章主要是關於Dubbo源碼中的限流組件,Dubbo限流除了限流(併發限制)的入口ThreadPool 以外,還有更細粒度的限流功能。首先先記錄客戶端限流組價ActiveLimitFilter 的限流原理。併發

經過它的名字,咱們知道它是Dubbo中的Filter過濾器,接下來它的Activate註解信息以下:ide

@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)

所以它是用於消費方的Filter組件的擴展 , 源碼以下:url

@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
public class ActiveLimitFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        //獲取設置的acvites的值,默認爲0
        int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
        // 獲取當前方法目前併發請求數量
        RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
        if (max > 0) { //說明設置了actives變量
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            int active = count.getActive();
            if (active >= max) {
                synchronized (count) {
                    while ((active = count.getActive()) >= max) {
                        try {
                            count.wait(remain);
                        } catch (InterruptedException e) {
                        }
                        // 等待超時則拋異常
                        long elapsed = System.currentTimeMillis() - start;
                        remain = timeout - elapsed;
                        if (remain <= 0) {
                            throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                    + invoker.getInterface().getName() + ", method: "
                                    + invocation.getMethodName() + ", elapsed: " + elapsed
                                    + ", timeout: " + timeout + ". concurrent invokes: " + active
                                    + ". max concurrent invoke limit: " + max);
                        }
                    }
                }
            }
        }
        // 調用業務
        try {
            long begin = System.currentTimeMillis();
            RpcStatus.beginCount(url, methodName);
            try {
                Result result = invoker.invoke(invocation);
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
                return result;
            } catch (RuntimeException t) {
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                throw t;
            }
        } finally {
            if (max > 0) {
                // 喚醒等待線程
                synchronized (count) {
                    count.notify();
                }
            }
        }
    }
}

經過源碼能夠看出它的調用信息實際存儲在 RpcStatus 中,爲何不在類ActiveLimitFilter聲明 存儲的變量信息呢?spa

  • 首先保證業務類的簡潔(只關注業務方法,實際調用計數等信息保存在RpcStatus 中)
  • 將計數信息抽出,保證通用性()

客戶端併發控制配置方式:.net

<dubbo:reference id="demoService"  interface="com.alibaba.dubbo.demo.DemoService" check="false"
                     group="dev" version="1.0.0" timeout="3000" actives="10"/>

設置com.alibaba.dubbo.demo.DemoService接口中全部方法,每一個方法最多同時併發請求10個請求。線程

也能夠使用下面方法設置接口中的單個方法的併發請求個數,以下:code

<dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService"
    group="dev" version="1.0.0" timeout="3000">
            <dubbo:method name="sayHello" actives="10" />
</dubbo:reference>

如上設置了sayHello的併發請求數量爲10 , 若是客戶端請求該方法併發超過10 則會阻塞 , 當請求併發小於10時 , 該請求才會發送到請求提供方。xml

 

讚揚支持

相關文章
相關標籤/搜索