先看下consumer端發起調用時的鏈路流程:html
+---------------------------+ +---------------------------+ +---------------------------+ | helloService | | proxy | | InvokerInvocationHandler | | sayHello +----------> | sayHello +----------> | invoke | | | | | | proxy method args | +---------------------------+ +---------------------------+ +-------------+-------------+ | | +---------------------------------+ | | | | +------------v--------------+ | | | MockClusterInvoker | | | | invoke | | | | | | | +------------+--------------+ | | | | | | | | | | +---------------------------+ +---------------------------+ | +------------v--------------+ | | Router | | RegistryDirectory | | | FailoverClusterInvoker | | | route | <----------+ list | <-----------+ invoke | | | MockInVokersSelector | | INVOCATION-->List INVOKER | | | | | +------------+--------------+ +---------------------------+ | +---------------------------+ | | | | | +---------------------------------+ | cluster invoke,分佈式調用容錯機制也是在這作 | | | | | +-------------v-------------+ +---------------------------+ +---------------------------+ | RandomLoadBalance | |InvokerDelegate | | ListenerInvokerWrap | | select +-----------> |invoke +-----------> | invoke | | List INVOKER-->INVOKER | | | | | +---------------------------+ +---------------------------+ +---------------------------+
從創建spring到netty client創建鏈接的調用棧:
NettyClient.doOpen() line: 66
NettyClient(AbstractClient).
NettyClient.
NettyTransporter.connect(URL, ChannelHandler) line: 37
Transporter$Adpative.connect(URL, ChannelHandler) line: not available
Transporters.connect(URL, ChannelHandler...) line: 67
HeaderExchanger.connect(URL, ExchangeHandler) line: 37
Exchangers.connect(URL, ExchangeHandler) line: 102
DubboProtocol.initClient(URL) line: 378
DubboProtocol.getSharedClient(URL) line: 344
DubboProtocol.getClients(URL) line: 321
DubboProtocol.refer(Class
ProtocolListenerWrapper.refer(Class
ProtocolFilterWrapper.refer(Class
Protocol$Adpative.refer(Class, URL) line: not available
RegistryDirectory
RegistryDirectory
RegistryDirectory
ZookeeperRegistry(AbstractRegistry).notify(URL, NotifyListener, List
ZookeeperRegistry(FailbackRegistry).doNotify(URL, NotifyListener, List
ZookeeperRegistry(FailbackRegistry).notify(URL, NotifyListener, List
ZookeeperRegistry.doSubscribe(URL, NotifyListener) line: 170
ZookeeperRegistry(FailbackRegistry).subscribe(URL, NotifyListener) line: 189
RegistryDirectory
RegistryProtocol.doRefer(Cluster, Registry, Class
RegistryProtocol.refer(Class
ProtocolListenerWrapper.refer(Class
ProtocolFilterWrapper.refer(Class
Protocol$Adpative.refer(Class, URL) line: not available
ReferenceBean
ReferenceBean
ReferenceBean
ReferenceBean
DefaultListableBeanFactory(FactoryBeanRegistrySupport).doGetObjectFromFactoryBean(FactoryBean, String, boolean) line: 142
總體來講: 先由註冊中心的協議處理器處理註冊中心的地址,找到全部provider的地址,建立全部invoker,而後再由invoker在真正調用時發起調用。
註冊中心的這個也抽象一種協議,由註冊中心結合提供者的協議推導出提供者的協議地址,也就是目標端的地址與端口得知了。
每個接口在zookeeper上都有節點,節點下面是provider,再下面是全部provider的具體地址。spring
建立channelPipelineFactory,並在這個factory中返回加工好的ChannelPipeline,加工過程包括加入編解碼器,鏈接事件處理組成的netty handler實現
(包括鏈接創建,斷開,請求寫出去,)app
writeRequested(netty的)-->調用編碼器(編碼的這個對象中包括了 須要調用的目標接口名 方法名等信息)
(繼承SimpleChannelHandler重寫邏輯,能夠定製channel的讀取與寫出邏輯)dom
provider停掉會觸發zk客戶端的監聽,監聽對客戶端的invoker列表進行刷新。
再次啓動會觸發 zk的監聽,代碼在ZkclientZookeeperClient
分佈式
public IZkChildListener createTargetChildListener(String path, final ChildListener listener) { return new IZkChildListener() { public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { listener.childChanged(parentPath, currentChilds); } }; }
而後再觸發 com.alibaba.dubbo.registry.support.FailbackRegistry.doNotify(URL, NotifyListener, List
com.alibaba.dubbo.registry.integration.RegistryDirectory.refreshInvoker(List
若是有provider停掉了 走同樣的監聽邏輯
同時,dubbo支持 定時檢查provider的狀態並進行重連,具體參見
com.alibaba.dubbo.remoting.transport.AbstractClient.initConnectStatusCheckCommand()
reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);this
若是在發服務時,provider停掉了,那麼此時會拋出異常,並在FailoverClusterInvoker doInvoke中捕獲,
FailoverClusterInvoker支持調用失敗時重試(可配置),此時達到再次重試的目的。編碼
NettyClient 的doOpen doConnect均在初始化的時候調用,有幾個provider就調用幾回,真正rpc調用服務的時候是不會再調用open與connect的。
上面這個說法不嚴格,由於看他發送消息的代碼就知道了,每次發消息時還會檢查下:url
public void send(Object message, boolean sent) throws RemotingException { if (send_reconnect && !isConnected()){ connect(); } Channel channel = getChannel(); //TODO getChannel返回的狀態是否包含null須要改進 if (channel == null || ! channel.isConnected()) { throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl()); } channel.send(message, sent); }
com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.select(LoadBalance, Invocation, List<Invoker
com.alibaba.dubbo.rpc.filter.EchoFilter@1fd14d74 com.alibaba.dubbo.rpc.filter.ClassLoaderFilter@563e4951 com.alibaba.dubbo.rpc.filter.GenericFilter@4066c471 com.alibaba.dubbo.rpc.filter.ContextFilter@2b175c00 com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter@3eb81efb com.alibaba.dubbo.rpc.filter.TimeoutFilter@1ae8bcbc com.alibaba.dubbo.monitor.support.MonitorFilter@6cdba6dc com.alibaba.dubbo.rpc.filter.ExceptionFilter@2609b277