Apollo 8 — ConfigService 異步輪詢接口的實現

源碼

Apollo 長輪詢的實現,是經過客戶端輪詢 /notifications/v2 接口實現的。具體代碼在 com.ctrip.framework.apollo.configservice.controller.NotificationControllerV2.java。java

這個類也是實現了 ReleaseMessageListener 監控,代表他是一個消息監聽器,當有新的消息時,就會調用他的 hanlderMessage 方法。這個具體咱們後面再說。數據庫

該類只有一個 rest 接口: pollNotification 方法。返回值是 DeferredResult,這是 Spring 支持 Servlet 3 的一個類,關於異步同步的不一樣,能夠看筆者的另外一篇文章 異步 Servlet 和同步 Servlet 的性能測試json

該接口提供了幾個參數:緩存

  1. appId appId
  2. cluster 集羣名稱
  3. notificationsAsString 通知對象的 json 字符串
  4. dataCenter,idc 屬性
  5. clientIp 客戶端 IP, 非必傳,爲了擴展吧估計

你們有麼有以爲少了什麼? namespace 。安全

固然,沒有 namespace 這個重要的參數是不存在的。服務器

參數在 notificationsAsString 中。客戶端會將本身全部的 namespace 傳遞到服務端進行查詢。併發

是時候上源碼了。app

@RequestMapping(method = RequestMethod.GET)
  public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
      @RequestParam(value = "appId") String appId,// appId
      @RequestParam(value = "cluster") String cluster,// default
      @RequestParam(value = "notifications") String notificationsAsString,// json 對象 List<ApolloConfigNotification>
      @RequestParam(value = "dataCenter", required = false) String dataCenter,// 基本用不上, idc 屬性
      @RequestParam(value = "ip", required = false) String clientIp) {

    List<ApolloConfigNotification> notifications =// 轉換成對象
          gson.fromJson(notificationsAsString, notificationsTypeReference);
          
    // Spring 的異步對象: timeout 60s, 返回304
    DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper();
    Set<String> namespaces = Sets.newHashSet();
    Map<String, Long> clientSideNotifications = Maps.newHashMap();
    Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);// 過濾一下名字
    // 循環
    for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {
      // 拿出 key
      String normalizedNamespace = notificationEntry.getKey();
      // 拿出 value
      ApolloConfigNotification notification = notificationEntry.getValue();
      /* 添加到 namespaces Set */
      namespaces.add(normalizedNamespace);
      // 添加到 client 端的通知, key 是 namespace, values 是 messageId
      clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());
      // 若是不相等, 記錄客戶端名字
      if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {
        // 記錄 key = 標準名字, value = 客戶端名字
        deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);
      }
    }// 記在 namespaces 集合, clientSideNotifications 也put (namespace, notificationId)

    // 組裝獲得須要觀察的 key,包括公共的.
    Multimap<String, String> watchedKeysMap =
        watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);// namespaces 是集合
    // 獲得 value; 這個 value 也就是 appId + cluster + namespace
    Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());
    // 從緩存獲得最新的發佈消息
    List<ReleaseMessage> latestReleaseMessages =// 根據 key 從緩存獲得最新發布的消息.
        releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);

    /* 若是不關閉, 這個請求將會一直持有一個數據庫鏈接. 影響併發能力. 這是一個 hack 操做*/
    entityManagerUtil.closeEntityManager();
    // 計算出新的通知
    List<ApolloConfigNotification> newNotifications =
        getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap,
            latestReleaseMessages);
    // 不是空, 理解返回結果, 不等待
    if (!CollectionUtils.isEmpty(newNotifications)) {
      deferredResultWrapper.setResult(newNotifications);
    } else {
      // 設置 timeout 回調:打印日誌
      deferredResultWrapper
          .onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
      // 設置完成回調:刪除 key
      deferredResultWrapper.onCompletion(() -> {
        //取消註冊
        for (String key : watchedKeys) {
          deferredResults.remove(key, deferredResultWrapper);
        }
      });

      //register all keys 註冊
      for (String key : watchedKeys) {
        this.deferredResults.put(key, deferredResultWrapper);
      }
    }
    // 當即返回
    return deferredResultWrapper.getResult();/** @see DeferredResultHandler 是關鍵 */
  }

註釋寫了不少了,再簡單說說邏輯:異步

  1. 解析 JSON 字符串爲 List< ApolloConfigNotification> 對象。
  2. 建立 Spring 異步對象。
  3. 處理過濾 namespace。
  4. 根據 namespace 生成須要監聽的 key,格式爲 appId + cluster + namespace,包括公共 namespace。並獲取最新的 Release 信息。
  5. 關閉 Spring 實例管理器,釋放數據庫資源。
  6. 根據剛剛獲得的 ReleaseMessage,和客戶端的 ReleaseMessage 的版本進行對比,生成新的配置通知對象集合。
  7. 若是不是空 —— 當即返回給客戶端,結束這次調用。若是沒有,進入第 8 步。
  8. 設置 timeout 回調方法 —— 打印日誌。再設置完成回調方法:刪除註冊的 key。
  9. 對客戶端感興趣的 key 進行註冊,這些 key 都對應着 deferredResultWrapper 對象,能夠認爲他就是客戶端。
  10. 返回 Spring 異步對象。該請求將被異步掛起。

Apollo 的 DeferredResultWrapper 保證了 Spring 的 DeferredResult 對象,泛型內容是 List , 構造這個對象,默認的 timeout 是 60 秒,即掛起 60 秒。同時,對 setResult 方法進行包裝,加入了對客戶端 key 和服務端 key 的一個映射(大小寫不一致) 。 async

咱們剛剛說,Apollo 會將這些 key 註冊起來。那麼何時使用呢,異步對象被掛起,又是上面時候被喚醒呢?

答案就在 handleMessage 方法裏。咱們剛剛說他是一個監聽器,當消息掃描器掃描到新的消息時,會通知全部的監聽器,也就是執行 handlerMessage 方法。方法內容以下:

@Override
public void handleMessage(ReleaseMessage message, String channel) {

  String content = message.getMessage();
  if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
    return;
  }
  String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);

  //create a new list to avoid ConcurrentModificationException 構造一個新 list ,防止併發失敗
  List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));

  // 建立通知對象
  ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
  configNotification.addMessage(content, message.getId());

  //do async notification if too many clients 若是有大量的客戶端(100)在等待,使用線程池異步處理
  if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
    // 大量通知批量處理
    largeNotificationBatchExecutorService.submit(() -> {
      for (int i = 0; i < results.size(); i++) { // 循環
        /*
         * 假設一個公共 Namespace 有10W 臺機器使用,若是該公共 Namespace 發佈時直接下發配置更新消息的話,
         * 就會致使這 10W 臺機器一會兒都來請求配置,這動靜就有點大了,並且對 Config Service 的壓力也會比較大。
         * 即"驚羣效應"
         */
        if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {// 若是處理了一批客戶端,休息一下(100ms)
            TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
        }
        results.get(i).setResult(configNotification);// 通知每一個等待的 HTTP 請求
      }
    });
    return;
  }

  // 不然,同步處理
  for (DeferredResultWrapper result : results) {
    result.setResult(configNotification);
  }
}

筆者去除了一些日誌和一些數據判斷。大體的邏輯以下:

  1. 消息類型必須是 「apollo-release」。而後拿到消息裏的 namespace 內容。
  2. 根據 namespace 從註冊器裏拿出 Spring 異步對象集合
  3. 建立通知對象。
  4. 若是有超過 100 個客戶端在等待,那麼就使用線程池批量執行通知。不然就同步慢慢執行。
  5. 每處理 100 個客戶端就休息 100ms,防止發生驚羣效應,致使大量客戶端調用配置獲取接口,引發服務抖動。
  6. 循環調用 Spring 異步對象的 setResult 方法,讓其當即返回。

具體的流程圖以下:

其中,灰色區域是掃描器的異步線程,黃色區域是接口的同步線程。他們共享 deferredResults 這個線程安全的 Map,實現異步解耦和實時通知客戶端。

總結

好了,這就是 Apollo 的長輪詢接口,客戶端會不斷的輪詢服務器,服務器會 Hold住 60 秒,這是經過 Servlet 3 的異步 + NIO 來實現的,可以保持萬級鏈接(Tomcat 默認 10000)。

經過一個線程安全的 Map + 監聽器,讓掃描器線程和 HTTP 線程共享 Spring 異步對象,即實現了消息實時通知,也讓應用程序實現異步解耦。

相關文章
相關標籤/搜索