固定容量的本地緩存設計

服務一般須要考慮速度和容量限制,加強系統的魯棒性。緩存

背景

筆者曾負責過某公司內公衆號服務開發。公衆號接口服務接收到用戶的推送請求後會構造公衆號消息並寫入消息隊列,路由服務異步接收到消息後進行消息存儲後,再交由推送服務向用戶推送消息。基本流程以下圖所示: bash

消息流程.png
消息存儲過程:

  1. 路由服務發起消息存儲請求,並將消息緩存到本地;
  2. 存儲服務成功存儲消息後異步發送成功通知;
  3. 路由服務接收到成功通知後從本地緩存獲取消息內容後進行後續推送處理;

問題

若存儲服務異常,系統會出現什麼問題?數據結構

  • 路由服務使用local cache臨時存儲消息。當存儲服務異常時,若不加限制,路由服務極有可能致使內存溢出,路由服務不可用;
  • 路由服務發起消息存儲請求爲異步過程,頗有可能會一直消費MQ裏的消息,致使存儲服務承受更大的服務壓力。同時會存在消息可能丟失的風險;

方案

基於信號量實現限制容量的本地緩存。容量大小爲信號量個數,當路由服務發起消息存儲請求時,信號量減1。當路由服務接收到存儲成功通知後,信號量加1。異步

  • 存儲服務正常時,容量限制機制不會起做用,服務性能不會受到影響;
  • 存儲服務異常時,本地緩存的容量會愈來愈小。最後再無可用的信號量時,服務會阻塞等待。此時再也不對消息隊列進行消費。既避免了服務OOM的情況,也下降了服務繼續惡化的可能;

實現

基於信號量實現的限容數據結構BlockingHashMap性能

public class BlockingHashMap<K, V> {

    private static final int DEFAULT_MAX_AVAILABLE = 1000;
    private final ConcurrentHashMap<K, V> inmap = new ConcurrentHashMap<>(DEFAULT_MAX_AVAILABLE);
    private Semaphore sem;


    public BlockingHashMap() {
        this(DEFAULT_MAX_AVAILABLE);
    }

    public BlockingHashMap(int permits) {
        sem = new Semaphore(permits);
    }

    public V put(K key, V value) {
        boolean wasAdded = false;
        try {
            sem.acquire();
            V v = inmap.putIfAbsent(key, value);
            if (v != null) {
                return v;
            }
            wasAdded = true;
        } catch (Exception e) {
        } finally {
            if (!wasAdded) {
                // 若添加失敗,須要釋放信號量
                sem.release();
            }
        }
        return value;
    }

    public V remove(K key) {
        V value = inmap.remove(key);
        if (value != null) {
            // 只有當成功移除元素時才釋放信號量
            sem.release();
        }
        return value;
    }
}
複製代碼

總結

基於信號量實現固定容量的本地緩存,簡單有效。ui

相關文章
相關標籤/搜索