上一篇關於《Dubbo客戶端併發控制——ActiveLimitFilter》 做用,設計原理,及配置方式。java
這篇是關於Dubbo服務端Filter組件擴展 ExecuteLimitFilter ,它能夠限制服務端的方法級別的併發處理請求數。 當請求數超過限制時,服務端採用的是非阻塞處理,若是超出併發數量,則直接進行失敗處理(這裏拋RPCException異常),這裏與客戶端限流ActiveLimitFilter 的wait不一樣的是,這裏採用Semaphore 信號量的方式,而且是搶佔式的(NonFairSync) , 不明白的能夠看下信號量相關源碼。編程
同分析ActiveLimitFilter同樣,首先看它的Activate註解信息 :併發
@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
這裏能夠得知它是用於服務端限流控制。ide
ActiveLimitFilter源碼:ui
/** * ThreadLimitInvokerFilter * * @author william.liangf */ @Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY) public class ExecuteLimitFilter implements Filter { public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); Semaphore executesLimit = null; boolean acquireResult = false; //默認不設置executes時候,其值爲0 int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0); if (max > 0) { //max>0說明設置了executes值 RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName()); // if (count.getActive() >= max) { /** * http://manzhizhen.iteye.com/blog/2386408 * 經過信號量來作併發控制(即限制能使用的線程數量) * 2017-08-21 yizhenqiang */ executesLimit = count.getSemaphore(max); //可知若是併發處理數量大於設置的值,則直接會拋出異常 if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) { throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited."); } } long begin = System.currentTimeMillis(); boolean isSuccess = true; RpcStatus.beginCount(url, methodName); try { // 進行接下來的功能/業務處理 Result result = invoker.invoke(invocation); return result; } catch (Throwable t) { isSuccess = false; if (t instanceof RuntimeException) { throw (RuntimeException) t; } else { throw new RpcException("unexpected exception when ExecuteLimitFilter", t); } } finally { RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess); if(acquireResult) { executesLimit.release(); } } } }
當請求併發數大於最大併發數時,則直接失敗處理。this
服務提供方進行併發控制配置方式以下:url
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" group="dev" version="1.0.0" timeout="3000" executes="10" />
設置com.alibaba.dubbo.demo.DemoService 接口中的全部方法,最多同時處理10個併發請求。spa
也能夠經過以下方式單獨對每一個方法進行併發控制:.net
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" group="dev" version="1.0.0" timeout="3000" > <dubbo:method name="sayHello" executes="10"/> </dubbo:service>
這裏咱們採用Semaphore來進行服務端請求的併發控制, 而不是採用 sync 同步代碼塊 , wait notify 方式的目的是什麼呢?線程
這裏Semaphore 代替 sync 其實是 cas 代替 鎖 + wait notify , 雖然Semaphore中底層採用的是單線程CAS , 等待線程LockSupport.park(this); 防止全部線程同時獲取令牌的CPU資源消耗。 源碼參考以前寫的一篇 AQS源碼介紹:
《java併發編程之:ReentrantLock實現原理與深刻研究》