Dubbo源碼之服務端併發控制——ExecuteLimitFilter

上一篇關於《Dubbo客戶端併發控制——ActiveLimitFilter》 做用,設計原理,及配置方式。java

這篇是關於Dubbo服務端Filter組件擴展 ExecuteLimitFilter ,它能夠限制服務端的方法級別的併發處理請求數。 當請求數超過限制時,服務端採用的是非阻塞處理,若是超出併發數量,則直接進行失敗處理(這裏拋RPCException異常),這裏與客戶端限流ActiveLimitFilter 的wait不一樣的是,這裏採用Semaphore 信號量的方式,而且是搶佔式的(NonFairSync) , 不明白的能夠看下信號量相關源碼。編程

同分析ActiveLimitFilter同樣,首先看它的Activate註解信息 :併發

@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)

這裏能夠得知它是用於服務端限流控制。ide

ActiveLimitFilter源碼:ui

/**
 * ThreadLimitInvokerFilter
 *
 * @author william.liangf
 */
@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
public class ExecuteLimitFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        Semaphore executesLimit = null;
        boolean acquireResult = false;
        //默認不設置executes時候,其值爲0
        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
        if (max > 0) { //max>0說明設置了executes值
            RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
//            if (count.getActive() >= max) {
            /**
             * http://manzhizhen.iteye.com/blog/2386408
             * 經過信號量來作併發控制(即限制能使用的線程數量)
             * 2017-08-21 yizhenqiang
             */
            executesLimit = count.getSemaphore(max);
            //可知若是併發處理數量大於設置的值,則直接會拋出異常
            if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
            }
        }
        long begin = System.currentTimeMillis();
        boolean isSuccess = true;
        RpcStatus.beginCount(url, methodName);
        try {
            // 進行接下來的功能/業務處理
            Result result = invoker.invoke(invocation);
            return result;
        } catch (Throwable t) {
            isSuccess = false;
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        } finally {
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
            if(acquireResult) {
                executesLimit.release();
            }
        }
    }
}

當請求併發數大於最大併發數時,則直接失敗處理。this

服務提供方進行併發控制配置方式以下:url

<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"
                   group="dev" version="1.0.0" timeout="3000" executes="10" />

設置com.alibaba.dubbo.demo.DemoService 接口中的全部方法,最多同時處理10個併發請求。spa

也能夠經過以下方式單獨對每一個方法進行併發控制:.net

<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"
               group="dev" version="1.0.0" timeout="3000" >
    <dubbo:method name="sayHello" executes="10"/>
</dubbo:service>

這裏咱們採用Semaphore來進行服務端請求的併發控制, 而不是採用 sync 同步代碼塊 , wait notify 方式的目的是什麼呢?線程

這裏Semaphore 代替 sync 其實是 cas 代替 鎖 + wait notify  ,  雖然Semaphore中底層採用的是單線程CAS , 等待線程LockSupport.park(this); 防止全部線程同時獲取令牌的CPU資源消耗。  源碼參考以前寫的一篇 AQS源碼介紹:

《java併發編程之:ReentrantLock實現原理與深刻研究》

 

讚揚支持

相關文章
相關標籤/搜索