Dubbo 是如何控制併發數和限流的?

ExecuteLimitFilter

ExecuteLimitFilter ,在服務提供者,經過 <dubbo:service /> 的 "executes" 統一配置項開啓:
表示每服務的每方法最大可並行執行請求數。html

ExecuteLimitFilter是經過信號量來實現的對服務端的併發數的控制。java

ExecuteLimitFilter執行流程:面試

  1. 首先會去得到服務提供者每服務每方法最大可並行執行請求數
  2. 若是每服務每方法最大可並行執行請求數大於零,那麼就基於基於服務 URL + 方法維度獲取一個RpcStatus實例
  3. 經過RpcStatus實例獲取一個信號量,若果獲取的這個信號量調用tryAcquire返回false,則拋出異常
  4. 若是沒有拋異常,那麼久調用RpcStatus靜態方法beginCount,給這個 URL + 方法維度開始計數
  5. 調用服務
  6. 調用結束後計數調用RpcStatus靜態方法endCount,計數結束
  7. 釋放信號量

ExecuteLimitFilterspring

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    String methodName = invocation.getMethodName();
    Semaphore executesLimit = null;
    boolean acquireResult = false;
    int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
    if (max > 0) {
        RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
        //            if (count.getActive() >= max) {
        /**
             * http://manzhizhen.iteye.com/blog/2386408
             * use semaphore for concurrency control (to limit thread number)
             */
        executesLimit = count.getSemaphore(max);
        if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
            throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
        }
    }
    long begin = System.currentTimeMillis();
    boolean isSuccess = true;
    RpcStatus.beginCount(url, methodName);
    try {
        Result result = invoker.invoke(invocation);
        return result;
    } catch (Throwable t) {
        isSuccess = false;
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        } else {
            throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
        }
    } finally {
        RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
        if(acquireResult) {
            executesLimit.release();
        }
    }
}

咱們接下來看看RpcStatus這個類併發

private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();

public static RpcStatus getStatus(URL url, String methodName) {
    String uri = url.toIdentityString();
    ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
    if (map == null) {
        METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
        map = METHOD_STATISTICS.get(uri);
    }
    RpcStatus status = map.get(methodName);
    if (status == null) {
        map.putIfAbsent(methodName, new RpcStatus());
        status = map.get(methodName);
    }
    return status;
}

這個方法很簡單,大概就是給RpcStatus這個類裏面的靜態屬性METHOD_STATISTICS裏面設值。外層的map是以url爲key,裏層的map是以方法名爲key。intellij-idea

private volatile int executesPermits;
public Semaphore getSemaphore(int maxThreadNum) {
    if(maxThreadNum <= 0) {
        return null;
    }

    if (executesLimit == null || executesPermits != maxThreadNum) {
        synchronized (this) {
            if (executesLimit == null || executesPermits != maxThreadNum) {
                executesLimit = new Semaphore(maxThreadNum);
                executesPermits = maxThreadNum;
            }
        }
    }

    return executesLimit;
}

這個方法是獲取信號量,若是這個實例裏面的信號量是空的,那麼就添加一個,若是不是空的就返回。jvm

TPSLimiter

TpsLimitFilter 過濾器,用於服務提供者中,提供限流的功能。ide

配置方式:工具

  1. 經過 <dubbo:parameter key="tps" value="" /> 配置項,添加到 <dubbo:service /> 或 <dubbo:provider /> 或 <dubbo:protocol /> 中開啓,例如:
dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoServiceImpl" protocol="injvm" >
<dubbo:parameter key="tps" value="100" />
</dubbo:service>
  1. 經過 <dubbo:parameter key="tps.interval" value="" /> 配置項,設置 TPS 週期。

源碼分析

TpsLimitFilter源碼分析

private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

    if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
        throw new RpcException(
            "Failed to invoke service " +
            invoker.getInterface().getName() +
            "." +
            invocation.getMethodName() +
            " because exceed max service tps.");
    }

    return invoker.invoke(invocation);
}

invoke方法調用了DefaultTPSLimiter的isAllowable,咱們進入到isAllowable方法看一下

DefaultTPSLimiter

private final ConcurrentMap<String, StatItem> stats
    = new ConcurrentHashMap<String, StatItem>();
@Override
public boolean isAllowable(URL url, Invocation invocation) {
    //獲取tps這個參數設置的大小
    int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
    //獲取tps.interval這個參數設置的大小,默認60秒
    long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
                                     Constants.DEFAULT_TPS_LIMIT_INTERVAL);
    String serviceKey = url.getServiceKey();
    if (rate > 0) {
        StatItem statItem = stats.get(serviceKey);
        if (statItem == null) {
            stats.putIfAbsent(serviceKey,
                              new StatItem(serviceKey, rate, interval));
            statItem = stats.get(serviceKey);
        }
        return statItem.isAllowable();
    } else {
        StatItem statItem = stats.get(serviceKey);
        if (statItem != null) {
            stats.remove(serviceKey);
        }
    }

    return true;
}

若要限流,調用 StatItem#isAllowable(url, invocation) 方法,根據 TPS 限流規則判斷是否限制這次調用。

StatItem

private long lastResetTime;

private long interval;

private AtomicInteger token;

private int rate;

public boolean isAllowable() {
    long now = System.currentTimeMillis();
    // 若到達下一個週期,恢復可用種子數,設置最後重置時間。
    if (now > lastResetTime + interval) {
        token.set(rate);// 回覆可用種子數
        lastResetTime = now;// 最後重置時間
    }å
    // CAS ,直到或獲得一個種子,或者沒有足夠種子
    int value = token.get();
    boolean flag = false;
    while (value > 0 && !flag) {
        flag = token.compareAndSet(value, value - 1);
        value = token.get();
    }

    return flag;
}

關注公衆號Java技術棧,在後臺回覆:面試,能夠獲取我整理的 Dubbo 系列面試題和答案。

做者: luozhiyun
出處:http://www.javashuo.com/article/p-zxpmbmuq-z.html

近期熱文推薦:

1.600+ 道 Java面試題及答案整理(2021最新版)

2.終於靠開源項目弄到 IntelliJ IDEA 激活碼了,真香!

3.阿里 Mock 工具正式開源,幹掉市面上全部 Mock 工具!

4.Spring Cloud 2020.0.0 正式發佈,全新顛覆性版本!

5.《Java開發手冊(嵩山版)》最新發布,速速下載!

以爲不錯,別忘了隨手點贊+轉發哦!

相關文章
相關標籤/搜索