Redis 的緩存淘汰機制(Eviction)

本文從源碼層面分析了 redis 的緩存淘汰機制,並在文章末尾描述使用 Java 實現的思路,以供參考。java

相關配置

爲了適配用做緩存的場景,redis 支持緩存淘汰(eviction)並提供相應的了配置項:node

maxmemory

 設置內存使用上限,該值不能設置爲小於 1M 的容量。
 選項的默認值爲 0,此時系統會自行計算一個內存上限。git

maxmemory-policy

 熟悉 redis 的朋友都知道,每一個數據庫維護了兩個字典:github

  • db.dict:數據庫中全部鍵值對,也被稱做數據庫的 keyspace
  • db.expires:帶有生命週期的 key 及其對應的 TTL(存留時間),所以也被稱做 expire set

 當達到內存使用上限maxmemory時,可指定的清理緩存所使用的策略有:redis

  • noeviction 當達到最大內存時直接返回錯誤,不覆蓋或逐出任何數據
  • allkeys-lfu 淘汰整個 keyspace 中最不經常使用的 (LFU) 鍵 (4.0 或更高版本)
  • allkeys-lru 淘汰整個 keyspace 最近最少使用的 (LRU) 鍵
  • allkeys-random 淘汰整個 keyspace 中的隨機鍵
  • volatile-ttl 淘汰 expire set 中 TTL 最短的鍵
  • volatile-lfu 淘汰 expire set 中最不經常使用的鍵 (4.0 或更高版本)
  • volatile-lru 淘汰 expire set 中最近最少使用的 (LRU) 鍵
  • volatile-random 淘汰 expire set 中的隨機鍵

 當 expire set 爲空時,volatile-*noeviction 行爲一致。算法

maxmemory-samples

 爲了保證性能,redis 中使用的 LRU 與 LFU 算法是一類近似實現。
 簡單來講就是:算法選擇被淘汰記錄時,不會遍歷全部記錄,而是以 隨機採樣 的方式選取部分記錄進行淘汰。
 maxmemory-samples 選項控制該過程的採樣數量,增大該值會增長 CPU 開銷,但算法效果能更逼近實際的 LRU 與 LFU 。數據庫

lazyfree-lazy-eviction

 清理緩存就是爲了釋放內存,但這一過程會阻塞主線程,影響其餘命令的執行。
 當刪除某個巨型記錄(好比:包含數百條記錄的 list)時,會引發性能問題,甚至致使系統假死。
 延遲釋放 機制會將巨型記錄的內存釋放,交由其餘線程異步處理,從而提升系統的性能。
 開啓該選項後,可能出現使用內存超過 maxmemory 上限的狀況。數組

緩存淘汰機制

一個完整的緩存淘汰機制須要解決兩個問題:緩存

  • 肯定淘汰哪些記錄 —— 淘汰策略
  • 刪除被淘汰的記錄 —— 刪除策略

淘汰策略

緩存能使用的內存是有限的,當空間不足時,應該優先淘汰那些未來再也不被訪問的數據,保留那些未來還會頻繁訪問的數據。所以淘汰算法會圍繞 時間局部性 原理進行設計,即:若是一個數據正在被訪問,那麼在近期極可能會被再次訪問安全

爲了適應緩存讀多寫少的特色,實際應用中會使用哈希表來實現緩存。當須要實現某種特定的緩存淘汰策略時,須要引入額外的簿記 book keeping 結構。

下面回顧 3 種最多見的緩存淘汰策略。

FIFO (先進先出)

 越早進入緩存的數據,其再也不被訪問的可能性越大。
 所以在淘汰緩存時,應選擇在內存中停留時間最長的緩存記錄。

 使用隊列便可實現該策略:
 


 優勢:實現簡單,適合線性訪問的場景
 缺點:沒法適應特定的訪問熱點,緩存的命中率差
 簿記開銷:時間 O(1),空間 O(N)

LRU (最近最少使用)

 一個緩存被訪問後,近期再被訪問的可能性很大。
 能夠記錄每一個緩存記錄的最近訪問時間,最近未被訪問時間最長的數據會被首先淘汰。

 使用鏈表便可實現該策略:
 


 當更新 LRU 信息時,只需調整指針:
 


 優勢:實現簡單,能適應訪問熱點
 缺點:對偶發的訪問敏感,影響命中率
 簿記開銷:時間 O(1),空間 O(N)

LRU 改進

 原始的 LRU 算法緩存的是最近訪問了 1 次的數據,所以不能很好地區分頻繁和不頻繁緩存引用。
 這意味着,部分冷門的低頻數據也可能進入到緩存,並將本來的熱點記錄擠出緩存。
 爲了減小偶發訪問對緩存的影響,後續提出的 LRU-K 算法做出了以下改進:

在 LRU 簿記的基礎上增長一個歷史隊列 History Queue
  • 當記錄訪問次數小於 K 時,會記錄在歷史隊列中(當歷史隊列滿時,可使用 FIFO 或 LRU 策略進行淘汰)
  • 當記錄訪問次數大於等於 K 時,會被從歷史隊列中移出,並記錄到 LRU 緩存中

 K 值越大,緩存命中率越高,但適應性差,須要通過大量訪問才能將過時的熱點記錄淘汰掉。
 綜合各類因素後,實踐中經常使用的是 LRU-2 算法:
 


 優勢:減小偶發訪問對緩存命中率的影響
 缺點:須要額外的簿記開銷
 簿記開銷:時間 O(1),空間 O(N+M)

LFU (最不常用)

 一個緩存近期內訪問頻率越高,其再被訪問的可能性越大。
 能夠記錄每一個緩存記錄的最近一段時間的訪問頻率,訪問頻率低的數據會被首先淘汰。
 
 實現 LFU 的一個簡單方式,是在緩存記錄設置一個記錄訪問次數的計數器,而後將其放入一個小頂堆:
 


 爲了保證數據的時效性,還要以必定的時間間隔對計數器進行衰減,保證過時的熱點數據可以被及時淘汰:
 


刪除策略

常見刪除策略能夠分爲如下幾種:

  • 實時刪除:
     每次增長新的記錄時,當即查找可淘汰的記錄,若是存在則將該記錄從緩存中刪除

    • 優勢:實時性好,最節省內存空間
    • 缺點:查找淘汰記錄會影響寫入的效率,須要額外的簿記結構提升查找效率(好比 LRU 中的鏈表)
  • 惰性刪除:
     在緩存中設置兩個計數器,一個統計訪問緩存的次數,一個統計可淘汰記錄的數量
     每通過 N 次訪問後或當前可淘汰記錄數量大於 M,則觸發一次批量刪除(M 與 N 可調節)

    • 優勢:對正常緩存操做影響小,批量刪除減小維護開銷
    • 缺點:實時性較差,偶發的刪除操做會致使訪問耗時波動
  • 異步刪除:
     設置一個獨立的定時器線程,每隔固定的時間觸發一次批量刪除

    • 優勢:對正常緩存操做影透明,無額外性能開銷
    • 缺點:須要增長維護線程,而且須要提早規劃緩存的負載,以此決定如何在多個緩存實例上調度

redis 實現

redis 中實現了 LRU 與 LFU 兩種淘汰策略

爲了節省空間,redis 沒有使用前面描述的簿記結構實現 LRU 或 LFU,而是在 robj 中使用一個 24bits 的空間記錄訪問信息:

#define LRU_BITS 24

typedef struct redisObject {
    ...
    unsigned lru:LRU_BITS;  /* LRU 時間 (相對與全局 lru_clock 的時間) 或
                             * LFU 數據 (8bits 記錄訪問頻率,16 bits 記錄訪問時間). */
} robj;

每當記錄被命中時,redis 都會更新 robj.lru 做爲後面淘汰算法運行的依據:

robj *lookupKey(redisDb *db, robj *key, int flags) {
    // ...

    // 根據 maxmemory_policy 選擇不一樣的更新策略
    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
        updateLFU(val);
    } else {
        val->lru = LRU_CLOCK();
    }
}

LFU 與 LRU 的更新關鍵在於 updateLFU 函數與 LRU_CLOCK 宏,下面分別進行分析。

更新 LRU 時間

當時使用 LRU 算法時,robj.lru 記錄的是最近一次訪問的時間戳,能夠據此找出長時間未被訪問的記錄。

爲了減小系統調用,redis 設置了一個全局的時鐘 server.lruclock 並交由後臺任務進行更新:

#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */
#define LRU_CLOCK_RESOLUTION 1000 /* 以毫秒爲單位的時鐘精度 */

/**
 * server.lruclock 的更新頻率爲 1000/server.hz
 * 若是該頻率高於 LRU 時鐘精度,則直接用 server.lruclock
 * 避免調用 getLRUClock() 產生額外的開銷
 */
#define LRU_CLOCK() ((1000/server.hz <= LRU_CLOCK_RESOLUTION) ? server.lruclock : getLRUClock())

unsigned int getLRUClock(void) {
    return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
}

計算 LRU 時間方法以下:

unsigned long long estimateObjectIdleTime(robj *o) {
    unsigned long long lruclock = LRU_CLOCK();
    if (lruclock >= o->lru) {
        return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION;
    } else {
        // 處理 LRU 時間溢出的狀況
        return (lruclock + (LRU_CLOCK_MAX - o->lru)) *
                    LRU_CLOCK_RESOLUTION;
    }
}

LRU_CLOCK_RESOLUTION爲 1000ms 時,robj.lru最長可記錄的 LRU 時長爲 194 天0xFFFFFF / 3600 / 24

更新 LFU 計數

當時使用 LFU 算法時,robj.lru 被分爲兩部分:16bits 記錄最近一次訪問時間,8bits 用做計數器

void updateLFU(robj *val) {
    unsigned long counter = LFUDecrAndReturn(val); // 衰減計數
    counter = LFULogIncr(counter); // 增長計數
    val->lru = (LFUGetTimeInMinutes()<<8) | counter; // 更新時間
}

更新訪問時間

前 16bits 用於保存最近一次被訪問的時間:

/**
 * 獲取 UNIX 分鐘時間戳,且只保留最低 16bits
 * 用於表示最近一次衰減時間 LDT (last decrement time)
 */
unsigned long LFUGetTimeInMinutes(void) {
    return (server.unixtime/60) & 65535;
}

增長訪問計數

後 8bits 是一個對數計數器 logarithmic counter,裏面保存的是訪問次數的對數:

#define LFU_INIT_VAL 5 

 // 對數遞增計數器,最大值爲 255
uint8_t LFULogIncr(uint8_t counter) {
    if (counter == 255) return 255;
    double r = (double)rand()/RAND_MAX;
    double baseval = counter - LFU_INIT_VAL;
    if (baseval < 0) baseval = 0;
    double p = 1.0/(baseval*server.lfu_log_factor+1);
    if (r < p) counter++;
    return counter;
}

server.lfu_log_factor = 10 時,p = 1/((counter-LFU_INIT_VAL)*server.lfu_log_factor+1) 的增加函數如圖所示:

使用函數 rand() 生成的介於 0 與 1 之間隨機浮點數 r 符合均勻分佈,隨着 counter 的增大,其自增成功的機率迅速下降。

下列表格展現了 counter 在不一樣 lfu_log_factor 狀況下,達到飽和(255)所需的訪問次數:

+--------+------------+------------+------------+------------+------------+
| factor | 100 hits   | 1000 hits  | 100K hits  | 1M hits    | 10M hits   |
+--------+------------+------------+------------+------------+------------+
| 0      | 104        | 255        | 255        | 255        | 255        |
+--------+------------+------------+------------+------------+------------+
| 1      | 18         | 49         | 255        | 255        | 255        |
+--------+------------+------------+------------+------------+------------+
| 10     | 10         | 18         | 142        | 255        | 255        |
+--------+------------+------------+------------+------------+------------+
| 100    | 8          | 11         | 49         | 143        | 255        |
+--------+------------+------------+------------+------------+------------+

衰減訪問計數

一樣的,爲了保證過時的熱點數據可以被及時淘汰,redis 使用以下衰減函數:

// 計算距離上一次衰減的時間 ,單位爲分鐘
unsigned long LFUTimeElapsed(unsigned long ldt) {
    unsigned long now = LFUGetTimeInMinutes();
    if (now >= ldt) return now-ldt;
    return 65535-ldt+now;
}

/**
 * 衰減函數,返回根據 LDT 時間戳衰減後的 LFU 計數
 * 不更新計數器
 */
unsigned long LFUDecrAndReturn(robj *o) {
    unsigned long ldt = o->lru >> 8;
    unsigned long counter = o->lru & 255;
    /**
    * 衰減因子 server.lfu_decay_time 用於控制計數器的衰減速度
    * 每過 server.lfu_decay_time 分鐘訪問計數減 1
    * 默認值爲 1
    */
    unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
    if (num_periods)
        counter = (num_periods > counter) ? 0 : counter - num_periods;
    return counter;
}

16bits 最多能保存的分鐘數,換算整天數約爲 45 天,所以 LDT 時間戳每隔 45 天就會重置一次。

執行刪除

每當客戶端執行命令產生新數據時,redis 會檢查內存使用是否超過 maxmemory,若是超過則嘗試根據 maxmemory_policy 淘汰數據:

// redis 處理命令的主方法,在真正執行命令前,會有各類檢查,包括對OOM狀況下的處理:
int processCommand(client *c) {

    // ...

    // 設置了 maxmemory 時,若是有必要,嘗試釋放內存(evict)
    if (server.maxmemory && !server.lua_timedout) {
        int out_of_memory = (performEvictions() == EVICT_FAIL);
        
        // ...

        // 若是釋放內存失敗,而且當前將要執行的命令不容許OOM(通常是寫入類命令)
        if (out_of_memory && reject_cmd_on_oom) {
            rejectCommand(c, shared.oomerr); // 向客戶端返回OOM
            return C_OK;
        }
    }
}

實際執行刪除的是 performEvictions 函數:

int performEvictions(void) {
    // 循環,嘗試釋放足夠大的內存
    while (mem_freed < (long long)mem_tofree) {
        
        // ...

        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {

            /**
            * redis 使用的是近似 LRU / LFU 算法
            * 在淘汰對象時不會遍歷全部記錄,而是對記錄進行採樣
            * EvictionPoolLRU 被用於臨時存儲應該被優先淘汰的樣本數據
            */
            struct evictionPoolEntry *pool = EvictionPoolLRU;
            
            // 根據配置的 maxmemory-policy,拿到一個能夠釋放掉的bestkey
            while(bestkey == NULL) {
                unsigned long total_keys = 0, keys;

                // 遍歷全部的 db 實例
                for (i = 0; i < server.dbnum; i++) {
                    db = server.db+i;
                    dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                            db->dict : db->expires;
                    // 根據 policy 選擇採樣的集合(keyspace 或 expire set)
                    if ((keys = dictSize(dict)) != 0) {
                        // 採樣並填充 pool
                        evictionPoolPopulate(i, dict, db->dict, pool);
                        total_keys += keys;
                    }
                }

                // 遍歷 pool 中的記錄,釋放內存
                for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                    if (pool[k].key == NULL) continue;
                    bestdbid = pool[k].dbid;

                    if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                        de = dictFind(server.db[pool[k].dbid].dict, pool[k].key);
                    } else {
                        de = dictFind(server.db[pool[k].dbid].expires, pool[k].key);
                    }

                    // 將記錄從 pool 中剔除
                    if (pool[k].key != pool[k].cached)
                        sdsfree(pool[k].key);
                    pool[k].key = NULL;
                    pool[k].idle = 0;

                    if (de) {
                        // 提取該記錄的 key
                        bestkey = dictGetKey(de);
                        break;
                    } else {
                        /* Ghost... Iterate again. */
                    }
                }
            }
        }

        // 最終選中了一個 bestkey
        if (bestkey) {

            // 若是配置了 lazyfree-lazy-eviction,嘗試異步刪除
            if (server.lazyfree_lazy_eviction)
                dbAsyncDelete(db,keyobj);
            else
                dbSyncDelete(db,keyobj);
            
            // ...

        } else {
            goto cant_free; /* nothing to free... */
        }
    }
}

負責採樣的 evictionPoolPopulate 函數:

#define EVPOOL_SIZE 16
#define EVPOOL_CACHED_SDS_SIZE 255
struct evictionPoolEntry {
    unsigned long long idle;    /* LRU 空閒時間 / LFU 頻率倒數(優先淘汰該值較大的記錄) */
    sds key;                    /* 參與淘汰篩選的鍵 */
    sds cached;                 /* 鍵名緩存 */
    int dbid;                   /* 數據庫ID */
};

// evictionPool 數組用於輔助 eviction 操做
static struct evictionPoolEntry *evictionPoolEntry;

/**
 * 在給定的 sampledict 集合中進行採樣
 * 並將其中應該被淘汰的記錄記錄至 evictionPool
 */
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    int j, k, count;
    dictEntry *samples[server.maxmemory_samples];

    // 從 sampledict 中隨機獲取 maxmemory_samples 個樣本數據
    count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);

    // 遍歷樣本數據
    for (j = 0; j < count; j++) {
        // 根據 maxmemory_policy 計算樣本空閒時間 idle
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
            idle = estimateObjectIdleTime(o);
        } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
            idle = 255-LFUDecrAndReturn(o);
        } else {
            // ...
        }

        k = 0; // 根據 idle 定位樣本在 evictionPool 中的索引(樣本按照 idle 升序)
        while (k < EVPOOL_SIZE && pool[k].key && pool[k].idle < idle) k++;
        
        if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
            // 樣本空閒時間不夠長,不參與該輪 eviction
            continue;
        } else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
            // 樣本對應的位置爲空,能夠直接插入至該位置
        } else {
           // 樣本對應的位置已被佔用,移動其餘元素空出該位置
        }

        // ...
        
        // 將樣本數據插入其對應的位置 k 
        int klen = sdslen(key);
        if (klen > EVPOOL_CACHED_SDS_SIZE) {
            pool[k].key = sdsdup(key);
        } else {
           // 若是 key 長度不超過 EVPOOL_CACHED_SDS_SIZE,則複用 sds 對象
        }
        pool[k].idle = idle;
        pool[k].dbid = dbid;
    }
}

Java 實現

在瞭解以上知識後,嘗試使用 Java 實現 線程安全 的淘汰策略。

肯定簿記結構

在一個多線程安全的緩存中,很重要的一點是減小簿記:

  • 一方面避免額外狀態的維護開銷
  • 另外一方面能夠減小系統處於不一致狀態的邊界狀況

所以參考 redis 使用計數器來記錄訪問模式:

/**
 * 緩存記錄
 */
public abstract class CacheEntry {

    // CAS Updater
    private static final AtomicLongFieldUpdater<CacheEntry>
            TTL_UPDATER = AtomicLongFieldUpdater.newUpdater(CacheEntry.class, "ttl");

    // 緩存記錄的剩餘存活時間(無符號長整數)
    private volatile long ttl;

    protected CacheEntry(long ttl) {
        this.ttl = ttl;
    }

    public long ttl() {
        return ttl;
    }

    // 支持併發更新 TTL
    public boolean casTTL(long old, long ttl) {
        return TTL_UPDATER.compareAndSet(this, old, ttl);
    }

}
/**
 * 淘汰策略
 */
public interface EvictStrategy {

    // 更新緩存記錄的 TTL
    void updateTTL(CacheEntry node);

    // 根據當前時間戳,計算緩存記錄的 TTL
    long weightTTL(CacheEntry node, long now);

}

肯定刪除策略

受限於簿記結構,redis 只能經過採樣來規避大量的遍歷,減小 實時刪除 策略對主線程的阻塞。
而在對於內存限制沒那麼嚴謹的狀況下,可使用 懶惰刪除 策略,減小單次請求的開銷:

public abstract class EvictableCache {

    EvictStrategy evicting; // 淘汰策略

    /**
     * 在讀寫緩存記錄時,更新該記錄的 TTL
     * @param entry 最近被訪問的緩存記錄
     */
    void accessEntry(CacheEntry entry) {
        evicting.updateTTL(entry);
    }

    /**
     * 批量淘汰緩存
     * @param evictSamples 緩存樣本
     * @param evictNum 最大淘汰數量
     * @return 應該被淘汰的記錄
     */
    Collection<CacheEntry> evictEntries(Iterable<CacheEntry> evictSamples, int evictNum) {

        // 比較兩個 CacheEntry 的 TTL(優先淘汰 TTL 較小的記錄)
        Comparator<CacheEntry> comparator = new Comparator<CacheEntry>() {
            final long now = System.currentTimeMillis();
            public int compare(CacheEntry o1, CacheEntry o2) {
                long w1 = evicting.weightTTL(o1, now);
                long w2 = evicting.weightTTL(o2, now);
                return -Long.compareUnsigned(w1, w2);
            }
        };

        // 使用大頂堆記錄 TTL 最小的 K 個 CacheEntry
        PriorityQueue<CacheEntry> evictPool = new PriorityQueue<>(evictNum, comparator);

        Iterator<CacheEntry> iterator = evictSamples.iterator();
        while (iterator.hasNext()) {
            CacheEntry entry = iterator.next();
            if (evictPool.size() < evictNum) {
                evictPool.add(entry);
            } else {
                // 若是 CacheEntry 的 TTL 小於堆頂記錄
                // 則彈出堆頂記錄,並將 TTL 更小的記錄放入堆中
                CacheEntry top = evictPool.peek();
                if (comparator.compare(entry, top) < 1) {
                    evictPool.poll();
                    evictPool.add(entry);
                }
            }
        }

        return evictPool;
    }
}

實現淘汰策略

FIFO 策略

/**
 * FIFO 策略
 */
public class FirstInFirstOut implements EvictStrategy {

    // 計數器,每發生一次訪問操做自增 1
    private final AtomicLong counter = new AtomicLong(0);

    // 第一次訪問時才更新 TTL
    public void updateTTL(CacheEntry node) {
        node.casTTL(0, counter.incrementAndGet());
    }

    // 返回第一次被訪問的序號
    public long weightTTL(CacheEntry node, long now) {
        return node.ttl();
    }

}

LRU 策略

/**
 * LRU-2 策略
 */
public class LeastRecentlyUsed implements EvictStrategy {

    // 邏輯時鐘,每發生一次訪問操做自增 1
    private final AtomicLong clock = new AtomicLong(0);

    /**
     * 更新 LRU 時間
     */
    public void updateTTL(CacheEntry node) {
        long old = node.ttl();
        long tick = clock.incrementAndGet();
        long flag = old == 0 ? Long.MIN_VALUE: 0;
        // flag = Long.MIN_VALUE 表示放入 History Queue
        // flag = 0              表示放入 LRU Cache
        long ttl = (tick & Long.MAX_VALUE) | flag;
        while ((old & Long.MAX_VALUE) < tick && ! node.casTTL(old, ttl)) {
            old = node.ttl();
            ttl = tick & Long.MAX_VALUE; // CAS 失敗說明已是二次訪問
        }
    }

    /**
     * 根據 LRU 時間計算 TTL
     */
    public long weightTTL(CacheEntry node, long now) {
        long ttl = node.ttl();
        return -1L - ttl;
    }

}

LFU 策略

/**
 * LFU-AgeDecay 策略
 */
public class LeastFrequentlyUsed implements EvictStrategy {

    private static final int TIMESTAMP_BITS = 40; // 40bits 記錄訪問時間戳(保證 34 年不溢出)
    private static final int FREQUENCY_BITS = 24; // 24bits 做爲對數計數器(能夠忽略計數溢出的狀況)

    private final long ERA = System.currentTimeMillis();  // 起始時間(記錄相對於該值的時間戳)
    private final double LOG_FACTOR = 1;                  // 對數因子
    private final TimeUnit DECAY_UNIT = TimeUnit.MINUTES; // 時間衰減單位

    /**
     * 更新 LFU 計數器與訪問時間
     * 與 redis 不一樣,更新時不會對計數進行衰減
     */
    public void updateTTL(CacheEntry node) {

        final long now = System.currentTimeMillis();

        long old = node.ttl();
        long timestamp = old >>> FREQUENCY_BITS;
        long frequency = old & (~0L >>> TIMESTAMP_BITS);

        // 計算訪問時間
        long elapsed = Math.min(~0L >>> FREQUENCY_BITS, now - ERA);
        while (timestamp < elapsed) {
            // 增長訪問計數
            double rand = ThreadLocalRandom.current().nextDouble();
            if (1./(frequency * LOG_FACTOR + 1) > rand) {
                frequency++;
                frequency &= (~0L >>> TIMESTAMP_BITS);
            }
            // 更新 TTL
            long ttl = elapsed << FREQUENCY_BITS | frequency & (~0L >>> TIMESTAMP_BITS);
            if (node.casTTL(old, ttl)) {
                break;
            }
            old = node.ttl();
            timestamp = old >>> FREQUENCY_BITS;
            frequency = old & (~0L >>> TIMESTAMP_BITS);
        }
    }

    /**
     * 返回衰減後的 LFU 計數
     */
    public long weightTTL(CacheEntry node, long now) {
        long ttl = node.ttl();
        long timestamp = ttl >>> FREQUENCY_BITS;
        long frequency = ttl & (~0L >>> TIMESTAMP_BITS);
        long decay = DECAY_UNIT.toMinutes(Math.max(now - ERA, timestamp) - timestamp);
        return frequency - decay;
    }

}

至此,對 redis 的淘汰策略分析完畢,後續將對 redis 的一些其餘細節進行分享,感謝觀看。


參考資料

相關文章
相關標籤/搜索