不使用 MQ 如何實現 pub/sub 場景?

hello,你們好,我是小黑,又和你們見面啦~~java

在配置中心中,有一個經典的 pub/sub 場景:某個配置項發生變動以後,須要實時的同步到各個服務端節點,同時推送給客戶端集羣。redis

在以前實現的簡易版配置中心中是經過 redis 的 pub/sub 來實現的。這種實現雖然簡單,但卻強依賴了 redis。數據庫

配置中心做爲一個基礎組件,若是能儘量的減小外部依賴,那對使用方來講必定是更友好的。那麼,有沒有可能不使用 MQ 來實現 pub/sub 的場景呢?答案是確定的。服務器

基於 DB 的 pub/sub 方案

Apollo 在實現上述場景時,並無選用基於 MQ 來進行實現,而是經過數據庫實現了一個簡單的消息隊列。示意圖以下:app

ReleaseMessage 示意圖

大體實現方式以下:異步

  1. Admin Service 在配置發佈後會往 ReleaseMessage 表插入一條消息記錄
  2. Config Service 中有一個線程會每秒掃描一次 ReleaseMessage 表,看是否有新的消息記錄(怎麼判斷是否是新消息呢,怎麼保證每一個 client 不會重複消費呢?)
  3. Config Service 若是發現有新的消息記錄,就會通知給客戶端(怎麼保證通知給每一個客戶端呢?每一個 Config Service 都通知,不會重複通知嗎?)

下面,就讓咱們帶着這幾個問題來學習一下源碼吧。(畫外音:思路比源碼更重要學習

DatabaseMessageSender

Admin Service 在配置發佈後會調用 DatabaseMessageSender#sendMessage 方法,該方法主要作了兩件事情:spa

  1. 建立 ReleaseMessage ,而後將其保存到數據庫中
  2. 記錄當前保存的 ReleaseMessage Id,將其放到 DatabaseMessageSender#toClean 隊列中。

DatabaseMessageSender#sendMessage

爲何要記錄當前保存的 ReleaseMessage Id 呢?線程

DatabaseMessageSender 中有個定時任務,會去清除比當前 ID 小的 ReleaseMessage。設計

DatabaseMessageSender#cleanMessage

ReleaseMessageScanner

Config Service 中經過 ReleaseMessageScanner 組件會每秒(默認配置下)掃描一次 ReleaseMessage 表,來獲取最新的消息。

ReleaseMessageScanner#afterPropertiesSet

有了這個基於 DB 的 pub/sub,Admin Service 在配置發佈以後,每一個 Config Service 都會經過 DB 來感知到這個消息,而後再通知給客戶端。

那 Config Service 又是如何通知客戶端的呢?

基於長輪詢的實時消息

在 Apollo 的設計中,配置發生更新以後,並非服務端主動推給客戶端的,並且客戶端經過長輪詢的方式向服務端詢問是否有配置發生了變動。大體思路爲:若是在 60 秒內沒有該客戶端關心的配置發佈,那麼會返回 Http 狀態碼 304 給客戶端;若是有該客戶端關心的配置發佈,請求就會當即返回,客戶端從返回的結果中獲取到配置變化的 namespace 後,會當即請求 Config Service 獲取該 namespace 的最新配置

客戶端的相關代碼在 RemoteConfigLongPollService#doLongPollingRefresh,代碼比較簡單,感興趣的同窗能夠自行查閱。

這裏咱們重點看一下服務端是如何實現的。

在傳統的 servlet 模型中,每一個請求都是由某個線程處理的,若是一個請求處理的時間較長,那麼這種基於線程池的同步模型很快就會把全部線程耗盡,致使服務器沒法響應新的請求。

servlet 3.0 中引入了異步支持,容許對一個請求進行異步處理,工做線程在此期間不會被阻塞,能夠繼續處理傳入的客戶端請求。

從 Spring 3.2 開始,可使用 DeferredResult 來實現異步處理。使用 DeferredResult 時,能夠設置超時,超時以後自動返回超時錯誤響應。同時,能夠在另外一個線程中,能夠調用其 setResult()寫入結果返回。

在 Apollo 客戶端長輪詢的地址爲 /notifications/v2,對應的服務端代碼爲 NotificationControllerV2

NotificationControllerV2 中就使用了 Spring 的 DeferredResult來實現的。本文重在解決問題的思路,就不展現源碼了,感興趣的同窗能夠本身閱讀一下源碼。不過,小黑同窗寫了一個簡單的 demo 來幫助咱們理解一下 DeferredResult 的使用。

@Slf4j
@RestController
public class DeferredResultDemoController {

    private final Multimap<String, DeferredResult<String>> deferredResults = ArrayListMultimap.create();

    @GetMapping("/info")
    public DeferredResult<String> info(String key) {
        // 設置 1 秒超時時間,設置超時是返回的結果
        DeferredResult<String> result = new DeferredResult<>(1000L, "key not change");
        // 將 result 放到 deferredResults 中, key 即爲當前請求所關心的配置項
        deferredResults.put(key, result);
        // 若是超時,移除當前 DeferredResult,並打印日誌,同時返回 DeferredResult 構造器中傳入的結果
        result.onTimeout(() -> {
            deferredResults.remove(key, result);
            log.info("time out key not change");
        });
        // 若是完成了,則從 deferredResults 中移除當前 DeferredResult
        result.onCompletion(() -> deferredResults.remove(key, result));
        return result;
    }

    @PostConstruct
    public void init() {
        new Thread(() -> {
            while (true) {
                try {
                    TimeUnit.MILLISECONDS.sleep(700);
                } catch (InterruptedException e) {
                    log.info(e.getMessage(), e);
                }
                // 定時任務,模擬配置更新
                // 當 hello key 發生變動以後,從 deferredResults 獲取到相關的 DeferredResult,經過 setResult 方法設置返回結果,同時移除 deferredResults
                if (deferredResults.containsKey("hello")) {
                    Collection<DeferredResult<String>> results = deferredResults.removeAll("hello");
                    results.forEach(stringDeferredResult -> stringDeferredResult.setResult("hello key change :" + System.currentTimeMillis()));
                }
            }
        }).start();
    }
}
相關文章
相關標籤/搜索