在 Dubbo 中,咱們能夠經過兩種方式引用遠程服務。第一種是使用服務直連的方式引用服務,第二種方式是基於註冊中心進行引用。服務直連的方式僅適合在調試或測試服務的場景下使用,不適合在線上環境使用。所以,本文我將重點分析經過註冊中心引用服務的過程。從註冊中心中獲取服務配置只是服務引用過程當中的一環,除此以外,服務消費者還須要經歷 Invoker 建立、代理類建立等步驟。這些步驟,將在後續章節中一一進行分析。java
Dubbo 服務引用的時機有兩個,第一個是在 Spring 容器調用 ReferenceBean 的 afterPropertiesSet 方法時引用服務,第二個是在 ReferenceBean 對應的服務被注入到其餘類中時引用。這兩個引用服務的時機區別在於,第一個是餓漢式的,第二個是懶漢式的。默認狀況下,Dubbo 使用懶漢式引用服務。若是須要使用餓漢式,可經過配置 dubbo:reference 的 init 屬性開啓。下面咱們按照 Dubbo 默認配置進行分析,整個分析過程從 ReferenceBean 的 getObject 方法開始。當咱們的服務被注入到其餘類中時,Spring 會第一時間調用 getObject 方法,並由該方法執行服務引用邏輯。按照慣例,在進行具體工做以前,需先進行配置檢查與收集工做。接着根據收集到的信息決定服務用的方式,有三種,第一種是引用本地 (JVM) 服務,第二是經過直連方式引用遠程服務,第三是經過註冊中心引用遠程服務。不論是哪一種引用方式,最後都會獲得一個 Invoker 實例。若是有多個註冊中心,多個服務提供者,這個時候會獲得一組 Invoker 實例,此時須要經過集羣管理類 Cluster 將多個 Invoker 合併成一個實例。合併後的 Invoker 實例已經具有調用本地或遠程服務的能力了,但並不能將此實例暴露給用戶使用,這會對用戶業務代碼形成侵入。此時框架還須要經過代理工廠類 (ProxyFactory) 爲服務接口生成代理類,並讓代理類去調用 Invoker 邏輯。避免了 Dubbo 框架代碼對業務代碼的侵入,同時也讓框架更容易使用。apache
以上就是服務引用的大體原理,下面咱們深刻到代碼中,詳細分析服務引用細節。數組
服務引用的入口方法爲 ReferenceBean 的 getObject 方法,該方法定義在 Spring 的 FactoryBean 接口中,ReferenceBean 實現了這個方法。實現代碼以下:緩存
public Object getObject() throws Exception {
return get();
}
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
// 檢測 ref 是否爲空,爲空則經過 init 方法建立
if (ref == null) {
// init 方法主要用於處理配置,以及調用 createProxy 生成代理類
init();
}
return ref;
}
複製代碼
以上兩個方法的代碼比較簡短,並不難理解。這裏須要特別說明一下,若是你對 2.6.4 及如下版本的 getObject 方法進行調試時,會碰到比較奇怪的的問題。這裏假設你使用 IDEA,且保持了 IDEA 的默認配置。當你面調試到 get 方法的if (ref == null)時,你會發現 ref 不爲空,致使你沒法進入到 init 方法中繼續調試。致使這個現象的緣由是 Dubbo 框架自己有一些小問題。該問題已經在 pull request #2754 修復了此問題,並跟隨 2.6.5 版本發佈了。若是你正在學習 2.6.4 及如下版本,可經過修改 IDEA 配置規避這個問題。首先 IDEA 配置彈窗中搜索 toString,而後取消Enable 'toString' object view勾選。具體以下:服務器
Dubbo 提供了豐富的配置,用於調整和優化框架行爲,性能等。Dubbo 在引用或導出服務時,首先會對這些配置進行檢查和處理,以保證配置到正確性。配置解析邏輯封裝在 ReferenceConfig 的 init 方法中,下面進行分析。架構
private void init() {
// 避免重複初始化
if (initialized) {
return;
}
initialized = true;
// 檢測接口名合法性
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("interface not allow null!");
}
// 檢測 consumer 變量是否爲空,爲空則建立
checkDefault();
appendProperties(this);
if (getGeneric() == null && getConsumer() != null) {
// 設置 generic
setGeneric(getConsumer().getGeneric());
}
// 檢測是否爲泛化接口
if (ProtocolUtils.isGeneric(getGeneric())) {
interfaceClass = GenericService.class;
} else {
try {
// 加載類
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);
}
// -------------------------------✨ 分割線1 ✨------------------------------
// 從系統變量中獲取與接口名對應的屬性值
String resolve = System.getProperty(interfaceName);
String resolveFile = null;
if (resolve == null || resolve.length() == 0) {
// 從系統屬性中獲取解析文件路徑
resolveFile = System.getProperty("dubbo.resolve.file");
if (resolveFile == null || resolveFile.length() == 0) {
// 從指定位置加載配置文件
File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
if (userResolveFile.exists()) {
// 獲取文件絕對路徑
resolveFile = userResolveFile.getAbsolutePath();
}
}
if (resolveFile != null && resolveFile.length() > 0) {
Properties properties = new Properties();
FileInputStream fis = null;
try {
fis = new FileInputStream(new File(resolveFile));
// 從文件中加載配置
properties.load(fis);
} catch (IOException e) {
throw new IllegalStateException("Unload ..., cause:...");
} finally {
try {
if (null != fis) fis.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
// 獲取與接口名對應的配置
resolve = properties.getProperty(interfaceName);
}
}
if (resolve != null && resolve.length() > 0) {
// 將 resolve 賦值給 url
url = resolve;
}
// -------------------------------✨ 分割線2 ✨------------------------------
if (consumer != null) {
if (application == null) {
// 從 consumer 中獲取 Application 實例,下同
application = consumer.getApplication();
}
if (module == null) {
module = consumer.getModule();
}
if (registries == null) {
registries = consumer.getRegistries();
}
if (monitor == null) {
monitor = consumer.getMonitor();
}
}
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
}
if (monitor == null) {
monitor = module.getMonitor();
}
}
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
}
if (monitor == null) {
monitor = application.getMonitor();
}
}
// 檢測 Application 合法性
checkApplication();
// 檢測本地存根配置合法性
checkStubAndMock(interfaceClass);
// -------------------------------✨ 分割線3 ✨------------------------------
Map<String, String> map = new HashMap<String, String>();
Map<Object, Object> attributes = new HashMap<Object, Object>();
// 添加 side、協議版本信息、時間戳和進程號等信息到 map 中
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
// 非泛化服務
if (!isGeneric()) {
// 獲取版本
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
// 獲取接口方法列表,並添加到 map 中
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
map.put("methods", Constants.ANY_VALUE);
} else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
map.put(Constants.INTERFACE_KEY, interfaceName);
// 將 ApplicationConfig、ConsumerConfig、ReferenceConfig 等對象的字段信息添加到 map 中
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
// -------------------------------✨ 分割線4 ✨------------------------------
String prefix = StringUtils.getServiceKey(map);
if (methods != null && !methods.isEmpty()) {
// 遍歷 MethodConfig 列表
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
// 檢測 map 是否包含 methodName.retry
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
// 添加劇試次數配置 methodName.retries
map.put(method.getName() + ".retries", "0");
}
}
// 添加 MethodConfig 中的「屬性」字段到 attributes
// 好比 onreturn、onthrow、oninvoke 等
appendAttributes(attributes, method, prefix + "." + method.getName());
checkAndConvertImplicitConfig(method, map, attributes);
}
}
// -------------------------------✨ 分割線5 ✨------------------------------
// 獲取服務消費者 ip 地址
String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
if (hostToRegistry == null || hostToRegistry.length() == 0) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property..." );
}
map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
// 存儲 attributes 到系統上下文中
StaticContext.getSystemContext().putAll(attributes);
// 建立代理類
ref = createProxy(map);
// 根據服務名,ReferenceConfig,代理類構建 ConsumerModel,
// 並將 ConsumerModel 存入到 ApplicationModel 中
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}
複製代碼
上面的代碼很長,作的事情比較多。這裏根據代碼邏輯,對代碼進行了分塊,下面咱們一塊兒來看一下。併發
首先是方法開始到分割線1之間的代碼。這段代碼主要用於檢測 ConsumerConfig 實例是否存在,如不存在則建立一個新的實例,而後經過系統變量或 dubbo.properties 配置文件填充 ConsumerConfig 的字段。接着是檢測泛化配置,並根據配置設置 interfaceClass 的值。接着來看分割線1到分割線2之間的邏輯。這段邏輯用於從系統屬性或配置文件中加載與接口名相對應的配置,並將解析結果賦值給 url 字段。url 字段的做用通常是用於點對點調用。繼續向下看,分割線2和分割線3之間的代碼用於檢測幾個核心配置類是否爲空,爲空則嘗試從其餘配置類中獲取。分割線3與分割線4之間的代碼主要用於收集各類配置,並將配置存儲到 map 中。分割線4和分割線5之間的代碼用於處理 MethodConfig 實例。該實例包含了事件通知配置,好比 onreturn、onthrow、oninvoke 等。分割線5到方法結尾的代碼主要用於解析服務消費者 ip,以及調用 createProxy 建立代理對象。關於該方法的詳細分析,將會在接下來的章節中展開。app
本節咱們要從 createProxy 開始看起。從字面意思上來看,createProxy 彷佛只是用於建立代理對象的。但實際上並不是如此,該方法還會調用其餘方法構建以及合併 Invoker 實例。具體細節以下。框架
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
if (isInjvm() == null) {
// url 配置被指定,則不作本地引用
if (url != null && url.length() > 0) {
isJvmRefer = false;
// 根據 url 的協議、scope 以及 injvm 等參數檢測是否須要本地引用
// 好比若是用戶顯式配置了 scope=local,此時 isInjvmRefer 返回 true
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
isJvmRefer = true;
} else {
isJvmRefer = false;
}
} else {
// 獲取 injvm 配置值
isJvmRefer = isInjvm().booleanValue();
}
// 本地引用
if (isJvmRefer) {
// 生成本地引用 URL,協議爲 injvm
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
// 調用 refer 方法構建 InjvmInvoker 實例
invoker = refprotocol.refer(interfaceClass, url);
// 遠程引用
} else {
// url 不爲空,代表用戶可能想進行點對點調用
if (url != null && url.length() > 0) {
// 當須要配置多個 url 時,可用分號進行分割,這裏會進行切分
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
// 設置接口全限定名爲 url 路徑
url = url.setPath(interfaceName);
}
// 檢測 url 協議是否爲 registry,如果,代表用戶想使用指定的註冊中心
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
// 將 map 轉換爲查詢字符串,並做爲 refer 參數的值添加到 url 中
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
// 合併 url,移除服務提供者的一些配置(這些配置來源於用戶配置的 url 屬性),
// 好比線程池相關配置。並保留服務提供者的部分配置,好比版本,group,時間戳等
// 最後將合併後的配置設置爲 url 查詢字符串中。
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else {
// 加載註冊中心 url
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 添加 refer 參數到 url 中,並將 url 添加到 urls 中
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
// 未配置註冊中心,拋出異常
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference...");
}
}
// 單個註冊中心或服務提供者(服務直連,下同)
if (urls.size() == 1) {
// 調用 RegistryProtocol 的 refer 構建 Invoker 實例
invoker = refprotocol.refer(interfaceClass, urls.get(0));
// 多個註冊中心或多個服務提供者,或者二者混合
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
// 獲取全部的 Invoker
for (URL url : urls) {
// 經過 refprotocol 調用 refer 構建 Invoker,refprotocol 會在運行時
// 根據 url 協議頭加載指定的 Protocol 實例,並調用實例的 refer 方法
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url;
}
}
if (registryURL != null) {
// 若是註冊中心連接不爲空,則將使用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
// 建立 StaticDirectory 實例,並由 Cluster 對多個 Invoker 進行合併
invoker = cluster.join(new StaticDirectory(u, invokers));
} else {
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true;
}
// invoker 可用性檢查
if (c && !invoker.isAvailable()) {
throw new IllegalStateException("No provider available for the service...");
}
// 生成代理類
return (T) proxyFactory.getProxy(invoker);
}
複製代碼
上面代碼不少,不過邏輯比較清晰。首先根據配置檢查是否爲本地調用,如果,則調用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 實例。若不是,則讀取直連配置項,或註冊中心 url,並將讀取到的 url 存儲到 urls 中。而後根據 urls 元素數量進行後續操做。若 urls 元素數量爲1,則直接經過 Protocol 自適應拓展類構建 Invoker 實例接口。若 urls 元素數量大於1,即存在多個註冊中心或服務直連 url,此時先根據 url 構建 Invoker。而後再經過 Cluster 合併多個 Invoker,最後調用 ProxyFactory 生成代理類。Invoker 的構建過程以及代理類的過程比較重要,所以接下來將分兩小節對這兩個過程進行分析。jvm
Invoker 是 Dubbo 的核心模型,表明一個可執行體。在服務提供方,Invoker 用於調用服務提供類。在服務消費方,Invoker 用於執行遠程調用。Invoker 是由 Protocol 實現類構建而來。Protocol 實現類有不少,本節會分析最經常使用的兩個,分別是 RegistryProtocol 和 DubboProtocol,其餘的你們自行分析。下面先來分析 DubboProtocol 的 refer 方法源碼。以下:
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// 建立 DubboInvoker
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
複製代碼
上面方法看起來比較簡單,不過這裏有一個調用須要咱們注意一下,即 getClients。這個方法用於獲取客戶端實例,實例類型爲 ExchangeClient。ExchangeClient 實際上並不具有通訊能力,它須要基於更底層的客戶端實例進行通訊。好比 NettyClient、MinaClient 等,默認狀況下,Dubbo 使用 NettyClient 進行通訊。接下來,咱們簡單看一下 getClients 方法的邏輯。
private ExchangeClient[] getClients(URL url) {
// 是否共享鏈接
boolean service_share_connect = false;
// 獲取鏈接數,默認爲0,表示未配置
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// 若是未配置 connections,則共享鏈接
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
// 獲取共享客戶端
clients[i] = getSharedClient(url);
} else {
// 初始化新的客戶端
clients[i] = initClient(url);
}
}
return clients;
}
複製代碼
這裏根據 connections 數量決定是獲取共享客戶端仍是建立新的客戶端實例,默認狀況下,使用共享客戶端實例。getSharedClient 方法中也會調用 initClient 方法,所以下面咱們一塊兒看一下這兩個方法。
private ExchangeClient getSharedClient(URL url) {
String key = url.getAddress();
// 獲取帶有「引用計數」功能的 ExchangeClient
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
if (!client.isClosed()) {
// 增長引用計數
client.incrementAndGetCount();
return client;
} else {
referenceClientMap.remove(key);
}
}
locks.putIfAbsent(key, new Object());
synchronized (locks.get(key)) {
if (referenceClientMap.containsKey(key)) {
return referenceClientMap.get(key);
}
// 建立 ExchangeClient 客戶端
ExchangeClient exchangeClient = initClient(url);
// 將 ExchangeClient 實例傳給 ReferenceCountExchangeClient,這裏使用了裝飾模式
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
locks.remove(key);
return client;
}
}
複製代碼
上面方法先訪問緩存,若緩存未命中,則經過 initClient 方法建立新的 ExchangeClient 實例,並將該實例傳給 ReferenceCountExchangeClient 構造方法建立一個帶有引用計數功能的 ExchangeClient 實例。ReferenceCountExchangeClient 內部實現比較簡單,就不分析了。下面咱們再來看一下 initClient 方法的代碼。
private ExchangeClient initClient(URL url) {
// 獲取客戶端類型,默認爲 netty
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
// 添加編解碼和心跳包參數到 url 中
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 檢測客戶端類型是否存在,不存在則拋出異常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: ...");
}
ExchangeClient client;
try {
// 獲取 lazy 配置,並根據配置值決定建立的客戶端類型
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
// 建立懶加載 ExchangeClient 實例
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 建立普通 ExchangeClient 實例
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service...");
}
return client;
}
複製代碼
initClient 方法首先獲取用戶配置的客戶端類型,默認爲 netty。而後檢測用戶配置的客戶端類型是否存在,不存在則拋出異常。最後根據 lazy 配置決定建立什麼類型的客戶端。這裏的 LazyConnectExchangeClient 代碼並非很複雜,該類會在 request 方法被調用時經過 Exchangers 的 connect 方法建立 ExchangeClient 客戶端,該類的代碼本節就不分析了。下面咱們分析一下 Exchangers 的 connect 方法。
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 獲取 Exchanger 實例,默認爲 HeaderExchangeClient
return getExchanger(url).connect(url, handler);
}
複製代碼
如上,getExchanger 會經過 SPI 加載 HeaderExchangeClient 實例,這個方法比較簡單,你們本身看一下吧。接下來分析 HeaderExchangeClient 的實現。
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 這裏包含了多個調用,分別以下:
// 1. 建立 HeaderExchangeHandler 對象
// 2. 建立 DecodeHandler 對象
// 3. 經過 Transporters 構建 Client 實例
// 4. 建立 HeaderExchangeClient 對象
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
複製代碼
這裏的調用比較多,咱們這裏重點看一下 Transporters 的 connect 方法。以下:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
// 若是 handler 數量大於1,則建立一個 ChannelHandler 分發器
handler = new ChannelHandlerDispatcher(handlers);
}
// 獲取 Transporter 自適應拓展類,並調用 connect 方法生成 Client 實例
return getTransporter().connect(url, handler);
}
複製代碼
如上,getTransporter 方法返回的是自適應拓展類,該類會在運行時根據客戶端類型加載指定的 Transporter 實現類。若用戶未配置客戶端類型,則默認加載 NettyTransporter,並調用該類的 connect 方法。以下:
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
// 建立 NettyClient 對象
return new NettyClient(url, listener);
}
複製代碼
到這裏就不繼續跟下去了,在往下就是經過 Netty 提供的 API 構建 Netty 客戶端了,你們有興趣本身看看。到這裏,關於 DubboProtocol 的 refer 方法就分析完了。接下來,繼續分析 RegistryProtocol 的 refer 方法邏輯。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 取 registry 參數值,並將其設置爲協議頭
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// 獲取註冊中心實例
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// 將 url 查詢字符串轉爲 Map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
// 獲取 group 配置
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
// 經過 SPI 加載 MergeableCluster 實例,並調用 doRefer 繼續執行服務引用邏輯
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 調用 doRefer 繼續執行服務引用邏輯
return doRefer(cluster, registry, type, url);
}
複製代碼
上面代碼首先爲 url 設置協議頭,而後根據 url 參數加載註冊中心實例。而後獲取 group 配置,根據 group 配置決定 doRefer 第一個參數的類型。這裏的重點是 doRefer 方法,以下:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 建立 RegistryDirectory 實例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 設置註冊中心和協議
directory.setRegistry(registry);
directory.setProtocol(protocol);
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 生成服務消費者連接
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
// 註冊服務消費者,在 consumers 目錄下新節點
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// 訂閱 providers、configurators、routers 等節點數據
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// 一個註冊中心可能有多個服務提供者,所以這裏須要將多個服務提供者合併爲一個
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
複製代碼
如上,doRefer 方法建立一個 RegistryDirectory 實例,而後生成服務者消費者連接,並向註冊中心進行註冊。註冊完畢後,緊接着訂閱 providers、configurators、routers 等節點下的數據。完成訂閱後,RegistryDirectory 會收到這幾個節點下的子節點信息。因爲一個服務可能部署在多臺服務器上,這樣就會在 providers 產生多個節點,這個時候就須要 Cluster 將多個服務節點合併爲一個,並生成一個 Invoker。關於 RegistryDirectory 和 Cluster,本文不打算進行分析,相關分析將會在隨後的文章中展開。
Invoker 建立完畢後,接下來要作的事情是爲服務接口生成代理對象。有了代理對象,便可進行遠程調用。代理對象生成的入口方法爲 ProxyFactory 的 getProxy,接下來進行分析。
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
// 調用重載方法
return getProxy(invoker, false);
}
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
Class<?>[] interfaces = null;
// 獲取接口列表
String config = invoker.getUrl().getParameter("interfaces");
if (config != null && config.length() > 0) {
// 切分接口列表
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
// 設置服務接口類和 EchoService.class 到 interfaces 中
interfaces[0] = invoker.getInterface();
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i++) {
// 加載接口類
interfaces[i + 1] = ReflectUtils.forName(types[i]);
}
}
}
if (interfaces == null) {
interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
}
// 爲 http 和 hessian 協議提供泛化調用支持,參考 pull request #1827
if (!invoker.getInterface().equals(GenericService.class) && generic) {
int len = interfaces.length;
Class<?>[] temp = interfaces;
// 建立新的 interfaces 數組
interfaces = new Class<?>[len + 1];
System.arraycopy(temp, 0, interfaces, 0, len);
// 設置 GenericService.class 到數組中
interfaces[len] = GenericService.class;
}
// 調用重載方法
return getProxy(invoker, interfaces);
}
public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
複製代碼
如上,上面大段代碼都是用來獲取 interfaces 數組的,咱們繼續往下看。getProxy(Invoker, Class<?>[]) 這個方法是一個抽象方法,下面咱們到 JavassistProxyFactory 類中看一下該方法的實現代碼。
public T getProxy(Invoker invoker, Class<?>[] interfaces) { // 生成 Proxy 子類(Proxy 是抽象類)。並調用 Proxy 子類的 newInstance 方法建立 Proxy 實例 return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } 上面代碼並很少,首先是經過 Proxy 的 getProxy 方法獲取 Proxy 子類,而後建立 InvokerInvocationHandler 對象,並將該對象傳給 newInstance 生成 Proxy 實例。InvokerInvocationHandler 實現自 JDK 的 InvocationHandler 接口,具體的用途是攔截接口類調用。該類邏輯比較簡單,這裏就不分析了。下面咱們重點關注一下 Proxy 的 getProxy 方法,以下。
public static Proxy getProxy(Class<?>... ics) {
// 調用重載方法
return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
}
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
if (ics.length > 65535)
throw new IllegalArgumentException("interface limit exceeded");
StringBuilder sb = new StringBuilder();
// 遍歷接口列表
for (int i = 0; i < ics.length; i++) {
String itf = ics[i].getName();
// 檢測類型是否爲接口
if (!ics[i].isInterface())
throw new RuntimeException(itf + " is not a interface.");
Class<?> tmp = null;
try {
// 從新加載接口類
tmp = Class.forName(itf, false, cl);
} catch (ClassNotFoundException e) {
}
// 檢測接口是否相同,這裏 tmp 有可能爲空
if (tmp != ics[i])
throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
// 拼接接口全限定名,分隔符爲 ;
sb.append(itf).append(';');
}
// 使用拼接後的接口名做爲 key
String key = sb.toString();
Map<String, Object> cache;
synchronized (ProxyCacheMap) {
cache = ProxyCacheMap.get(cl);
if (cache == null) {
cache = new HashMap<String, Object>();
ProxyCacheMap.put(cl, cache);
}
}
Proxy proxy = null;
synchronized (cache) {
do {
// 從緩存中獲取 Reference<Proxy> 實例
Object value = cache.get(key);
if (value instanceof Reference<?>) {
proxy = (Proxy) ((Reference<?>) value).get();
if (proxy != null) {
return proxy;
}
}
// 併發控制,保證只有一個線程能夠進行後續操做
if (value == PendingGenerationMarker) {
try {
// 其餘線程在此處進行等待
cache.wait();
} catch (InterruptedException e) {
}
} else {
// 放置標誌位到緩存中,並跳出 while 循環進行後續操做
cache.put(key, PendingGenerationMarker);
break;
}
}
while (true);
}
long id = PROXY_CLASS_COUNTER.getAndIncrement();
String pkg = null;
ClassGenerator ccp = null, ccm = null;
try {
// 建立 ClassGenerator 對象
ccp = ClassGenerator.newInstance(cl);
Set<String> worked = new HashSet<String>();
List<Method> methods = new ArrayList<Method>();
for (int i = 0; i < ics.length; i++) {
// 檢測接口訪問級別是否爲 protected 或 privete
if (!Modifier.isPublic(ics[i].getModifiers())) {
// 獲取接口包名
String npkg = ics[i].getPackage().getName();
if (pkg == null) {
pkg = npkg;
} else {
if (!pkg.equals(npkg))
// 非 public 級別的接口必須在同一個包下,否者拋出異常
throw new IllegalArgumentException("non-public interfaces from different packages");
}
}
// 添加接口到 ClassGenerator 中
ccp.addInterface(ics[i]);
// 遍歷接口方法
for (Method method : ics[i].getMethods()) {
// 獲取方法描述,可理解爲方法簽名
String desc = ReflectUtils.getDesc(method);
// 若是方法描述字符串已在 worked 中,則忽略。考慮這種狀況,
// A 接口和 B 接口中包含一個徹底相同的方法
if (worked.contains(desc))
continue;
worked.add(desc);
int ix = methods.size();
// 獲取方法返回值類型
Class<?> rt = method.getReturnType();
// 獲取參數列表
Class<?>[] pts = method.getParameterTypes();
// 生成 Object[] args = new Object[1...N]
StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
for (int j = 0; j < pts.length; j++)
// 生成 args[1...N] = ($w)$1...N;
code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
// 生成 InvokerHandler 接口的 invoker 方法調用語句,以下:
// Object ret = handler.invoke(this, methods[1...N], args);
code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
// 返回值不爲 void
if (!Void.TYPE.equals(rt))
// 生成返回語句,形如 return (java.lang.String) ret;
code.append(" return ").append(asArgument(rt, "ret")).append(";");
methods.add(method);
// 添加方法名、訪問控制符、參數列表、方法代碼等信息到 ClassGenerator 中
ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
}
}
if (pkg == null)
pkg = PACKAGE_NAME;
// 構建接口代理類名稱:pkg + ".proxy" + id,好比 org.apache.dubbo.proxy0
String pcn = pkg + ".proxy" + id;
ccp.setClassName(pcn);
ccp.addField("public static java.lang.reflect.Method[] methods;");
// 生成 private java.lang.reflect.InvocationHandler handler;
ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
// 爲接口代理類添加帶有 InvocationHandler 參數的構造方法,好比:
// porxy0(java.lang.reflect.InvocationHandler arg0) {
// handler=$1;
// }
ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
// 爲接口代理類添加默認構造方法
ccp.addDefaultConstructor();
// 生成接口代理類
Class<?> clazz = ccp.toClass();
clazz.getField("methods").set(null, methods.toArray(new Method[0]));
// 構建 Proxy 子類名稱,好比 Proxy1,Proxy2 等
String fcn = Proxy.class.getName() + id;
ccm = ClassGenerator.newInstance(cl);
ccm.setClassName(fcn);
ccm.addDefaultConstructor();
ccm.setSuperClass(Proxy.class);
// 爲 Proxy 的抽象方法 newInstance 生成實現代碼,形如:
// public Object newInstance(java.lang.reflect.InvocationHandler h) {
// return new org.apache.dubbo.proxy0($1);
// }
ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
// 生成 Proxy 實現類
Class<?> pc = ccm.toClass();
// 經過反射建立 Proxy 實例
proxy = (Proxy) pc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
if (ccp != null)
// 釋放資源
ccp.release();
if (ccm != null)
ccm.release();
synchronized (cache) {
if (proxy == null)
cache.remove(key);
else
// 寫緩存
cache.put(key, new WeakReference<Proxy>(proxy));
// 喚醒其餘等待線程
cache.notifyAll();
}
}
return proxy;
}
複製代碼
上面代碼比較複雜,咱們寫了大量的註釋。你們在閱讀這段代碼時,要搞清楚 ccp 和 ccm 的用途,否則會被搞暈。ccp 用於爲服務接口生成代理類,好比咱們有一個 DemoService 接口,這個接口代理類就是由 ccp 生成的。ccm 則是用於爲 org.apache.dubbo.common.bytecode.Proxy 抽象類生成子類,主要是實現 Proxy 類的抽象方法。下面以 org.apache.dubbo.demo.DemoService 這個接口爲例,來看一下該接口代理類代碼大體是怎樣的(忽略 EchoService 接口)。
package org.apache.dubbo.common.bytecode;
public class proxy0 implements org.apache.dubbo.demo.DemoService {
public static java.lang.reflect.Method[] methods;
private java.lang.reflect.InvocationHandler handler;
public proxy0() {
}
public proxy0(java.lang.reflect.InvocationHandler arg0) {
handler = $1;
}
public java.lang.String sayHello(java.lang.String arg0) {
Object[] args = new Object[1];
args[0] = ($w) $1;
Object ret = handler.invoke(this, methods[0], args);
return (java.lang.String) ret;
}
}
複製代碼
好了,到這裏代理類生成邏輯就分析完了。整個過程比較複雜,你們須要耐心看一下。
本篇文章對服務引用的過程進行了較爲詳盡的分析,還有一些邏輯暫時沒有分析到,好比 Directory、Cluster。這些接口及實現類功能比較獨立,後續會單獨成文進行分析。暫時咱們能夠先把這些類當作黑盒,只要知道這些類的用途便可。關於服務引用過程就分析到這裏。
歡迎你們加入Java高級架構羣 378461078