public interface RegistryFactory { Registry getRegistry(URL url); }
核心方法就這一個。算法
public abstract class AbstractRegistryFactory implements RegistryFactory { // Log output private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRegistryFactory.class); // The lock for the acquisition process of the registry private static final ReentrantLock LOCK = new ReentrantLock(); // Registry Collection Map<RegistryAddress, Registry> private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>(); public Registry getRegistry(URL url) { url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); String key = url.toServiceString(); // Lock the registry access process to ensure a single instance of the registry LOCK.lock(); try { Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } REGISTRIES.put(key, registry); return registry; } finally { // Release the lock LOCK.unlock(); } } public static void destroyAll() { if (LOGGER.isInfoEnabled()) { LOGGER.info("Close all registries " + getRegistries()); } // Lock up the registry shutdown process LOCK.lock(); try { for (Registry registry : getRegistries()) { try { registry.destroy(); } catch (Throwable e) { LOGGER.error(e.getMessage(), e); } } REGISTRIES.clear(); } finally { // Release the lock LOCK.unlock(); } } protected abstract Registry createRegistry(URL url);
能夠發現註冊集合的操做經過ReentrantLock加鎖實現,createRegistry註冊實現則是交給對應實現方本身實現。spring
public Registry createRegistry(URL url) { url = getRegistryURL(url); List<URL> urls = new ArrayList<URL>(); urls.add(url.removeParameter(Constants.BACKUP_KEY)); String backup = url.getParameter(Constants.BACKUP_KEY); if (backup != null && backup.length() > 0) { String[] addresses = Constants.COMMA_SPLIT_PATTERN.split(backup); for (String address : addresses) { urls.add(url.setAddress(address)); } } RegistryDirectory<RegistryService> directory = new RegistryDirectory<RegistryService>(RegistryService.class, url.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()).addParameterAndEncoded(Constants.REFER_KEY, url.toParameterString())); Invoker<RegistryService> registryInvoker = cluster.join(directory); RegistryService registryService = proxyFactory.getProxy(registryInvoker); DubboRegistry registry = new DubboRegistry(registryInvoker, registryService); directory.setRegistry(registry); directory.setProtocol(protocol); directory.notify(urls); directory.subscribe(new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, RegistryService.class.getName(), url.getParameters())); return registry; }
將全部url和backupurl所有放入集合,經過負載均衡算法進行使用。app
public abstract class FailbackRegistry extends AbstractRegistry { // Scheduled executor service private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry private final ScheduledFuture<?> retryFuture; private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>(); private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>(); private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>(); private AtomicBoolean destroyed = new AtomicBoolean(false);
能夠發現註冊中心的容錯,是經過定時線程對註冊url進行重試,將可用url和不可用url放入不一樣集合。負載均衡
<dubbo:application name="${dubbo.application.name}" owner="${dubbo.application.owner}"/> <dubbo:protocol name="dubbo" port="${dubbo.protocol.port}" heartbeat="180000"/> <dubbo:service id="registryServiceConfig" interface="com.alibaba.dubbo.registry.RegistryService" ref="registryService" registry="N/A" ondisconnect="disconnect" callbacks="1000"> <dubbo:method name="subscribe"> <dubbo:argument index="1" callback="true"/> </dubbo:method> <dubbo:method name="unsubscribe"> <dubbo:argument index="1" callback="false"/> </dubbo:method> </dubbo:service> <bean id="registryService" class="com.alibaba.dubbo.registry.simple.SimpleRegistryService"/> public void register(URL url) { String client = RpcContext.getContext().getRemoteAddressString(); Set<URL> urls = remoteRegistered.get(client); if (urls == null) { remoteRegistered.putIfAbsent(client, new ConcurrentHashSet<URL>()); urls = remoteRegistered.get(client); } urls.add(url); super.register(url); registered(url); }
以前咱們說過,在spring容器啓動以後,整個Bean信息會被解析成多個Config和Model的Bean放入Context上下文中,因此在註冊時,會經過Context獲取上下文中註冊的Mode進行Bean解析註冊。ui