咱們在Nacos - NacosNamingService初始化提過,NacosNamingService對象建立的時候,會建立一個EventDispatcher對象。EventDispatcher的構造方法以下,建立一個線程池,而後放入Notifier任務。segmentfault
public EventDispatcher() { this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener"); thread.setDaemon(true); return thread; } }); this.executor.execute(new Notifier()); }
Notifier的Runnable的類,因此放入線程池的時候,會執行run方法。他主要是從阻塞隊列changedServices取出ServiceInfo,而後根據ServiceInfo的key取出他對應的EventListener集合,再執行EventListener的onEvent方法。ide
@Override public void run() { while (!closed) { ServiceInfo serviceInfo = null; try { // changedServices是LinkedBlockingQueue,從阻塞隊列取值 serviceInfo = changedServices.poll(5, TimeUnit.MINUTES); } catch (Exception ignore) { } // 沒取值,從新從阻塞隊列取 if (serviceInfo == null) { continue; } try { // 從observerMap取到EventListener集合 List<EventListener> listeners = observerMap.get(serviceInfo.getKey()); if (!CollectionUtils.isEmpty(listeners)) { for (EventListener listener : listeners) { // 執行onEvent方法 List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts()); listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts)); } } } catch (Exception e) { NAMING_LOGGER.error("[NA] notify error for service: " + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e); } } }
在Nacos - 啓動中,提到NacosWatch實例化的時候,就會調用namingService.subscribe,他會調用EventDispatcher#addListener方法,在這裏會把監聽放入observerMap的map裏,而後調用serviceChanged方法。this
public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) { NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map"); List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>()); observers.add(listener); observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers); if (observers != null) { observers.add(listener); } serviceChanged(serviceInfo); }
當serviceChanged被調用的時候,就會往阻塞隊列存入ServiceInfo。上面已經知道了有個循環任務一直從阻塞隊列changedServices取值,這個值就是這麼來的。spa
public void serviceChanged(ServiceInfo serviceInfo) { if (serviceInfo == null) { return; } changedServices.add(serviceInfo); }
這個類有兩個比較重要的成員,一個是observerMap,他的key是serviceInfo.getKey(),value是EventListener集合。一個是changedServices,存放serviceInfo的LinkedBlockingQueue阻塞隊列。這兩個成員的關聯關係經過serviceInfo.getKey()維持。
當調用addListener的時候,就會把serviceInfo存入到changedServices,以及serviceInfo.getKey()和EventListener集合存入到observerMap。
while(true)中,因爲阻塞隊列changedServices有值,就會從中拿到serviceInfo,再經過serviceInfo.getKey()拿到observerMap對應的EventListener集合,而後執行EventListener集合的EventListener.onEvent方法。
線程