String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
複製代碼
Step1:用 Map
存儲該協議的全部配置參數,包括:協議名稱、Dubbo版本、當前系統時間戳、進程ID、application配置、module配置、默認服務提供者參數(ProviderConfig)、協議配置、服務提供 Dubbo:service
的屬性。java
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (arguments != null && !arguments.isEmpty()) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
複製代碼
Step2:若是 dubbo:service
有 dubbo:method
子標籤,則 dubbo:method
以及其子標籤的配置屬性,都存入到 Map
中,屬性名稱加上對應的方法名做爲前綴。dubbo:method
的子標籤 dubbo:argument
,其鍵爲方法名.參數序號。redis
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
複製代碼
Step3:添加 methods
鍵值對,存放 dubbo:service
的全部方法名,多個方法名用 ,
隔開,若是是泛化實現,填充 genric=true,methods
爲 「*」
。spring
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
複製代碼
Step4:根據是否開啓令牌機制,若是開啓,設置 token
鍵,值爲靜態值或 uuid
。bootstrap
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
複製代碼
Step5:若是協議爲本地協議( injvm
),則設置 protocolConfig#register
屬性爲 false
,表示不向註冊中心註冊服務,在 map
中存儲鍵爲 notify
,值爲 false
,表示當註冊中心監聽到服務提供者發生變化(服務提供者增長、服務提供者減小等)事件時不通知。api
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
複製代碼
Step6:設置協議的 contextPath
,若是未配置,默認爲 /interfacename
。緩存
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
複製代碼
Step7:解析服務提供者的IP地址與端口。安全
private String findConfigedHosts(ProtocolConfig protocolConfig, List<URL> registryURLs, Map<String, String> map) {
boolean anyhost = false;
String hostToBind = getValueFromConfig(protocolConfig, Constants.DUBBO_IP_TO_BIND);
if (hostToBind != null && hostToBind.length() > 0 && isInvalidLocalHost(hostToBind)) {
throw new IllegalArgumentException("Specified invalid bind ip from property:" + Constants.DUBBO_IP_TO_BIND + ", value:" + hostToBind);
}
// if bind ip is not found in environment, keep looking up
if (hostToBind == null || hostToBind.length() == 0) {
hostToBind = protocolConfig.getHost();
if (provider != null && (hostToBind == null || hostToBind.length() == 0)) {
hostToBind = provider.getHost();
}
if (isInvalidLocalHost(hostToBind)) {
anyhost = true;
try {
hostToBind = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
logger.warn(e.getMessage(), e);
}
if (isInvalidLocalHost(hostToBind)) {
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
if (Constants.MULTICAST.equalsIgnoreCase(registryURL.getParameter("registry"))) {
// skip multicast registry since we cannot connect to it via Socket
continue;
}
try {
Socket socket = new Socket();
try {
SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
socket.connect(addr, 1000);
hostToBind = socket.getLocalAddress().getHostAddress();
break;
} finally {
try {
socket.close();
} catch (Throwable e) {
}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
if (isInvalidLocalHost(hostToBind)) {
hostToBind = getLocalHost();
}
}
}
}
map.put(Constants.BIND_IP_KEY, hostToBind);
// registry ip is not used for bind ip by default
String hostToRegistry = getValueFromConfig(protocolConfig, Constants.DUBBO_IP_TO_REGISTRY);
if (hostToRegistry != null && hostToRegistry.length() > 0 && isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
} else if (hostToRegistry == null || hostToRegistry.length() == 0) {
// bind ip is used as registry ip by default
hostToRegistry = hostToBind;
}
map.put(Constants.ANYHOST_KEY, String.valueOf(anyhost));
return hostToRegistry;
}
複製代碼
服務IP地址解析順序:(序號越小越優先)bash
- 系統環境變量,變量名:
DUBBO_DUBBO_IP_TO_BIND
- 系統屬性,變量名:
DUBBO_DUBBO_IP_TO_BIND
- 系統環境變量,變量名:
DUBBO_IP_TO_BIND
- 系統屬性,變量名:
DUBBO_IP_TO_BIND
dubbo:protocol
標籤的host
屬性 -->dubbo:provider
標籤的host
屬性- 默認網卡IP地址,經過
InetAddress.getLocalHost().getHostAddress()
獲取,若是IP地址不符合要求,繼續下一個匹配。// 判斷IP地址是否符合要求的標準 public static boolean isInvalidLocalHost(String host) { return host == null || host.length() == 0 || host.equalsIgnoreCase("localhost") || host.equals("0.0.0.0") || (LOCAL_IP_PATTERN.matcher(host).matches()); } 複製代碼
- 選擇第一個可用網卡,其實現方式是創建
socket
,鏈接註冊中心,獲取socket
的IP地址。Socket socket = new Socket(); try { SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort()); socket.connect(addr, 1000); hostToBind = socket.getLocalAddress().getHostAddress(); break; } finally { try { socket.close(); } catch (Throwable e) { } } 複製代碼
private Integer findConfigedPorts(ProtocolConfig protocolConfig, String name, Map<String, String> map) {
Integer portToBind = null;
// parse bind port from environment
String port = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_BIND);
portToBind = parsePort(port);
// if there's no bind port found from environment, keep looking up.
if (portToBind == null) {
portToBind = protocolConfig.getPort();
if (provider != null && (portToBind == null || portToBind == 0)) {
portToBind = provider.getPort();
}
final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
if (portToBind == null || portToBind == 0) {
portToBind = defaultPort;
}
if (portToBind == null || portToBind <= 0) {
portToBind = getRandomPort(name);
if (portToBind == null || portToBind < 0) {
portToBind = getAvailablePort(defaultPort);
putRandomPort(name, portToBind);
}
logger.warn("Use random available port(" + portToBind + ") for protocol " + name);
}
}
// save bind port, used as url's key later
map.put(Constants.BIND_PORT_KEY, String.valueOf(portToBind));
// registry port, not used as bind port by default
String portToRegistryStr = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_REGISTRY);
Integer portToRegistry = parsePort(portToRegistryStr);
if (portToRegistry == null) {
portToRegistry = portToBind;
}
return portToRegistry;
}
複製代碼
服務提供者端口解析順序:(序號越小越優先)服務器
- 系統環境變量,變量名:
DUBBO_DUBBO_PORT_TO_BIND
- 系統屬性,變量名:
DUBBO_DUBBO_PORT_TO_BIND
- 系統環境變量,變量名:
DUBBO_PORT_TO_BIND
- 系統屬性,變量名:
DUBBO_PORT_TO_BIND
dubbo:protocol
標籤port
屬性 -->dubbo:provider
標籤的port
屬性。- 隨機選擇一個端口。
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
複製代碼
Step8:根據協議名稱、協議 host
、協議端口、contextPath
、相關配置屬性(application
、module
、provider
、protocolConfig
、service
及其子標籤)構建服務提供者URI。網絡
URL運行效果圖,以下:
String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) { // @ 代碼1
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) { // @ 代碼2
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) { // @ 代碼3
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); // @ 代碼4
URL monitorUrl = loadMonitor(registryURL); // @ 代碼5
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // @ 代碼6
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker); // 代碼7
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
複製代碼
Step9:獲取 dubbo:service
標籤的 scope
屬性,其可選值爲 none
(不暴露)、local
(本地)、remote
(遠程),若是配置爲 none
,則不暴露。默認爲 local
。
Step10:根據 scope
來暴露服務,若是 scope
不配置,則默認本地與遠程都會暴露,若是配置成 local
或 remote
,那就只能是二選一。
代碼1:若是
scope
不爲remote
,則先在本地暴露(injvm
),具體暴露服務的具體實現,將在remote 模式中詳細分析。代碼2:若是
scope
不爲local
,則將服務暴露在遠程。代碼3:
remote
方式,檢測當前配置的全部註冊中心,若是註冊中心不爲空,則遍歷註冊中心,將服務依次在不一樣的註冊中心進行註冊。代碼4:若是
dubbo:service
的dynamic
屬性未配置, 嘗試取dubbo:registry
的dynamic
屬性,該屬性的做用是否啓用動態註冊,若是設置爲false
,服務註冊後,其狀態顯示爲disable
,須要人工啓用,當服務不可用時,也不會自動移除,一樣須要人工處理,此屬性不要在生產環境上配置。代碼5:根據註冊中心URL,構建監控中心的URL,若是監控中心URL不爲空,則在服務提供者URL上追加
monitor
,其值爲監控中心URL(已編碼)。1)若是dubbo spring xml配置文件中沒有配置監控中心(dubbo:monitor),就從系統屬性-Ddubbo.monitor.address,-Ddubbo.monitor.protocol構建MonitorConfig對象,不然從dubbo的properties配置文件中尋找這個兩個參數,若是沒有配置,則返回null。 2)若是有配置,則追加相關參數,dubbo:monitor標籤只有兩個屬性:address、protocol,其次會追加interface(MonitorService)、協議等。 複製代碼
代碼6:經過動態代理機制建立
Invoker
,Dubbo的遠程調用實現類。
Dubbo遠程調用器如何構建,這裏不詳細深刻,重點關注WrapperInvoker的url爲:
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider
&dubbo=2.0.0
&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D6328%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1527255510215
&pid=6328
&qos.port=22222
®istry=zookeeper
×tamp=1527255510202
複製代碼
這裏有兩個重點值得關注:
- path屬性:
com.alibaba.dubbo.registry.RegistryService
,註冊中心也相似於服務提供者。- export屬性:值爲服務提供者的URL,爲何須要關注這個URL呢?請看代碼7,
protocol
屬性爲Protocol$Adaptive
,Dubbo在加載組件實現類時採用SPI(關於SPI細節,可參閱《☆聊聊Dubbo(五):核心源碼-SPI擴展》 ),在這裏咱們只須要知道,根據URL冒號以前的協議名將會調用相應的方法。
其映射關係(列出與服務啓動相關協議實現類):
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol // 文件位於dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol // 文件位於dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol
複製代碼
代碼7:根據代碼6的分析,將調用
RegistryProtocol#export
方法。
這裏很重要的是 Invoker
實例,做爲Dubbo的核心模型,其它模型都向它靠擾,或轉換成它,它表明一個可執行體,可向它發起invoke調用,它有多是一個本地的實現,也多是一個遠程的實現,也可能一個集羣實現。
因此,下面重點分析 代碼6 & 代碼7
兩處代碼實現,源碼以下:
// 使用ProxyFactory將服務實現封裝成一個Invoker對象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // @ 代碼6
// 根據指定協議本地暴露和向註冊中心註冊服務
Exporter<?> exporter = protocol.export(invoker); // @ 代碼7
//用於unexport
exporters.add(exporter);
複製代碼
上面 proxyFactory
和 protocol
兩個變量,具體定義以下:
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
複製代碼
Invoker
實例從 proxyFactory
獲取,而 proxyFactory
在這裏實際是個適配器,經過調用 getAdaptiveExtension()
方法,會以拼接源碼的方式動態生成目標ProxyFactory Class,生成的Class方法中會獲取 url
中的參數來構建合適的具體實現對象,若是 url
中未配置,則使用 @SPI
配置的默認值。
查看 ProxyFactory
和 Protocol
接口,默認 ProxyFactory
實現爲 JavassistProxyFactory
,默認 Protocol
實現爲 DubboProtocol
。源碼以下:
// 默認javassist
@SPI("javassist")
public interface ProxyFactory {
...
}
// 默認dubbo
@SPI("dubbo")
public interface Protocol {
...
}
複製代碼
ExtensionLoader#getAdaptiveExtension()
調用棧,以下:
ExtensionLoader<T>.getAdaptiveExtension()
ExtensionLoader<T>.createAdaptiveExtension()
ExtensionLoader<T>.getAdaptiveExtensionClass()
ExtensionLoader<T>.createAdaptiveExtensionClass()
ExtensionLoader<T>.createAdaptiveExtensionClassCode()
複製代碼
最終,生成目標ProxyFactory Class,源碼以下:
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object {
if (arg2 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
複製代碼
能夠看到,在上面的 getInvoker
方法中,會優先獲取 proxy
擴展,不然默認獲取 javassist
擴展。通常狀況下,咱們未主動擴展配置代理工廠的話,使用 JavassistProxyFactory
,源碼以下:
public class JavassistProxyFactory extends AbstractProxyFactory {
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper類不能正確處理帶$的類名
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
複製代碼
注意到這裏的入參包括 proxy
服務實例和其接口類型,由於須要對服務進行代理封裝,最終是生成一個 AbstractProxyInvoker
實例,其 doInvoker
方法成爲服務調用的入口。如下是具體的封裝過程:
public static Wrapper getWrapper(Class<?> c) {
while( ClassGenerator.isDynamicClass(c) ) // can not wrapper on dynamic class.
c = c.getSuperclass();
if( c == Object.class )
return OBJECT_WRAPPER;
Wrapper ret = WRAPPER_MAP.get(c);
if( ret == null )
{
ret = makeWrapper(c);
WRAPPER_MAP.put(c,ret);
}
return ret;
}
複製代碼
具體的 makeWrapper
方法是利用 javassist
技術動態構造 Wapper
類型並建立實例,源碼較長這裏再也不列出,如下是 Wapper
類型的 invokeMethod
方法源碼(注意是 javasssit
語法形式):
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
indi.cesc.inno.learn.dubbo.HelloService w;
try{
w = ((indi.cesc.inno.learn.dubbo.HelloService)$1);
}catch(Throwable e){
throw new IllegalArgumentException(e);
}
try{
if( "sayHello".equals( $2 ) && $3.length == 1 ) {
return ($w)w.sayHello((indi.cesc.inno.learn.dubbo.HelloRequest)$4[0]); // 真實方法調用
}
} catch(Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \""+$2+"\" in class indi.cesc.inno.learn.dubbo.HelloService.");
}
複製代碼
能夠看到 w.sayHello()
這就是直接經過服務的實現對象調用具體方法,並非經過反射,效率會高些。默認使用Javassist而不是JDK動態代理也是出於效率的考慮。
這裏就將真實服務加入到總體調用鏈條之中,後續再將 Invoker
往上層傳遞,打通整個鏈條。
繼續上面 代碼7
處的代碼,protocol
實例調用 export
方法進入後續流程。這裏的 protocol
類型實際依舊是個適配器,export
方法源碼以下:
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
複製代碼
注意 invoker
的 url
不是服務暴露的 url
,而是協議註冊的 url
,所以 url
裏面的協議是 registry
。嘗試獲取名爲 registry
的 Protocol
擴展,但進入 ExtensionLoader
後被攔截,實際拿到了其封裝類 ProtocolFilterWrapper
,其負責組裝過濾器鏈。
在/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol
中配置有:
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
複製代碼
則獲取 RegistryProtocol
實例會被 ProtocolFilterWrapper
和 ProtocolListenerWrapper
裝飾,分別用來實現攔截器和監聽器功能,查看這兩個Wrapper的代碼能夠看出,對於註冊url都作了特別處理,向註冊中心發佈url不會觸發攔截器和監聽器功能,只有在真正暴露服務時纔會註冊攔截器,觸發監聽器。
ProtocolFilterWrapper#export
方法,源碼以下:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
// 此處,將直接進入 RegistryProtocol 的 export 方法
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
複製代碼
此處,將直接進入 RegistryProtocol
的 export
方法。
依據上面分析,最終註冊發佈服務調用鏈:ServiceBean#afterPropertiesSet —> ServiceConfig#export —> ServiceConfig#doExport —> ServiceConfig#doExportUrlsFor1Protocol —> RegistryProtocol#export
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); // @ 代碼1
URL registryUrl = getRegistryUrl(originInvoker); // @ 代碼2
//registry provider
final Registry registry = getRegistry(originInvoker); // @ 代碼3
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); // @ 代碼4 start
//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
register(registryUrl, registeredProviderUrl); // @ 代碼4 end
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); // @ 代碼5 start
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // @ 代碼5 end
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
複製代碼
代碼1:啓動服務提供者服務,監聽指定端口,準備服務消費者的請求,這裏其實就是從
WrapperInvoker
中的url
(註冊中心url
)中提取export
屬性,描述服務提供者的url
,而後啓動服務提供者。
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
複製代碼
從上圖中,能夠看出,將調用 DubboProtocol#export
完成Dubbo服務的啓動,利用netty構建一個微型服務端,監聽端口,準備接受服務消費者的網絡請求,而後將 dubbo:service
的服務handler加入到命令處理器中,當有消息消費者鏈接該端口時,經過網絡解包,將須要調用的服務和參數等信息解析處理後,轉交給對應的服務實現類處理便可。
代碼2:獲取真實註冊中心的URL,例如:zookeeper註冊中心的URL。
zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider
&dubbo=2.0.0
&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D10252%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1527263060882
&pid=10252
&qos.port=22222
×tamp=1527263060867
複製代碼
代碼3:根據註冊中心URL,從註冊中心工廠中獲取指定的註冊中心實現類:zookeeper註冊中心的實現類爲:
ZookeeperRegistry
。代碼4:獲取服務提供者URL中的
register
屬性,若是爲true
,則調用註冊中心的ZookeeperRegistry#register
方法向註冊中心註冊服務(實際由其父類FailbackRegistry
實現)。
RegistryProtocol#register
方法,源碼以下:
public void register(URL registryUrl, URL registedProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
複製代碼
FailbackRegistry#register
方法,源碼以下:
@Override
public void register(URL url) {
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// Sending a registration request to the server side
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
failedRegistered.add(url);
}
}
複製代碼
ZookeeperRegistry#doRegister
方法,源碼以下:
@Override
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
複製代碼
代碼5:服務提供者向註冊中心訂閱本身,主要是爲了服務提供者URL發生變化的時候,會觸發
overrideSubscribeListener
的notify
方法從新暴露服務。固然,會將dubbo:reference
的check
屬性設置爲false
。
爲了感知註冊中心的一些配置變化,提供者會監聽註冊中心路徑 /dubbo/${interfaceClass}/configurators
的節點,監聽該節點在註冊中心的一些配置信息變動。Zookeeper註冊中心經過zookeeper框架的監聽回調接口進行監聽(redis註冊中心經過訂閱命令(subscribe)監聽),服務器緩存註冊中心的配置,當配置發生變動時,服務會刷新本地緩存。
FailbackRegistry#subscribe
訂閱方法,源碼以下:
@Override
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (urls != null && !urls.isEmpty()) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// Record a failed registration request to a failed list, retry regularly
addFailedSubscribed(url, listener);
}
}
複製代碼
ZookeeperRegistry#doSubscribe
訂閱方法,源碼以下:
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
複製代碼
本節是切實最最核心的,重點關注 RegistryProtocol#export
中調用 doLocalExport
方法,其實主要是 根據各自協議,服務提供者創建網絡服務器,在特定端口創建監聽,監聽來自消息消費端服務的請求。
RegistryProtocol#doLocalExport,源碼以下:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); // @ 代碼1
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); // @ 代碼2
bounds.put(key, exporter);
}
}
}
return exporter;
}
複製代碼
代碼1:若是服務提供者以
dubbo
協議暴露服務,getProviderUrl(originInvoker)返回的URL將以dubbo://
開頭。代碼2:根據Dubbo內置的SPI機制,將調用
DubboProtocol#export
方法。
DubboProtocol#export,源碼以下:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl(); // @ 代碼1
// export service.
String key = serviceKey(url); // @ 代碼2
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); // @ 代碼3 start
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
} // @ 代碼3 end
openServer(url); // @ 代碼4
optimizeSerialization(url); // @ 代碼5
return exporter;
}
複製代碼
代碼1:獲取服務提供者URL,以協議名稱,這裏是
dubbo://
開頭。代碼2:從服務提供者URL中獲取服務名,
key: interface:port
,例如:com.alibaba.dubbo.demo.DemoService:20880
。代碼3:是否將轉發事件導出成
stub
。代碼4:根據url打開服務。
代碼5:根據url優化器序列化方式。
DubboProtocol#openServer,源碼以下:
private void openServer(URL url) {
// find server.
String key = url.getAddress(); // @ 代碼1
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key); // @ 代碼2
if (server == null) {
serverMap.put(key, createServer(url)); // @ 代碼3
}
}
} else {
// server supports reset, use together with override
server.reset(url); // @代碼4
}
}
}
複製代碼
代碼1:根據url獲取網絡地址:
ip:port
,例如:192.168.56.1:20880
,服務提供者IP與暴露服務端口號。代碼2:根據key從服務器緩存中獲取,若是存在,則執行代碼4,若是不存在,則執行代碼3.
代碼3:根據URL建立一服務器,Dubbo服務提供者服務器實現類爲
ExchangeServer
。代碼4:若是服務器已經存在,用當前URL重置服務器,這個不難理解,由於一個Dubbo服務中,會存在多個
dubbo:service
標籤,這些標籤都會在服務檯提供者的同一個IP地址、端口號上暴露服務。
DubboProtocol#createServer,源碼以下:
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // @ 代碼1
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // @ 代碼2
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // @ 代碼3
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) // @ 代碼4
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); // @ 代碼5
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler); // @ 代碼6
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY); // @ 代碼7
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
複製代碼
代碼1:爲服務提供者url增長
channel.readonly.sent
屬性,默認爲true
,表示在發送請求時,是否等待將字節寫入socket後再返回,默認爲true
。代碼2:爲服務提供者url增長
heartbeat
屬性,表示心跳間隔時間,默認爲60*1000
,表示60s。代碼3:爲服務提供者url增長
server
屬性,可選值爲netty,mina
等等,默認爲netty
。代碼4:根據SPI機制,判斷
server
屬性是否支持。代碼5:爲服務提供者url增長
codec
屬性,默認值爲dubbo
,協議編碼方式。代碼6:根據服務提供者URI,服務提供者命令請求處理器
requestHandler
構建ExchangeServer
實例。requestHandler
的實現具體在之後詳細分析Dubbo服務調用時再詳細分析。代碼7:驗證客戶端類型是否可用。
Exchangers#bind方法,根據 URL
、ExchangeHandler
構建服務器,源碼以下:
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
複製代碼
上述代碼不難看出,首先根據 url
獲取 Exchanger
實例,而後調用 bind
方法構建 ExchangeServer
,Exchanger
接口方法以下:
ExchangeServer bind(URL url, ExchangeHandler handler)
:服務提供者調用。ExchangeClient connect(URL url, ExchangeHandler handler)
:服務消費者調用。
Dubbo提供的實現類爲:HeaderExchanger
,其 bind
方法以下:
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
複製代碼
今後處能夠看到,端口的綁定由 Transporters
的 bind
方法實現。源碼以下:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
複製代碼
從這裏得知,Dubbo網絡傳輸的接口有 Transporter
接口實現,其繼承類圖所示:
本文以netty版原本查看一下 Transporter
實現。 NettyTransporter
源碼以下:
public class NettyTransporter implements Transporter {
public static final String NAME = "netty3";
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
複製代碼
建立 NettyServer
實例時,其父類構造函數會調用 doOpen()
創建網絡鏈接,源碼以下:
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); // @ 代碼1
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler); // @ 代碼2
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
複製代碼
從本方法 代碼1 & 代碼2
瞭解,首先建立 NettyServer
必須傳入一個服務提供者 URL
,但從 DubboProtocol#createServer
中能夠看出,Server是基於網絡套接字 (ip:port)
緩存的,一個JVM應用中,必然會存在多個 dubbo:service
標籤,就會有多個 URL
,這裏爲何能夠這樣作呢?
從 DubboProtocol#createServer
中能夠看出,在解析第二個 dubbo:service
標籤時並不會調用 createServer
,而是會調用 Server#reset
方法,是否是這個方法有什麼魔法,在reset方法時能將URL也註冊到Server上?
那接下來分析 NettyServer#reset
方法是如何實現的?DubboProtocol#reset
方法最終將調用 Server
的 reset
方法,一樣仍是以netty版本的 NettyServer
爲例,查看reset方法的實現原理。 NettyServer#reset—>父類(AbstractServer) AbstractServer#reset
,源碼以下:
@Override
public void reset(URL url) {
if (url == null) {
return;
}
try {
if (url.hasParameter(Constants.ACCEPTS_KEY)) {
int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
if (a > 0) {
this.accepts = a;
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
try {
if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
if (t > 0) {
this.idleTimeout = t;
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
try {
if (url.hasParameter(Constants.THREADS_KEY)
&& executor instanceof ThreadPoolExecutor && !executor.isShutdown()) { // @ 代碼1 start
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
int threads = url.getParameter(Constants.THREADS_KEY, 0);
int max = threadPoolExecutor.getMaximumPoolSize();
int core = threadPoolExecutor.getCorePoolSize();
if (threads > 0 && (threads != max || threads != core)) {
if (threads < core) {
threadPoolExecutor.setCorePoolSize(threads);
if (core == max) {
threadPoolExecutor.setMaximumPoolSize(threads);
}
} else {
threadPoolExecutor.setMaximumPoolSize(threads);
if (core == max) {
threadPoolExecutor.setCorePoolSize(threads);
}
}
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
} // @ 代碼1 end
super.setUrl(getUrl().addParameters(url.getParameters())); // @ 代碼2
}
複製代碼
代碼1:首先是調整線程池的相關線程數量,這個好理解。
代碼2:而後設置調用
setUrl
覆蓋原先NettyServer
的private volatile URL url
的屬性,那爲何不會影響原先註冊的dubbo:service
呢?原來NettyHandler
上加了註解:@Sharable
,由該註解去實現線程安全。