探索c#之storm的TimeCacheMap

閱讀目錄:html

  1. 概述
  2. 算法介紹
  3. 清理線程
  4. 獲取、插入、刪除
  5. 總結

概述

最近在看storm,發現其中的TimeCacheMap算法設計頗爲高效,就簡單分享介紹下。
思考一下若是須要一個帶過時淘汰的緩存容器,咱們一般會使用定時器或線程去掃描容器,以便判斷是否過時從而刪除。但這樣性能並不友好,在數據量較大時O(n)檢查是一筆不小的開銷,而且在大量過時數據刪除時須要頻繁對容器加鎖,這會多少會影響到正常的數據讀寫刪除。
Storm設計了一種比較高效的時間緩存容器TimeCacheMap,它的算法能夠在某個時間週期內將數據批量刪除,一次批量刪除只須要加一次鎖便可,而且其讀寫刪除複雜度均爲O(1)。java

算法介紹

TimeCacheMap把要緩存的數據分拆存儲到多個小容器內,這裏稱爲桶。另外有個線程專門在必定時間內去掃描這些桶,一旦發現過時後就把整個桶的數據給刪除掉。 其中第二步比較關鍵,它並非傳統意義上的去定時掃描,而是根據過時時間來觸發,好比若是一個桶過時時間10s,那麼這個線程就10秒觸發一次把整個桶刪除便可,固然多個桶的觸發策略會有所不一樣,但思路是同一個。   
爲了更詳細的描述,用代碼和例子介紹以下:git

    private LinkedList<Dictionary<K, V>> buckets;
    private readonly object Obj = new object();
    private static readonly int NumBuckets = 3;
    private Thread cleaner;

上面使用了k、v的形式做爲緩存數據結構,每一個Dictionary是一個桶,而後使用鏈表把多個桶存儲起來。Obj是要鎖的對象,NumBuckets是桶的數量,cleaner是清理線程。
在緩存初始化的時候,會實例三個空桶加入到buckets,清理線程開始啓動循環檢查,假設過時時間時30秒,桶的數量爲3,當有新數據進來時,會所有加入到第一個桶中。github

爲了刪除性能,清理線程會按期把整個桶給刪除掉,通常咱們會每次把鏈表中最後一個桶給清理掉,而後再加入一個新桶到鏈表頭部。
這種狀況下就不能按照緩存過時時間去觸發線程清理了,由於有三個桶,若是每30秒觸發線程清理掉最後一個桶,那麼第三個桶要等到第90秒纔開始清理,很明顯這樣是不合理的。 正確的應該是第30秒開始清理,這時就須要調整線程觸發時間,好比調整成10秒,繼續模擬下:算法

  1. 觸發前1秒插入新數據到第一個桶,若是調整成10秒觸發,等到觸發刪除這個桶時才過了20秒,跟緩存過時時間30秒不一致一樣不合理,無論是1秒仍是9秒都會致使提早刪除數據,須要繼續調整觸發時間。
  2. 如上緩存提早刪除不能容許的,但延遲刪除通常是能夠接受的,所以能夠加入一些冗餘時間來保證不會提早刪除。 這裏調整到15秒觸發,觸發前1秒插入的緩存桶正好在30秒後觸發刪除,達到不會提早刪除的目的。
  3. 如上在觸發前14秒插入數據,那就須要過了30秒+14秒才能刪除。

根據上面的模擬,調整到15秒觸發是一個比較合理的值,所以推出緩存最長過時時間的公式爲:c#

expirationSecs * (1 + 1 / (numBuckets-1))

若是過時時間是30秒,其最長刪除時間是:緩存

30*(1+1/(3-1))=30*(1+0.5)=45  

所以其過時時間範圍即爲expirationSecs到expirationSecs * (1 + 1 / (numBuckets-1))之間。數據結構

清理線程

如上算法的介紹,咱們在類型的構造函數中,實例化並啓動清理線程:jvm

 public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallBack ex)
    {
        if (numBuckets < 2)
            throw new ArgumentException("numBuckets must be >=2");
        this.buckets = new LinkedList<Dictionary<K, V>>();
        for (int i = 0; i < numBuckets; i++)
            buckets.AddFirst(new Dictionary<K, V>());
        var expirationMillis = expirationSecs * 1000;
        var sleepTime = expirationMillis / (numBuckets - 1);
        cleaner = new Thread(() =>
        {
            while (true)
            {
                Dictionary<K, V> dead = null;
                Thread.Sleep(sleepTime);
                lock (Obj)
                {
                    dead = buckets.Last();
                    buckets.RemoveLast();
                    buckets.AddFirst(new Dictionary<K, V>());
                }
                if (ex != null)
                    ex(dead);
            }
        });
        cleaner.IsBackground = true;
        cleaner.Start();
    }

代碼執行步驟:函數

  1. 初始化桶加入到鏈表
  2. 計算緩存數據最長過時時間,並做爲線程休眠的時間。
  3. 線程觸發時刪除最後一個桶並加入新的桶
  4. 不斷循環休眠觸發觸發
  5. 啓動線程

整個桶的數據刪除只須要加一次鎖便可,保證其高效。

獲取、插入、刪除

遍歷整個鏈表,查詢到第一個知足key的當即返回,這須要保證不會有重複key。

   public V Get(K key)
        {
            lock (Obj)
            {
                foreach (var item in buckets)
                {
                    if (item.ContainsKey(key))
                        return item[key];
                }
                return default(V);
            }
        }

在插入時刪除對應的key,保證不會有重複的key出現。

 public void Put(K key, V value)
    {
        lock (Obj)
        {
            foreach (var item in buckets)
            {
                item.Remove(key);
            }
            buckets.First().Add(key, value);
        }
    }

刪除對應的key

    public void Remove(K key)
    {
        lock (Obj)
        {
            foreach (var item in buckets)
            {
                if (item.ContainsKey(key))
                    item.Remove(key);
            }
        }
    }

總結

那些年咱們一塊兒追過的緩存寫法(三)中有介紹過關於惰性刪除及高效LRU算法優化緩存容器的過時,有興趣的童鞋能夠看看。
完整代碼中有容器Size、ContainsKey的實現,github-TimeCacheMap.c#
在storm中,spout發射的消息和acker的消息即保存在各自的TimeCacheMap裏,若是消息超時後會自動通知spout的fail方法。 在storm0.8後TimeCacheMap被棄用了,使用的是新的RotatingMap,但設計和實現基本沒變,github-TimeCacheMap.javagithub-RotatingMap.java

相關文章
相關標籤/搜索