本文主要研究一下nacos的ConfigDataChangeEvent
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/utils/event/EventDispatcher.java
/**
 * Marker type for events. It declares no members; it is implemented by
 * {@code ConfigDataChangeEvent} and used as the event parameter type of
 * {@code AbstractEventListener}.
 */
public interface Event {
}
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/ConfigDataChangeEvent.java
/**
 * Event carrying the identity of a configuration entry that changed
 * (dataId / group / tenant / tag), whether it is a beta config, and the
 * modification timestamp.
 *
 * <p>All fields are public and final, so an instance is immutable once built.
 * Every constructor funnels into the six-argument one, which is the single
 * place where arguments are validated and fields are assigned.
 */
public class ConfigDataChangeEvent implements Event {

    public final boolean isBeta;
    public final String dataId;
    public final String group;
    public final String tenant;
    public final String tag;
    public final long lastModifiedTs;

    /**
     * Creates a non-beta event with an empty tenant and no tag.
     *
     * @param dataId      config data id, must not be null
     * @param group       config group, must not be null
     * @param gmtModified modification timestamp stored in {@link #lastModifiedTs}
     * @throws IllegalArgumentException if {@code dataId} or {@code group} is null
     */
    public ConfigDataChangeEvent(String dataId, String group, long gmtModified) {
        this(false, dataId, group, gmtModified);
    }

    /**
     * Creates an event with an empty tenant ({@code StringUtils.EMPTY}) and no tag.
     *
     * @param isBeta      whether the changed config is a beta config
     * @param dataId      config data id, must not be null
     * @param group       config group, must not be null
     * @param gmtModified modification timestamp stored in {@link #lastModifiedTs}
     * @throws IllegalArgumentException if {@code dataId} or {@code group} is null
     */
    public ConfigDataChangeEvent(boolean isBeta, String dataId, String group, long gmtModified) {
        this(isBeta, dataId, group, StringUtils.EMPTY, gmtModified);
    }

    /**
     * Creates an event for the given tenant with no tag ({@link #tag} is {@code null}).
     *
     * @param isBeta      whether the changed config is a beta config
     * @param dataId      config data id, must not be null
     * @param group       config group, must not be null
     * @param tenant      tenant of the config; stored as given
     * @param gmtModified modification timestamp stored in {@link #lastModifiedTs}
     * @throws IllegalArgumentException if {@code dataId} or {@code group} is null
     */
    public ConfigDataChangeEvent(boolean isBeta, String dataId, String group, String tenant, long gmtModified) {
        this(isBeta, dataId, group, tenant, null, gmtModified);
    }

    /**
     * Creates a fully specified event. This is the only constructor that
     * validates and assigns; the others delegate here.
     *
     * @param isBeta      whether the changed config is a beta config
     * @param dataId      config data id, must not be null
     * @param group       config group, must not be null
     * @param tenant      tenant of the config; stored as given
     * @param tag         tag of the config; stored as given, may be null
     * @param gmtModified modification timestamp stored in {@link #lastModifiedTs}
     * @throws IllegalArgumentException if {@code dataId} or {@code group} is null
     */
    public ConfigDataChangeEvent(boolean isBeta, String dataId, String group, String tenant, String tag,
                                 long gmtModified) {
        if (null == dataId || null == group) {
            // Include the offending values so the failure can be diagnosed from the stack trace alone.
            throw new IllegalArgumentException(
                "dataId and group must not be null, dataId=" + dataId + ", group=" + group);
        }
        this.isBeta = isBeta;
        this.dataId = dataId;
        this.group = group;
        this.tenant = tenant;
        this.tag = tag;
        this.lastModifiedTs = gmtModified;
    }
}
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/utils/event/EventDispatcher.java
/**
 * Base class for event listeners. Constructing an instance registers it with
 * {@code EventDispatcher}; subclasses declare which event types they care
 * about and how to handle them.
 */
public abstract static class AbstractEventListener {

    /**
     * Registers this listener with the dispatcher automatically.
     */
    public AbstractEventListener() {
        // NOTE(review): `this` is handed to the dispatcher before any subclass
        // constructor body has run — confirm registration does not rely on subclass state.
        EventDispatcher.addEventListener(this);
    }

    /**
     * Lists the event types this listener is interested in.
     *
     * @return event list
     */
    public abstract List<Class<? extends Event>> interest();

    /**
     * Handles an event.
     *
     * @param event event
     */
    public abstract void onEvent(Event event);
}
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java
@Service public class AsyncNotifyService extends AbstractEventListener { @Override public List<Class<? extends Event>> interest() { List<Class<? extends Event>> types = new ArrayList<Class<? extends Event>>(); // 觸發配置變動同步通知 types.add(ConfigDataChangeEvent.class); return types; } @Override public void onEvent(Event event) { // 併發產生 ConfigDataChangeEvent if (event instanceof ConfigDataChangeEvent) { ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event; long dumpTs = evt.lastModifiedTs; String dataId = evt.dataId; String group = evt.group; String tenant = evt.tenant; String tag = evt.tag; List<?> ipList = serverListService.getServerList(); // 其實這裏任何類型隊列均可以 Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>(); for (int i = 0; i < ipList.size(); i++) { queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String) ipList.get(i), evt.isBeta)); } EXECUTOR.execute(new AsyncTask(httpclient, queue)); } } @Autowired public AsyncNotifyService(ServerListService serverListService) { this.serverListService = serverListService; httpclient.start(); } //...... }
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java
class AsyncTask implements Runnable { public AsyncTask(CloseableHttpAsyncClient httpclient, Queue<NotifySingleTask> queue) { this.httpclient = httpclient; this.queue = queue; } @Override public void run() { executeAsyncInvoke(); } private void executeAsyncInvoke() { while (!queue.isEmpty()) { NotifySingleTask task = queue.poll(); String targetIp = task.getTargetIP(); if (serverListService.getServerList().contains( targetIp)) { // 啓動健康檢查且有不監控的ip則直接把放到通知隊列,不然通知 if (serverListService.isHealthCheck() && ServerListService.getServerListUnhealth().contains(targetIp)) { // target ip 不健康,則放入通知列表中 ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target); // get delay time and set fail count to the task asyncTaskExecute(task); } else { HttpGet request = new HttpGet(task.url); request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified())); request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, LOCAL_IP); if (task.isBeta) { request.setHeader("isBeta", "true"); } httpclient.execute(request, new AsyncNotifyCallBack(httpclient, task)); } } } } private Queue<NotifySingleTask> queue; private CloseableHttpAsyncClient httpclient; }
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java
class AsyncNotifyCallBack implements FutureCallback<HttpResponse> { public AsyncNotifyCallBack(CloseableHttpAsyncClient httpClient, NotifySingleTask task) { this.task = task; this.httpClient = httpClient; } @Override public void completed(HttpResponse response) { long delayed = System.currentTimeMillis() - task.getLastModified(); if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_OK, delayed, task.target); } else { log.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}", task.target, task.getDataId(), task.getGroup(), task.getLastModified(), response.getStatusLine().getStatusCode()); ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_ERROR, delayed, task.target); //get delay time and set fail count to the task asyncTaskExecute(task); LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", task.target, task.getDataId(), task.getGroup(), task.getLastModified()); MetricsMonitor.getConfigNotifyException().increment(); } HttpClientUtils.closeQuietly(response); } @Override public void failed(Exception ex) { long delayed = System.currentTimeMillis() - task.getLastModified(); log.error("[notify-exception] target:{} dataId:{} group:{} ts:{} ex:{}", task.target, task.getDataId(), task.getGroup(), task.getLastModified(), ex.toString()); ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_EXCEPTION, delayed, task.target); //get delay time and set fail count to the task asyncTaskExecute(task); LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", task.target, task.getDataId(), task.getGroup(), task.getLastModified()); 
MetricsMonitor.getConfigNotifyException().increment(); } @Override public void cancelled() { LogUtil.notifyLog.error("[notify-exception] target:{} dataId:{} group:{} ts:{} method:{}", task.target, task.getDataId(), task.getGroup(), task.getLastModified(), "CANCELED"); //get delay time and set fail count to the task asyncTaskExecute(task); LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", task.target, task.getDataId(), task.getGroup(), task.getLastModified()); MetricsMonitor.getConfigNotifyException().increment(); } private NotifySingleTask task; private CloseableHttpAsyncClient httpClient; }
ConfigDataChangeEvent實現了Event接口,它包含有isBeta、dataId、group、tenant、tag、lastModifiedTs屬性;AsyncNotifyService繼承了AbstractEventListener抽象類,其interest返回的列表包含了ConfigDataChangeEvent;其onEvent方法判斷事件是ConfigDataChangeEvent時,會通過serverListService.getServerList()獲取ipList,然後遍歷該列表往queue添加NotifySingleTask,最後使用該queue創建AsyncTask並提交線程池執行。