本文主要研究一下nacos的configWatchers
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/controller/CommunicationController.java
@Controller @RequestMapping(Constants.COMMUNICATION_CONTROLLER_PATH) public class CommunicationController { private final DumpService dumpService; private final LongPollingService longPollingService; private String trueStr = "true"; @Autowired public CommunicationController(DumpService dumpService, LongPollingService longPollingService) { this.dumpService = dumpService; this.longPollingService = longPollingService; } //...... /** * 在本臺機器上得到訂閱改配置的客戶端信息 */ @RequestMapping(value = "/configWatchers", method = RequestMethod.GET) @ResponseBody public SampleResult getSubClientConfig(HttpServletRequest request, HttpServletResponse response, @RequestParam("dataId") String dataId, @RequestParam("group") String group, @RequestParam(value = "tenant", required = false) String tenant, ModelMap modelMap) { group = StringUtils.isBlank(group) ? Constants.DEFAULT_GROUP : group; return longPollingService.getCollectSubscribleInfo(dataId, group, tenant); } //...... }
CommunicationController提供了/configWatchers接口,它通過longPollingService.getCollectSubscribleInfo返回SampleResult

nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java
@Service public class LongPollingService extends AbstractEventListener { private static final int FIXED_POLLING_INTERVAL_MS = 10000; private static final int SAMPLE_PERIOD = 100; private static final int SAMPLE_TIMES = 3; private static final String TRUE_STR = "true"; private Map<String, Long> retainIps = new ConcurrentHashMap<String, Long>(); //...... public SampleResult getCollectSubscribleInfo(String dataId, String group, String tenant) { List<SampleResult> sampleResultLst = new ArrayList<SampleResult>(50); for (int i = 0; i < SAMPLE_TIMES; i++) { SampleResult sampleTmp = getSubscribleInfo(dataId, group, tenant); if (sampleTmp != null) { sampleResultLst.add(sampleTmp); } if (i < SAMPLE_TIMES - 1) { try { Thread.sleep(SAMPLE_PERIOD); } catch (InterruptedException e) { LogUtil.clientLog.error("sleep wrong", e); } } } SampleResult sampleResult = mergeSampleResult(sampleResultLst); return sampleResult; } public SampleResult getSubscribleInfo(String dataId, String group, String tenant) { String groupKey = GroupKey.getKeyTenant(dataId, group, tenant); SampleResult sampleResult = new SampleResult(); Map<String, String> lisentersGroupkeyStatus = new HashMap<String, String>(50); for (ClientLongPolling clientLongPolling : allSubs) { if (clientLongPolling.clientMd5Map.containsKey(groupKey)) { lisentersGroupkeyStatus.put(clientLongPolling.ip, clientLongPolling.clientMd5Map.get(groupKey)); } } sampleResult.setLisentersGroupkeyStatus(lisentersGroupkeyStatus); return sampleResult; } /** * 聚合採樣結果中的採樣ip和監聽配置的信息;合併策略用後面的覆蓋前面的是沒有問題的 * * @param sampleResults sample Results * @return Results */ public SampleResult mergeSampleResult(List<SampleResult> sampleResults) { SampleResult mergeResult = new SampleResult(); Map<String, String> lisentersGroupkeyStatus = new HashMap<String, String>(50); for (SampleResult sampleResult : sampleResults) { Map<String, String> lisentersGroupkeyStatusTmp = sampleResult.getLisentersGroupkeyStatus(); for (Map.Entry<String, String> entry : 
lisentersGroupkeyStatusTmp.entrySet()) { lisentersGroupkeyStatus.put(entry.getKey(), entry.getValue()); } } mergeResult.setLisentersGroupkeyStatus(lisentersGroupkeyStatus); return mergeResult; } //...... }
LongPollingService的getCollectSubscribleInfo方法會循環SAMPLE_TIMES(默認值爲3)次,每次通過getSubscribleInfo方法取出SampleResult加入到sampleResultLst,循環的時候間隔SAMPLE_PERIOD(默認爲100)毫秒;最後通過mergeSampleResult方法進行合併,並返回合併後的SampleResult

CommunicationController提供了/configWatchers接口,它通過longPollingService.getCollectSubscribleInfo返回SampleResult