Apache Dubbo 插件介紹java
Apache Dubbo 插件配置git
Apache Dubbo 泛化調用介紹github
Soul Dubbo 插件調用解析spring
Dubbo泛化調用介紹sql
總結數據庫
參考apache
Apache Dubbo 是一款高性能、輕量級的開源Java服務框架,主要提供了六大核心能力,面向接口代理的高性能RPC調用,智能容錯和負載均衡,服務自動註冊與發現,高度可擴展能力,運行期流量調度,可視化的服務治理與運維。 網關中Dubbo插件主要是將 Http協議
轉換成 Dubbo協議
,也是網關實現Dubbo泛化調用的關鍵。而Dubbo插件須要配合 元數據
才能實現Dubbo調用。編程
元數據做用就是在進行協議轉換時候要獲取真實的請求 path
、methodName
、 parameterTypes
爲泛化調用作好準備json
在數據庫中,咱們有一張表單獨存儲Dubbo元信息,經過數據同步方案,會把這張表的數據同步到網關的JVM內存中bootstrap
表結構以下
CREATE TABLE IF NOT EXISTS `meta_data` (
`id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'id',
`app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '應用名稱',
`path` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '路徑,不能重複',
`path_desc` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '路徑描述',
`rpc_type` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'rpc類型',
`service_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '服務名稱',
`method_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '方法名稱',
`parameter_types` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '參數類型 多個參數類型 逗號隔開',
`rpc_ext` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT 'rpc的擴展信息,json格式',
`date_created` datetime(0) NOT NULL COMMENT '建立時間',
`date_updated` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新時間',
`enabled` tinyint(4) NOT NULL DEFAULT 0 COMMENT '啓用狀態',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic;
複製代碼
path
字段主要是在請求網關的時候,會根據你的 path
字段來匹配到一條數據,而後進行後續的處理流程rpc_ext
字段若是代理的接口是 Dubbo
類型的服務接口,同時設置了 group
version
字段時候,那麼信息就會存儲到 rpc_ext
中Dubbo
接口方法會應對一條元數據,對比SpringCloud、http分別是隻存儲一條/contextPath/** 和不存儲<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-plugin-apache-dubbo</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
複製代碼
登陸soul-admin後臺在插件管理頁面打開Dubbo配置選項的開關,和填寫註冊中心的鏈接地址
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-client-apache-dubbo</artifactId>
<version>${soul.version}</version>
</dependency>
@SoulDubboClient(path = "/insert", desc = "Insert a row of data")
public DubboTest insert(final DubboTest dubboTest) {
dubboTest.setName("hello world Soul Apache Dubbo: " + dubboTest.getName());
return dubboTest;
}
複製代碼
被代理的服務使用提供的
soul-spring-boot-starter-client-apache-dubbo
客戶端依賴,同時使用@SoulDubboClient
註解,在啓動時候將接口的名稱,參數類型,參數內容註冊到soul-admin
端,而後admin
端將數據同步到bootstrap
端。
泛化接口調用方式主要用於客戶端沒有API接口及模型類元的狀況,參數及返回值中的全部POJO均用 Map
表示, 一般用於框架集成,可經過GenericSerivce調用全部服務實現。
ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
reference.setGeneric(true);
reference.setApplication(applicationConfig);
reference.setRegistry(registryConfig);
reference.setInterface(metaData.getServiceName());
reference.setProtocol("dubbo");
複製代碼
網關經過API方式聲明註冊使用泛化調用
<dubbo:reference id="barService" interface="com.foo.BarService" generic="true" />
複製代碼
+-------------------------------------------+ +-------------------------------------------+
| consumer 端 | | provider 端 |
| | | |
| | | |
| | | |
| | | |
| +------------------+ | | +--------------+ |
| |GenericImplFilter | | Invocation | |GenericFilter | |
| +----> | +-------------------------> | | |
| | +------------------+ | | +--------------+ |
| +-----------+ | | | +-----------+ |
| | | | | | | | |
| |Client | | | +--> | Service | |
| | | | | | | |
| +-----------+ | | +-------+---+ |
| | | | |
| ^ +------------------+ | | +--------------+ | |
| | |GenericImplFilter | | | |GenericFilter | <----------+ |
| +-------------+ | <-------------------------+ | |
| +------------------+ | | +--------------+ |
| | | |
| | | |
| | | |
| | | |
+-------------------------------------------+ +-------------------------------------------+
複製代碼
GenericService
這個接口和Java的反射調用很是像,只需提供調用的方法名稱,參數的類型以及參數的值就能夠直接調用對應方法了。 - GenericFilter : 負責provider端參數的轉換 - 調用時,將hashMap結構的參數轉換成對應Pojo - 返回結果是,將Pojo轉換成hashMap
- GenericImplFilter : 負責consumer端參數的轉換,將Pojo轉換成hashMap接口
/** * Generic service interface * * @export */
public interface GenericService {
/** * Generic invocation * * @param method 方法名,如:findPerson,若是有重載方法,需帶上參數列表,如:findPerson(java.lang.String) * @param parameterTypes 參數類型 * @param args 參數列表 * @return invocation 返回值 * @throws GenericException 方法拋出的異常 */
Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;
default CompletableFuture<Object> $invokeAsync(String method, String[] parameterTypes, Object[] args) throws GenericException {
Object object = $invoke(method, parameterTypes, args);
if (object instanceof CompletableFuture) {
return (CompletableFuture<Object>) object;
}
return CompletableFuture.completedFuture(object);
}
}
複製代碼
當業務請求發起時候,首先進入 SoulWebHandler
(至於爲何成爲請求入口自行查詢,本文不做解釋) 類的 Handle
方法,下面就帶了 plugins
從 DefaultSoulPluginChain
類開始進入插件鏈調用。
@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler);
}
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
// 響應式編程
return Mono.defer(() -> {
// 判斷當前index 是否 < 插件數量
if (this.index < plugins.size()) {
// 依次從plugins 中獲取一種插件進行調用
SoulPlugin plugin = plugins.get(this.index++);
// 判斷此插件是否未打開
Boolean skip = plugin.skip(exchange);
if (skip) {
return this.execute(exchange);
}
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
複製代碼
本章只關注Apache Dubbo 因此咱們重點放到Dubbo 插件的調用。
通過Debug網關程序咱們知道實際上是按照上面的順序一個一個的進行判斷調用。下面咱們關注
ApacheDubboPlugin
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
// 獲取 dubbo_params 數據
String body = exchange.getAttribute(Constants.DUBBO_PARAMS);
// 獲取 exchange context的屬性值
SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
// 獲取 exchange metaData 屬性值
MetaData metaData = exchange.getAttribute(Constants.META_DATA);
// 判斷metaData是否有誤,若是有誤直接返回 metaData 有誤的返回信息
if (!checkMetaData(metaData)) {
assert metaData != null;
log.error(" path is :{}, meta data have error.... {}", soulContext.getPath(), metaData.toString());
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
Object error = SoulResultWrap.error(SoulResultEnum.META_DATA_ERROR.getCode(), SoulResultEnum.META_DATA_ERROR.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// 判斷 metaData的parameterTypes 和 body 是否爲空,若是有誤則返回Body錯誤信息
if (StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(body)) {
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
Object error = SoulResultWrap.error(SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getCode(), SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// 帶着exchange、body、metaData 進行 Dubbo GenericsService的異步調用
final Mono<Object> result = dubboProxyService.genericInvoker(body, metaData, exchange);
return result.then(chain.execute(exchange));
}
複製代碼
首先對泛化調用所須要的參數進行檢查
public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {
// issue(https://github.com/dromara/soul/issues/471), add dubbo tag route
String dubboTagRouteFromHttpHeaders = exchange.getRequest().getHeaders().getFirst(Constants.DUBBO_TAG_ROUTE);
if (StringUtils.isNotBlank(dubboTagRouteFromHttpHeaders)) {
RpcContext.getContext().setAttachment(CommonConstants.TAG_KEY, dubboTagRouteFromHttpHeaders);
}
// 根據metaData路徑獲取ferference
ReferenceConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getPath());
if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {
ApplicationConfigCache.getInstance().invalidate(metaData.getPath());
reference = ApplicationConfigCache.getInstance().initRef(metaData);
}
// 根據ferference 獲取泛化調用的實例 GenericService
GenericService genericService = reference.get();
Pair<String[], Object[]> pair;
if (ParamCheckUtils.dubboBodyIsEmpty(body)) {
pair = new ImmutablePair<>(new String[]{}, new Object[]{});
} else {
// 根據body 和 parameterTypes 組織Dubbo 泛化調用的參數類型和參數值
pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
}
// 下面使用GenericSerice 默認方法$invokeAsync進行異步調用
CompletableFuture<Object> future = genericService.$invokeAsync(metaData.getMethodName(), pair.getLeft(), pair.getRight());
return Mono.fromFuture(future.thenApply(ret -> {
if (Objects.isNull(ret)) {
ret = Constants.DUBBO_RPC_RESULT_EMPTY;
}
// 等調用成功以後 將結果和類型複製到exchagne 對應的屬性上
exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, ret);
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
return ret;
})).onErrorMap(exception -> exception instanceof GenericException ? new SoulException(((GenericException) exception).getExceptionMessage()) : new SoulException(exception));
}
複製代碼
@Overridepublic Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) { return chain.execute(exchange).then(Mono.defer(() -> { final Object result = exchange.getAttribute(Constants.DUBBO_RPC_RESULT); if (Objects.isNull(result)) { Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null); return WebFluxResultUtils.result(exchange, error); } Object success = SoulResultWrap.success(SoulResultEnum.SUCCESS.getCode(), SoulResultEnum.SUCCESS.getMsg(), JsonUtils.removeClass(result)); return WebFluxResultUtils.result(exchange, success); }));}
複製代碼
Dubbo泛化調用主要就分爲兩塊分別是消費端如何使用 GenericImplFilter
攔截泛化調用、服務提供端如何使用 GenericFilter
攔截請求後把泛化參數序列化而後請求給具體服務。
@Activate(group = CommonConstants.CONSUMER, value = GENERIC_KEY, order = 20000)
public class GenericImplFilter implements Filter, Filter.Listener {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// ... 省略非核心代碼
// 判斷是否爲泛化調用
if (isMakingGenericCall(generic, invocation)) {
// 獲取泛化參數
Object[] args = (Object[]) invocation.getArguments()[2];
// 若是泛化爲nativeJava
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (Object arg : args) {
if (!(byte[].class == arg.getClass())) {
error(generic, byte[].class.getName(), arg.getClass().getName());
}
}
// 若是泛化方式爲bean
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (Object arg : args) {
if (!(arg instanceof JavaBeanDescriptor)) {
error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName());
}
}
}
// 設置attachment ,以便與服務端調用
invocation.setAttachment(
GENERIC_KEY, invoker.getUrl().getParameter(GENERIC_KEY));
}
// 發起遠程調用
return invoker.invoke(invocation);
}
private boolean isMakingGenericCall(String generic, Invocation invocation) {
return (invocation.getMethodName().equals($INVOKE) || invocation.getMethodName().equals($INVOKE_ASYNC))
&& invocation.getArguments() != null
&& invocation.getArguments().length == 3
&& ProtocolUtils.isGeneric(generic);
}
}
複製代碼
GenericImplFilter 實現接口Filter(關於Dubbo中的Filter,不作介紹)而後執行Invoke方法,invoke方法主要作以下事情: - 參數校驗,檢查這個調用是不是泛化調用 - 獲取泛化參數 - 判斷泛化調用方式:遍歷每一個參數,而後依次判斷參數的泛化方式是nativejava仍是bean方式 - 發起遠程調用
@Activate(group = CommonConstants.PROVIDER, order = -20000)
public class GenericFilter implements Filter, Filter.Listener {
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
// 參數校驗
if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !GenericService.class.isAssignableFrom(invoker.getInterface())) {
// 獲取參數名稱、參數類型、參數值
String name = ((String) inv.getArguments()[0]).trim();
String[] types = (String[]) inv.getArguments()[1];
Object[] args = (Object[]) inv.getArguments()[2];
try {
// 使用反射獲取調用的方法
Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
Class<?>[] params = method.getParameterTypes();
if (args == null) {
args = new Object[params.length];
}
// 獲取泛化引用使用的泛化類型,true or bean or nativejava
String generic = inv.getAttachment(GENERIC_KEY);
if (StringUtils.isBlank(generic)) {
generic = RpcContext.getContext().getAttachment(GENERIC_KEY);
}
// 若是generic=true 則使用true方式對入參進行反序列化
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)
|| ProtocolUtils.isGenericReturnRawResult(generic)) {
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
// 若是 generic=nativejava,則使用nativejava方式對入參進行反序列化
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (int i = 0; i < args.length; i++) {
if (byte[].class == args[i].getClass()) {
try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) {
args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA)
.deserialize(null, is).readObject();
} catch (Exception e) {
throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
}
} else {
throw new RpcException(...);
}
}
// 若是 generic=bean 則使用bean方式對入參進行反序列化
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (int i = 0; i < args.length; i++) {
if (args[i] instanceof JavaBeanDescriptor) {
args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
} else {
throw new RpcException(...);
}
}
} ...
// 將本次請求傳遞到FilterChain的下一個Filter中,並返回結果result
RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args, inv.getAttachments(), inv.getAttributes());
rpcInvocation.setInvoker(inv.getInvoker());
rpcInvocation.setTargetServiceUniqueName(inv.getTargetServiceUniqueName());
return invoker.invoke(rpcInvocation);
} catch (NoSuchMethodException e) {
throw new RpcException(e.getMessage(), e);
} catch (ClassNotFoundException e) {
throw new RpcException(e.getMessage(), e);
}
}
// 若是不是泛化調用,直接把請求傳給FilterChain的下一個Filter
return invoker.invoke(inv);
}
}
複製代碼
以上就是Dubbo服務提供端如何攔截泛化請求,並進行處理的大致流程: - 參數校驗,判斷這次請求是否是泛化調用 - 獲取參數名稱、參數類型、參數值 - 使用反射獲取調用的方法,和使用的泛化方式
true
ornativejava
orbean
- 根據泛化方式,反序列化泛化參數 - 將本次請求,包括調用的方法,參數和上下文信息傳遞給FilterChain的下一個Filter中,並返回Result結果 - 根據泛化方式,反序列化Result結果返回給服務消費端
以上從如何配置Dubbo插件到整個調用流程的分析,而後分別介紹服務消費端與服務提供端如何攔截泛化調用流程對參數進行序列化細節,但願對你有所幫助