Dubbo 併發控制

前言

你們好,今天開始給你們分享 — Dubbo 專題之 Dubbo 併發控制。在前一個章節中咱們介紹了 Dubbo 負載均衡,Dubbo 爲咱們提供四種負載均衡算法分別是:加權隨機算法、加權輪詢算法、最少活躍調用數算法、一致性 Hash 算法。同時咱們也例舉了常見的使用場景而且進行了源碼解析來分析其實現原理。有的小夥伴學習了負載均衡算法後可能會想:當咱們有不少的消費線程時,若是服務提供端只有少數的實例,那麼會不會把咱們的服務提供端線程消費殆盡呢?或者超出了咱們的業務處理線程池最大接收請求數又會發生什麼呢?帶着這些疑問咱們開始本章節學習,咱們會經過介紹什麼是併發?怎樣控制併發?Dubbo 中是怎樣來解決這些問題。下面就讓咱們快速開始吧!java

1. 併發控制簡介

首先咱們得理解什麼是併發,這裏有另一個概念並行。下面是來自百科的解釋:併發和並行是即類似又有區別的兩個概念,並行是指兩個或者多個事件在同一時刻發生;而併發是指兩個或多個事件在同一時間間隔內發生。在多道程序環境下,併發性是指在一段時間內宏觀上有多個程序在同時運行,但在單處理器系統中,每一時刻卻僅能有一道程序執行,故微觀上這些程序只能是分時地交替執行。假若在計算機系統中有多個處理機,則這些能夠併發執行的程序即可被分配到多個處理機上,實現並行執行即利用每一個處理機來處理一個可併發執行的程序,這樣多個程序即可以同時執行。下面經過示例圖進行說明:git

併發並行概念

在上圖中咱們能夠看到單核處理在多線程併發執行任務時,同一時刻只有一個線程在執行,在 CPU 時間片切換的時候會調度到其餘線程進行執行這就叫作併發。同理當在多核處理器上多個線程同時執行且在不一樣 CPU 上的時,這就叫作並行執行,每個線程都在一個CPU上執行且線程間互不影響。算法

2. 併發控制方式

在 Dubbo 中提供了兩大類配置分別是:消費端控制配置、服務提供端控制配置。spring

2.1 服務端控制配置

  1. 限定服務的每一個方法
<dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade" executes="10" />
  1. 限定服務的某個方法
<dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade">
    <dubbo:method name="queryAll" executes="10" />
</dubbo:service>

2.2 消費端配置

1. 限定服務的全部方法
<dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade" actives="10" />

或者shell

<dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade">
  <dubbo:method name="queryAll" actives="10" />
</dubbo:service>
  1. 限定服務的某個方法apache

    <dubbo:reference interface="com.muke.dubbocourse.common.api.BookFacade" actives="10" />

或者編程

<dubbo:reference interface="com.muke.dubbocourse.common.api.BookFacade">
    <dubbo:method name="queryAll" actives="10" />
</dubbo:service>

若是 <dubbo:service><dubbo:reference> 都配了actives<dubbo:reference> 優先,參見:覆蓋策略參考api

咱們從上面的配置能夠看出服務提供者端經過executes配置而消費端經過actives配置。服務器

3. 使用場景

併發控制在咱們平常的工做中通常狀況咱們不多會去直接配置,咱們通常是自定義業務處理線程池大小。在 Dubbo 中當接收到服務請求後會把請求轉發到業務處理線程池去處理,因此接收請求的線程 IO 瓶頸不大。我能想到的使用場景以下:微信

  1. 機器性能差距大:假設咱們在一個服務集羣中,其中一臺服務器性能較差那麼咱們能夠手動配置這臺服務的所能接受的最大請求併發數,避免請求失敗。
  2. 壓測服務器性能:好比咱們有幾臺機器須要進行性能評估,那麼咱們可能須要跑壓力測試,這時咱們能夠把全部服務提供者的最大併發數設置爲一致。這樣咱們就能夠在排除程序自己的因素來評估機器的性能。

4. 示例演示

咱們以獲取圖書列表爲例來進行演示。項目結構以下:

idea

這裏咱們主要更改了服務提供者的 XML 配置文件dubbo-provider-xml.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <dubbo:application name="demo-provider" metadata-type="remote"/>

    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>

    <bean id="bookFacade" class="com.muke.dubbocourse.concurrent.provider.BookFacadeImpl"/>

    <!--暴露本地服務爲Dubbo服務 ,executes="10" 表示限制每一個方法的併發數爲10-->
    <dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade" executes="10" ref="bookFacade" />

</beans>

上面主要增長了executes="10"配置,對服務BookFacade全部的方法進行最大併發數限制。

下面咱們看看消費端代碼:

public static void main(String[] args) throws IOException {

        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("concurrent/consumer/spring/dubbo-consumer-xml.xml");

        context.start();

        BookFacade bookFacade = context.getBean(BookFacade.class);

        //循環啓動30個線程進行併發訪問
        for ( int i = 0; i < 30; i++) {

           final int index = i;

            //開啓線程
            new Thread(()->{

                List<Book> books = bookFacade.queryAll();

                System.out.println("The invoker "+index+" result is "+ books);

            }).start();

        }

        System.in.read();

        //context.close();

    }

同時咱們須要設置下queryAll方法的執行時間稍微長一些這樣才能看到演示效果,正常狀況下咱們會看到以下錯誤:

cause: The service using threads greater than <dubbo:service executes="10" /> limited.

提示得很是明顯,就是說咱們的服務最大併發爲設置爲10。

5. 實現原理

在講解原理以前假設這個讓咱們本身來實現的話咱們該怎樣實現呢?我想小夥伴可能都會想到這裏控制併發數量無非就是對調用方法或服務進行一個全局的計數統計,若是達到了閥值就開始執行限制。那咱們就來看看 Dubbo 中是怎樣實現。在 Dubbo 中執行這個邏輯的類是ActiveLimitFilter其核心代碼以下:

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        //執行的方法名稱
        String methodName = invocation.getMethodName();
        //獲取配置的併發參數配置
        int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);
        final RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
        //開始計數
        if (!RpcStatus.beginCount(url, methodName, max)) {
            //計數失敗,獲取超時時間
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), TIMEOUT_KEY, 0);
            //記錄開始時間
            long start = System.currentTimeMillis();
            long remain = timeout;
            synchronized (rpcStatus) {
                //再次嘗試計數
                while (!RpcStatus.beginCount(url, methodName, max)) {
                    try {
                        //計數失敗 阻塞等待 等待接收到onMessage、onError方法回調釋放rpcStatus的阻塞
                        rpcStatus.wait(remain);
                    } catch (InterruptedException e) {
                        // ignore
                    }
                    long elapsed = System.currentTimeMillis() - start;
                    remain = timeout - elapsed;
                    //等待超時
                    if (remain <= 0) {
                        throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
                                "Waiting concurrent invoke timeout in client-side for service:  " +
                                        invoker.getInterface().getName() + ", method: " + invocation.getMethodName() +
                                        ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " +
                                        rpcStatus.getActive() + ". max concurrent invoke limit: " + max);
                    }
                }
            }
        }

        invocation.put(ACTIVELIMIT_FILTER_START_TIME, System.currentTimeMillis());

        //調用服務
        return invoker.invoke(invocation);
    }

在上面的代碼中咱們能夠看到在調用服務前對當前調用的方法進行計數,若是計數失敗會阻塞等待指定的超時時間,計數成功則調用遠程服務。

6. 小結

在本小節中咱們主要學習了 Dubbo 併發控制以及併發和並行的區別,同時也分析了併發控制實現的原理,其本質上是經過對調用的方法或服務進行應用級別的計數統計,當達到閥值就限制訪問。

本節課程的重點以下:

  1. 理解 Dubbo 併發控制
  2. 瞭解了併發控制的使用方式
  3. 瞭解延併發控制使用場景
  4. 瞭解併發控制實現原理

做者

我的從事金融行業,就任過易極付、思建科技、某網約車平臺等重慶一流技術團隊,目前就任於某銀行負責統一支付系統建設。自身對金融行業有強烈的愛好。同時也實踐大數據、數據存儲、自動化集成和部署、分佈式微服務、響應式編程、人工智能等領域。同時也熱衷於技術分享創立公衆號和博客站點對知識體系進行分享。關注公衆號: 青年IT男 獲取最新技術文章推送!

博客地址: http://youngitman.tech

微信公衆號:

相關文章
相關標籤/搜索