說在前面git
前期回顧github
sharding-jdbc源碼解析 更新完畢spring
spring源碼解析 更新完畢spring-mvc
spring-mvc源碼解析 更新完畢服務器
spring-tx源碼解析 更新完畢網絡
spring-boot源碼解析 更新完畢mvc
rocketmq源碼解析 更新完畢app
dubbo源碼解析 更新中負載均衡
sharding-sphere源碼解析 計劃中ide
netty源碼解析 計劃中
spring-cloud-alibaba-dubbo源碼解析 計劃中
github https://github.com/tianheframe
sharding-jdbc源碼解析 更新完畢
rocketmq 更新完畢
dubbo源碼解析 更新中
seata源碼解析 更新中
spring-cloud-tianhe 更新中
mq-tianhe 計劃中
rpc-tianhe 計劃中
源碼解析
https://github.com/tianheframe/dubbo.git dubbo源碼解析後的源碼和公號文章同步更新。
返回到這個方法,服務註冊信息恢復com.alibaba.dubbo.registry.support.FailbackRegistry#recover
@Overrideprotected void recover() throws Exception {// 獲取服務註冊的url集合Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());if (!recoverRegistered.isEmpty()) {if (logger.isInfoEnabled()) {logger.info("Recover register url " + recoverRegistered);}for (URL url : recoverRegistered) {// 保存註冊失敗的服務註冊urlfailedRegistered.add(url);}}// 獲取訂閱的服務url集合Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());if (!recoverSubscribed.isEmpty()) {if (logger.isInfoEnabled()) {logger.info("Recover subscribe url " + recoverSubscribed.keySet());}for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {URL url = entry.getKey();for (NotifyListener listener : entry.getValue()) {// 添加訂閱失敗的服務urladdFailedSubscribed(url, listener);}}}}
進入這個方法com.alibaba.dubbo.registry.support.FailbackRegistry#register服務註冊
@Overridepublic void register(URL url) {// 添加註冊服務url=》super.register(url);failedRegistered.remove(url);failedUnregistered.remove(url);try {// Sending a registration request to the server side 向服務器端發送註冊請求=》ZookeeperRegistrydoRegister(url);} catch (Exception e) {Throwable t = e;// If the startup detection is opened, the Exception is thrown directly. 若是打開啓動檢測,則直接拋出異常boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)&& url.getParameter(Constants.CHECK_KEY, true)&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());boolean skipFailback = t instanceof SkipFailbackWrapperException;if (check || skipFailback) {if (skipFailback) {t = t.getCause();}throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);} else {logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);}// Record a failed registration request to a failed list, retry regularly 將失敗的註冊請求記錄到失敗的列表中,按期重試failedRegistered.add(url);}}
進入這個方法com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister進行zk服務註冊,這裏的邏輯就是建立zk臨時節點
@Overrideprotected void doRegister(URL url) {try {// 服務註冊,建立zk節點,若是dynamic配置的是true,建立的就是zk臨時節點=》zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));} catch (Throwable e) {throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);}}
這裏默認建立的是臨時節點,這也就是zk註冊的服務所在節點掛了以後其餘客戶端節點本地的服務列表會更新的緣由,不會調用到不存在的服務,固然也存在zk臨時節點刪除,通知其餘訂閱這個節點的客戶端時候出現網絡抖動,zk會作處理確保必定能通知到,這種中間處理也能要業務邏輯要作處理了
/dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo://172.28.84.147:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=com.alibaba.dubbo.demo.DemoService&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=76579&side=provider×tamp=1569898563184 服務註冊的zk path是這樣的
若是註冊失敗的話怎麼辦呢,在建立com.alibaba.dubbo.registry.support.FailbackRegistry#FailbackRegistry對象的時候構造方法邏輯中,重試參數retry.period 默認值是每5秒鐘會作重試處理,這裏也能夠自定義修改
public FailbackRegistry(URL url) {super(url);// 每5秒中會重試註冊失敗的服務信息,能夠修改這個參數retry.periodthis.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {// Check and connect to the registrytry {retry();} catch (Throwable t) { // Defensive fault tolerancelogger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);}}}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);}
進入這個方法服務註冊失敗重試邏輯,com.alibaba.dubbo.registry.support.FailbackRegistry#retry
protected void retry() {if (!failedRegistered.isEmpty()) {Set<URL> failed = new HashSet<URL>(failedRegistered);if (failed.size() > 0) {if (logger.isInfoEnabled()) {logger.info("Retry register " + failed);}try {for (URL url : failed) {try {// zk服務註冊doRegister(url);failedRegistered.remove(url);} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry 這裏異常不作處理等待下次重試logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);}}} catch (Throwable t) { // Ignore all the exceptions and wait for the next retrylogger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);}}}if (!failedUnregistered.isEmpty()) {Set<URL> failed = new HashSet<URL>(failedUnregistered);if (!failed.isEmpty()) {if (logger.isInfoEnabled()) {logger.info("Retry unregister " + failed);}try {for (URL url : failed) {try {// 取消服務註冊失敗重試doUnregister(url);failedUnregistered.remove(url);} catch (Throwable t) { // Ignore all the exceptions and wait for the next retrylogger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t);}}} catch (Throwable t) { // Ignore all the exceptions and wait for the next retrylogger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t);}}}if (!failedSubscribed.isEmpty()) {Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {if (entry.getValue() == null || entry.getValue().size() == 0) {failed.remove(entry.getKey());}}if (failed.size() > 0) {if (logger.isInfoEnabled()) {logger.info("Retry subscribe " + failed);}try {for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {URL url = entry.getKey();Set<NotifyListener> listeners = entry.getValue();for (NotifyListener listener : listeners) {try {// 服務訂閱失敗的進行從新訂閱doSubscribe(url, listener);listeners.remove(listener);} catch (Throwable t) { // Ignore all the exceptions and wait for the next retrylogger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);}}}} catch (Throwable t) { // Ignore all the exceptions and wait for the next retrylogger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);}}}if (!failedUnsubscribed.isEmpty()) {Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed);for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {if (entry.getValue() == null || entry.getValue().isEmpty()) {failed.remove(entry.getKey());}}if (failed.size() > 0) {if (logger.isInfoEnabled()) {logger.info("Retry unsubscribe " + failed);}try {for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {URL url = entry.getKey();Set<NotifyListener> listeners = entry.getValue();for (NotifyListener listener : listeners) {try {// 取消訂閱doUnsubscribe(url, listener);listeners.remove(listener);} catch (Throwable t) { // Ignore all the exceptions and wait for the next retrylogger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);}}}} catch (Throwable t) { // Ignore all the exceptions and wait for the next retrylogger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);}}}if (!failedNotified.isEmpty()) {Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified);for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, List<URL>>>(failed).entrySet()) {if (entry.getValue() == null || entry.getValue().size() == 0) {failed.remove(entry.getKey());}}if (failed.size() > 0) {if (logger.isInfoEnabled()) {logger.info("Retry notify " + failed);}try {for (Map<NotifyListener, List<URL>> values : failed.values()) {for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {try {NotifyListener listener = entry.getKey();List<URL> urls = entry.getValue();listener.notify(urls);values.remove(listener);} catch (Throwable t) { // Ignore all the exceptions and wait for the next retrylogger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);}}}} catch (Throwable t) { // Ignore all the exceptions and wait for the next retrylogger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);}}}}
com.alibaba.dubbo.registry.support.FailbackRegistry#failedRegistered對服務註冊失敗的從新註冊
對服務取消註冊失敗的進行從新取消服務註冊com.alibaba.dubbo.registry.support.FailbackRegistry#failedUnregistered,進入這個方法com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry#doUnregister,這裏的操做就是刪除刪除zk臨時節點,刪除zk臨時節點後其餘訂閱服務的服務節點會收到zk的監聽器從新刷新已經生成的代理invoker對象,客戶端在進行負載均衡的時候是直接路由到具體的invoker
@Overrideprotected void doUnregister(URL url) {try {zkClient.delete(toUrlPath(url));} catch (Throwable e) {throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);}}
訂閱失敗的從新訂閱com.alibaba.dubbo.registry.support.FailbackRegistry#failedSubscribed
下篇接着介紹服務訂閱失敗的怎麼進行從新服務訂閱
說在最後
本次解析僅表明我的觀點,僅供參考。