ExecuteLimitFilter ,在服務提供者,經過
表示每服務的每方法最大可並行執行請求數。java
ExecuteLimitFilter是經過信號量來實現的對服務端的併發數的控制。併發
ExecuteLimitFilter執行流程:jvm
ExecuteLimitFilteride
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); Semaphore executesLimit = null; boolean acquireResult = false; int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0); if (max > 0) { RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName()); // if (count.getActive() >= max) { /** * http://manzhizhen.iteye.com/blog/2386408 * use semaphore for concurrency control (to limit thread number) */ executesLimit = count.getSemaphore(max); if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) { throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited."); } } long begin = System.currentTimeMillis(); boolean isSuccess = true; RpcStatus.beginCount(url, methodName); try { Result result = invoker.invoke(invocation); return result; } catch (Throwable t) { isSuccess = false; if (t instanceof RuntimeException) { throw (RuntimeException) t; } else { throw new RpcException("unexpected exception when ExecuteLimitFilter", t); } } finally { RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess); if(acquireResult) { executesLimit.release(); } } }
咱們接下來看看RpcStatus這個類源碼分析
private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>(); public static RpcStatus getStatus(URL url, String methodName) { String uri = url.toIdentityString(); ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri); if (map == null) { METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>()); map = METHOD_STATISTICS.get(uri); } RpcStatus status = map.get(methodName); if (status == null) { map.putIfAbsent(methodName, new RpcStatus()); status = map.get(methodName); } return status; }
這個方法很簡單,大概就是給RpcStatus這個類裏面的靜態屬性METHOD_STATISTICS裏面設值。外層的map是以url爲key,裏層的map是以方法名爲key。ui
private volatile int executesPermits; public Semaphore getSemaphore(int maxThreadNum) { if(maxThreadNum <= 0) { return null; } if (executesLimit == null || executesPermits != maxThreadNum) { synchronized (this) { if (executesLimit == null || executesPermits != maxThreadNum) { executesLimit = new Semaphore(maxThreadNum); executesPermits = maxThreadNum; } } } return executesLimit; }
這個方法是獲取信號量,若是這個實例裏面的信號量是空的,那麼就添加一個,若是不是空的就返回。this
TpsLimitFilter 過濾器,用於服務提供者中,提供限流的功能。url
配置方式:code
dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoServiceImpl" protocol="injvm" > <dubbo:parameter key="tps" value="100" /> </dubbo:service>
TpsLimitFilterblog
private final TPSLimiter tpsLimiter = new DefaultTPSLimiter(); @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) { throw new RpcException( "Failed to invoke service " + invoker.getInterface().getName() + "." + invocation.getMethodName() + " because exceed max service tps."); } return invoker.invoke(invocation); }
invoke方法調用了DefaultTPSLimiter的isAllowable,咱們進入到isAllowable方法看一下
DefaultTPSLimiter
private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>(); @Override public boolean isAllowable(URL url, Invocation invocation) { //獲取tps這個參數設置的大小 int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1); //獲取tps.interval這個參數設置的大小,默認60秒 long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY, Constants.DEFAULT_TPS_LIMIT_INTERVAL); String serviceKey = url.getServiceKey(); if (rate > 0) { StatItem statItem = stats.get(serviceKey); if (statItem == null) { stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval)); statItem = stats.get(serviceKey); } return statItem.isAllowable(); } else { StatItem statItem = stats.get(serviceKey); if (statItem != null) { stats.remove(serviceKey); } } return true; }
若要限流,調用 StatItem#isAllowable(url, invocation) 方法,根據 TPS 限流規則判斷是否限制這次調用。
StatItem
private long lastResetTime; private long interval; private AtomicInteger token; private int rate; public boolean isAllowable() { long now = System.currentTimeMillis(); // 若到達下一個週期,恢復可用種子數,設置最後重置時間。 if (now > lastResetTime + interval) { token.set(rate);// 回覆可用種子數 lastResetTime = now;// 最後重置時間 } // CAS ,直到或獲得一個種子,或者沒有足夠種子 int value = token.get(); boolean flag = false; while (value > 0 && !flag) { flag = token.compareAndSet(value, value - 1); value = token.get(); } return flag; }