Soul網關學習Apache Dubbo插件原理解析

目標

  • Apache Dubbo 插件介紹java

    • 元數據介紹
  • Apache Dubbo 插件配置git

    • Bootstrap pom 配置
    • soul-admin 配置
    • dubbo服務pom配置
  • Apache Dubbo 泛化調用介紹github

    • 經過API方式使用泛化調用
    • 經過spring使用泛化調用
    • 泛化調用實現流程
  • Soul Dubbo 插件調用解析spring

    • ApachDubboPlugin泛化調用準備
    • ApacheDubboProxySerivce
    • DubboResponsePlugin
    • WebFluxResultUtils返回結果
  • Dubbo泛化調用介紹sql

  • 總結數據庫

  • 參考apache

    Apache Dubbo 插件介紹

    Apache Dubbo 是一款高性能、輕量級的開源Java服務框架,主要提供了六大核心能力,面向接口代理的高性能RPC調用,智能容錯和負載均衡,服務自動註冊與發現,高度可擴展能力,運行期流量調度,可視化的服務治理與運維。 網關中Dubbo插件主要是將 Http協議 轉換成 Dubbo協議 ,也是網關實現Dubbo泛化調用的關鍵。而Dubbo插件須要配合 元數據 才能實現Dubbo調用。編程

    元數據介紹

    元數據做用就是在進行協議轉換時候要獲取真實的請求 pathmethodNameparameterTypes 爲泛化調用作好準備json

image.png

  • 在數據庫中,咱們有一張表單獨存儲Dubbo元信息,經過數據同步方案,會把這張表的數據同步到網關的JVM內存中bootstrap

  • 表結構以下

    CREATE TABLE  IF NOT EXISTS `meta_data` (
    `id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'id',
    `app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '應用名稱',
    `path` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '路徑,不能重複',
    `path_desc` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '路徑描述',
    `rpc_type` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'rpc類型',
    `service_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '服務名稱',
    `method_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '方法名稱',
    `parameter_types` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '參數類型 多個參數類型 逗號隔開',
    `rpc_ext` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT 'rpc的擴展信息,json格式',
    `date_created` datetime(0) NOT NULL COMMENT '建立時間',
    `date_updated` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新時間',
    `enabled` tinyint(4) NOT NULL DEFAULT 0 COMMENT '啓用狀態',
    PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic;
    複製代碼
    • path 字段主要是在請求網關的時候,會根據你的 path 字段來匹配到一條數據,而後進行後續的處理流程
    • rpc_ext 字段若是代理的接口是 Dubbo 類型的服務接口,同時設置了 group version 字段時候,那麼信息就會存儲到 rpc_ext
    • 每個 Dubbo 接口方法會應對一條元數據,對比SpringCloud、http分別是隻存儲一條/contextPath/** 和不存儲

    Apache Dubbo 插件配置

    soul-bootstrap pom 配置

    <dependency>
    <groupId>org.dromara</groupId>
    <artifactId>soul-spring-boot-starter-plugin-apache-dubbo</artifactId>
    <version>${project.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.dubbo</groupId>
    <artifactId>dubbo</artifactId>
    <version>2.7.5</version>
    </dependency>
    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>${curator.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>${curator.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>${curator.version}</version>
    </dependency>
    複製代碼

soul-admin 配置

image.png

登陸soul-admin後臺在插件管理頁面打開Dubbo配置選項的開關,和填寫註冊中心的鏈接地址

dubbo服務pom配置

<dependency>
 <groupId>org.dromara</groupId>
 <artifactId>soul-spring-boot-starter-client-apache-dubbo</artifactId>
 <version>${soul.version}</version>
</dependency>
@SoulDubboClient(path = "/insert", desc = "Insert a row of data")
public DubboTest insert(final DubboTest dubboTest) {
    dubboTest.setName("hello world Soul Apache Dubbo: " + dubboTest.getName());
    return dubboTest;
}
複製代碼

被代理的服務使用提供的 soul-spring-boot-starter-client-apache-dubbo 客戶端依賴,同時使用@SoulDubboClient 註解,在啓動時候將接口的名稱,參數類型,參數內容註冊到 soul-admin 端,而後 admin 端將數據同步到 bootstrap 端。

Apache Dubbo 泛化調用介紹

泛化接口調用方式主要用於客戶端沒有API接口及模型類元的狀況,參數及返回值中的全部POJO均用 Map 表示, 一般用於框架集成,可經過GenericSerivce調用全部服務實現。

經過API方式使用泛化調用(網關目前使用方式)

ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
reference.setGeneric(true);
reference.setApplication(applicationConfig);
reference.setRegistry(registryConfig);
reference.setInterface(metaData.getServiceName());
reference.setProtocol("dubbo");
複製代碼

網關經過API方式聲明註冊使用泛化調用

經過Spring使用泛化調用

<dubbo:reference id="barService" interface="com.foo.BarService" generic="true" />
複製代碼

泛化調用實現流程

+-------------------------------------------+               +-------------------------------------------+
|  consumer 端                               |               | provider 端                                |
|                                           |               |                                           |
|                                           |               |                                           |
|                                           |               |                                           |
|                                           |               |                                           |
|                    +------------------+   |               |       +--------------+                    |
|                    |GenericImplFilter |   |  Invocation   |       |GenericFilter |                    |
|             +----> |                  +-------------------------> |              |                    |
|             |      +------------------+   |               |       +--------------+                    |
| +-----------+                             |               |                      |    +-----------+   |
| |           |                             |               |                      |    |           |   |
| |Client     |                             |               |                      +--> | Service   |   |
| |           |                             |               |                           |           |   |
| +-----------+                             |               |                           +-------+---+   |
|                                           |               |                                   |       |
|      ^             +------------------+   |               |       +--------------+            |       |
|      |             |GenericImplFilter |   |               |       |GenericFilter | <----------+       |
|      +-------------+                  | <-------------------------+              |                    |
|                    +------------------+   |               |       +--------------+                    |
|                                           |               |                                           |
|                                           |               |                                           |
|                                           |               |                                           |
|                                           |               |                                           |
+-------------------------------------------+               +-------------------------------------------+
複製代碼

GenericService 這個接口和Java的反射調用很是像,只需提供調用的方法名稱,參數的類型以及參數的值就能夠直接調用對應方法了。 - GenericFilter : 負責provider端參數的轉換 - 調用時,將hashMap結構的參數轉換成對應Pojo - 返回結果是,將Pojo轉換成hashMap

image.png - GenericImplFilter : 負責consumer端參數的轉換,將Pojo轉換成hashMap接口

image.png

/** * Generic service interface * * @export */
public interface GenericService {

    /** * Generic invocation * * @param method 方法名,如:findPerson,若是有重載方法,需帶上參數列表,如:findPerson(java.lang.String) * @param parameterTypes 參數類型 * @param args 參數列表 * @return invocation 返回值 * @throws GenericException 方法拋出的異常 */
    Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;

    default CompletableFuture<Object> $invokeAsync(String method, String[] parameterTypes, Object[] args) throws GenericException {
        Object object = $invoke(method, parameterTypes, args);
        if (object instanceof CompletableFuture) {
            return (CompletableFuture<Object>) object;
        }
        return CompletableFuture.completedFuture(object);
    }

}
複製代碼

Soul Dubbo 插件調用解析

當業務請求發起時候,首先進入 SoulWebHandler (至於爲何成爲請求入口自行查詢,本文不做解釋) 類的 Handle 方法,下面就帶了 pluginsDefaultSoulPluginChain 類開始進入插件鏈調用。

@Override
    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
        return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler);
    }
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
    // 響應式編程
    return Mono.defer(() -> {
        // 判斷當前index 是否 < 插件數量
        if (this.index < plugins.size()) {
            // 依次從plugins 中獲取一種插件進行調用
            SoulPlugin plugin = plugins.get(this.index++);
            // 判斷此插件是否未打開
            Boolean skip = plugin.skip(exchange);
            if (skip) {
                return this.execute(exchange);
            }
            return plugin.execute(exchange, this);
        }
        return Mono.empty();
    });
}
複製代碼

本章只關注Apache Dubbo 因此咱們重點放到Dubbo 插件的調用。 image.png 通過Debug網關程序咱們知道實際上是按照上面的順序一個一個的進行判斷調用。下面咱們關注 ApacheDubboPlugin

ApachDubboPlugin 泛化調用準備

@Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
        // 獲取 dubbo_params 數據
        String body = exchange.getAttribute(Constants.DUBBO_PARAMS);
        // 獲取 exchange context的屬性值
        SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        // 獲取 exchange metaData 屬性值
        MetaData metaData = exchange.getAttribute(Constants.META_DATA);
        // 判斷metaData是否有誤,若是有誤直接返回 metaData 有誤的返回信息
        if (!checkMetaData(metaData)) {
            assert metaData != null;
            log.error(" path is :{}, meta data have error.... {}", soulContext.getPath(), metaData.toString());
            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
            Object error = SoulResultWrap.error(SoulResultEnum.META_DATA_ERROR.getCode(), SoulResultEnum.META_DATA_ERROR.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        // 判斷 metaData的parameterTypes 和 body 是否爲空,若是有誤則返回Body錯誤信息
        if (StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(body)) {
            exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
            Object error = SoulResultWrap.error(SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getCode(), SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        // 帶着exchange、body、metaData 進行 Dubbo GenericsService的異步調用
        final Mono<Object> result = dubboProxyService.genericInvoker(body, metaData, exchange);
        return result.then(chain.execute(exchange));
    }
複製代碼

首先對泛化調用所須要的參數進行檢查

ApacheDubboProxyService

public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {
    // issue(https://github.com/dromara/soul/issues/471), add dubbo tag route
    String dubboTagRouteFromHttpHeaders = exchange.getRequest().getHeaders().getFirst(Constants.DUBBO_TAG_ROUTE);
    if (StringUtils.isNotBlank(dubboTagRouteFromHttpHeaders)) {
        RpcContext.getContext().setAttachment(CommonConstants.TAG_KEY, dubboTagRouteFromHttpHeaders);
    }
    // 根據metaData路徑獲取ferference
    ReferenceConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getPath());
    if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {
        ApplicationConfigCache.getInstance().invalidate(metaData.getPath());
        reference = ApplicationConfigCache.getInstance().initRef(metaData);
    }
    // 根據ferference 獲取泛化調用的實例 GenericService
    GenericService genericService = reference.get();
    Pair<String[], Object[]> pair;
    if (ParamCheckUtils.dubboBodyIsEmpty(body)) {
        pair = new ImmutablePair<>(new String[]{}, new Object[]{});
    } else {
        // 根據body 和 parameterTypes 組織Dubbo 泛化調用的參數類型和參數值
        pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
    }
    // 下面使用GenericSerice 默認方法$invokeAsync進行異步調用
    CompletableFuture<Object> future = genericService.$invokeAsync(metaData.getMethodName(), pair.getLeft(), pair.getRight());
    return Mono.fromFuture(future.thenApply(ret -> {
        if (Objects.isNull(ret)) {
            ret = Constants.DUBBO_RPC_RESULT_EMPTY;
        }
        // 等調用成功以後 將結果和類型複製到exchagne 對應的屬性上
        exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, ret);
        exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
        return ret;
    })).onErrorMap(exception -> exception instanceof GenericException ? new SoulException(((GenericException) exception).getExceptionMessage()) : new SoulException(exception));
}
複製代碼

DubboResponsePlugin

@Overridepublic Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {    return chain.execute(exchange).then(Mono.defer(() -> {        final Object result = exchange.getAttribute(Constants.DUBBO_RPC_RESULT);        if (Objects.isNull(result)) {            Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);            return WebFluxResultUtils.result(exchange, error);        }        Object success = SoulResultWrap.success(SoulResultEnum.SUCCESS.getCode(), SoulResultEnum.SUCCESS.getMsg(), JsonUtils.removeClass(result));        return WebFluxResultUtils.result(exchange, success);    }));}
複製代碼

image.png

WebFluxResultUtils 返回結果

image.png

Dubbo泛化調用介紹

Dubbo泛化調用主要就分爲兩塊分別是消費端如何使用 GenericImplFilter 攔截泛化調用、服務提供端如何使用 GenericFilter 攔截請求後把泛化參數序列化而後請求給具體服務。

服務消費端org.apache.dubbo.rpc.filter.GenericImplFilter是如何攔截泛化調用

@Activate(group = CommonConstants.CONSUMER, value = GENERIC_KEY, order = 20000)
public class GenericImplFilter implements Filter, Filter.Listener {
@Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // ... 省略非核心代碼
        // 判斷是否爲泛化調用
        if (isMakingGenericCall(generic, invocation)) {
            // 獲取泛化參數
            Object[] args = (Object[]) invocation.getArguments()[2];
            // 若是泛化爲nativeJava
            if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                for (Object arg : args) {
                    if (!(byte[].class == arg.getClass())) {
                        error(generic, byte[].class.getName(), arg.getClass().getName());
                    }
                }
                // 若是泛化方式爲bean
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                for (Object arg : args) {
                    if (!(arg instanceof JavaBeanDescriptor)) {
                        error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName());
                    }
                }
            }

            // 設置attachment ,以便與服務端調用
            invocation.setAttachment(
                    GENERIC_KEY, invoker.getUrl().getParameter(GENERIC_KEY));
        }
        // 發起遠程調用
        return invoker.invoke(invocation);
    }
    private boolean isMakingGenericCall(String generic, Invocation invocation) {
        return (invocation.getMethodName().equals($INVOKE) || invocation.getMethodName().equals($INVOKE_ASYNC))
                && invocation.getArguments() != null
                && invocation.getArguments().length == 3
                && ProtocolUtils.isGeneric(generic);
    }
}
複製代碼

GenericImplFilter 實現接口Filter(關於Dubbo中的Filter,不作介紹)而後執行Invoke方法,invoke方法主要作以下事情: - 參數校驗,檢查這個調用是不是泛化調用 - 獲取泛化參數 - 判斷泛化調用方式:遍歷每一個參數,而後依次判斷參數的泛化方式是nativejava仍是bean方式 - 發起遠程調用

服務提供端經過GenericFilter攔截泛化請求

@Activate(group = CommonConstants.PROVIDER, order = -20000)
public class GenericFilter implements Filter, Filter.Listener {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
        // 參數校驗
        if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
                && inv.getArguments() != null
                && inv.getArguments().length == 3
                && !GenericService.class.isAssignableFrom(invoker.getInterface())) {
            // 獲取參數名稱、參數類型、參數值
            String name = ((String) inv.getArguments()[0]).trim();
            String[] types = (String[]) inv.getArguments()[1];
            Object[] args = (Object[]) inv.getArguments()[2];
            try {
                // 使用反射獲取調用的方法
                Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
                Class<?>[] params = method.getParameterTypes();
                if (args == null) {
                    args = new Object[params.length];
                }
                // 獲取泛化引用使用的泛化類型,true or bean or nativejava
                String generic = inv.getAttachment(GENERIC_KEY);
                if (StringUtils.isBlank(generic)) {
                    generic = RpcContext.getContext().getAttachment(GENERIC_KEY);
                }
                // 若是generic=true 則使用true方式對入參進行反序列化
                if (StringUtils.isEmpty(generic)
                        || ProtocolUtils.isDefaultGenericSerialization(generic)
                        || ProtocolUtils.isGenericReturnRawResult(generic)) {
                    args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
                    // 若是 generic=nativejava,則使用nativejava方式對入參進行反序列化
                } else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                    for (int i = 0; i < args.length; i++) {
                        if (byte[].class == args[i].getClass()) {
                            try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) {
                                args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                        .getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA)
                                        .deserialize(null, is).readObject();
                            } catch (Exception e) {
                                throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
                            }
                        } else {
                            throw new RpcException(...);
                        }
                    }
                    // 若是 generic=bean 則使用bean方式對入參進行反序列化
                } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                    for (int i = 0; i < args.length; i++) {
                        if (args[i] instanceof JavaBeanDescriptor) {
                            args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
                        } else {
                            throw new RpcException(...);
                        }
                    }
                } ...
                // 將本次請求傳遞到FilterChain的下一個Filter中,並返回結果result
                RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args, inv.getAttachments(), inv.getAttributes());
                rpcInvocation.setInvoker(inv.getInvoker());
                rpcInvocation.setTargetServiceUniqueName(inv.getTargetServiceUniqueName());

                return invoker.invoke(rpcInvocation);
            } catch (NoSuchMethodException e) {
                throw new RpcException(e.getMessage(), e);
            } catch (ClassNotFoundException e) {
                throw new RpcException(e.getMessage(), e);
            }
        }
        // 若是不是泛化調用,直接把請求傳給FilterChain的下一個Filter
        return invoker.invoke(inv);
    }
}
複製代碼

以上就是Dubbo服務提供端如何攔截泛化請求,並進行處理的大致流程: - 參數校驗,判斷這次請求是否是泛化調用 - 獲取參數名稱、參數類型、參數值 - 使用反射獲取調用的方法,和使用的泛化方式 true or nativejava or bean - 根據泛化方式,反序列化泛化參數 - 將本次請求,包括調用的方法,參數和上下文信息傳遞給FilterChain的下一個Filter中,並返回Result結果 - 根據泛化方式,反序列化Result結果返回給服務消費端

總結

以上從如何配置Dubbo插件到整個調用流程的分析,而後分別介紹服務消費端與服務提供端如何攔截泛化調用流程對參數進行序列化細節,但願對你有所幫助

參考

my.oschina.net/u/4564034/b…

qsli.github.io/2018/05/02/…

相關文章
相關標籤/搜索