Redis技術交流羣 481804090html
Redis:https://github.com/zwjlpeng/Redis_Deep_Readgit
Redis中採用兩種算法進行內存回收,引用計數算法以及LRU算法,在操做系統內存管理一節中,咱們都學習過LRU算法(最近最久未使用算法),那麼什麼是LRU算法呢github
LRU算法做爲內存管理的一種有效算法,其含義是在內存有限的狀況下,當內存容量不足時,爲了保證程序的運行,這時就不得不淘汰內存中的一些對象,釋放這些對象佔用的空間,那麼選擇淘汰哪些對象呢?LRU算法就提供了一種策略,告訴咱們選擇最近一段時間內,最久未使用的對象將其淘汰,至於爲何要選擇最久未使用的,能夠想一想,最近一段時間內使用的東西,咱們是否是可能一會又要用到呢~,而很長一段時間內都沒有使用過的東西,也許永遠都不會再使用~redis
在操做系統中LRU算法淘汰的不是內存中的對象,而是頁,當內存中數據不足時,經過LRU算法,選擇一頁(通常是4KB)將其交換到虛擬內存區(Swap區)算法
LRU算法演示數據結構
這張圖應該畫的還行吧,用的是www.draw.io,解釋以下,假設前提,只有三塊內存空間可使用,每一塊內存空間只能存放一個對象,如A、B、C...app
一、最開始時,內存空間是空的,所以依次進入A、B、C是沒有問題的less
二、當加入D時,就出現了問題,內存空間不夠了,所以根據LRU算法,內存空間中A待的時間最爲久遠,選擇A,將其淘汰dom
三、當再次引用B時,內存空間中的B又處於活躍狀態,而C則變成了內存空間中,近段時間最久未使用的函數
四、當再次向內存空間加入E時,這時內存空間又不足了,選擇在內存空間中待的最久的C將其淘汰出內存,這時的內存空間存放的對象就是E->B->D
LRU算法的總體思路就是這樣的
算法實現應該採用怎樣的數據結構
隊列?那不就是FIFO算法嘛~,LRU算法最爲精典的實現,就是HashMap+Double LinkedList,時間複雜度爲O(1),具體能夠參考相關代碼
REDIS中LRU算法的實際應用,在Redis 1.0中並未引入LRU算法,只是簡單的使用引用計數法,去掉內存中再也不引用的對象以及運行一個定時任務serverCron去掉內存中已通過期的對象佔用的內存空間,如下是Redis 1.0中CT任務的釋放內存中的部份代碼
//去掉一些過時的KEYS for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; int num = dictSize(db->expires);//計算hash表中過時Key的數目 if (num) { time_t now = time(NULL); //#define REDIS_EXPIRELOOKUPS_PER_CRON 100 if (num > REDIS_EXPIRELOOKUPS_PER_CRON) num = REDIS_EXPIRELOOKUPS_PER_CRON; //循環100次,從過時Hash表中隨機挑選出100個Key,判斷Key是否過時,若是過時了,執行刪除操做 while (num--) { dictEntry *de; time_t t; //隨機獲取Key值(db->expires裏面存儲的均是即將過時的Keys) if ((de = dictGetRandomKey(db->expires)) == NULL) break; t = (time_t) dictGetEntryVal(de); if (now > t) { //不只要從存放過時keys的Hash表中刪除數據,還要從存放實際數據的Hash表中刪除數據 deleteKey(db,dictGetEntryKey(de)); } } } }
若是沒有看過Redis 1.0源碼,理解起來可能有些困難,但看看1.0源碼中的這個結構體,估計有點數據結構基礎的人,都明白上面這幾行代碼的意思了(註釋部份我也已經寫的很清楚了)~
typedef struct redisDb { dict *dict;//用來存放實際Key->Value數據的位置 dict *expires;//用於記錄Key的過時時間 int id;//表示選擇的是第幾個redis庫 } redisDb;
沒有查證是從什麼版本開始,Redis增長了LRU算法,如下是分析Redis 2.9.11代碼中的LRU算法淘汰策略,在2.9.11版本中與LRU算法相關的代碼主要位於object.c以及redis.c兩個源文件中, 再分析這兩個文件關於LRU源代碼以前,讓我們先看一下,Redis 2.9.11版本中關於LRU算法的配置,配置文件在redis.conf文件中,以下所示
# maxmemory <bytes> # MAXMEMORY POLICY: how Redis will select what to remove when maxmemory # is reached. You can select among five behaviors: # # volatile-lru -> remove the key with an expire set using an LRU algorithm # allkeys-lru -> remove any key accordingly to the LRU algorithm # volatile-random -> remove a random key with an expire set # allkeys-random -> remove a random key, any key # volatile-ttl -> remove the key with the nearest expire time (minor TTL) # noeviction -> don't expire at all, just return an error on write operations # # Note: with any of the above policies, Redis will return an error on write # operations, when there are not suitable keys for eviction. # # At the date of writing this commands are: set setnx setex append # incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd # sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby # zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby # getset mset msetnx exec sort # # The default is: # # maxmemory-policy noeviction # LRU and minimal TTL algorithms are not precise algorithms but approximated # algorithms (in order to save memory), so you can tune it for speed or # accuracy. For default Redis will check five keys and pick the one that was # used less recently, you can change the sample size using the following # configuration directive. # # The default of 5 produces good enough results. 10 Approximates very closely # true LRU but costs a bit more CPU. 3 is very fast but not very accurate. # # maxmemory-samples 5
從上面的配置中,能夠看出,高版本的Redis中當內存達到極限時,內存淘汰策略主要採用了6種方式進行內存對象的釋放操做
1.volatile-lru:從設置了過時時間的數據集中,選擇最近最久未使用的數據釋放
2.allkeys-lru:從數據集中(包括設置過時時間以及未設置過時時間的數據集中),選擇最近最久未使用的數據釋放
3.volatile-random:從設置了過時時間的數據集中,隨機選擇一個數據進行釋放
4.allkeys-random:從數據集中(包括了設置過時時間以及未設置過時時間)隨機選擇一個數據進行入釋放
5.volatile-ttl:從設置了過時時間的數據集中,選擇立刻就要過時的數據進行釋放操做
6.noeviction:不刪除任意數據(但redis還會根據引用計數器進行釋放呦~),這時若是內存不夠時,會直接返回錯誤
默認的內存策略是noeviction,在Redis中LRU算法是一個近似算法,默認狀況下,Redis隨機挑選5個鍵,而且從中選取一個最近最久未使用的key進行淘汰,在配置文件中能夠經過maxmemory-samples的值來設置redis須要檢查key的個數,可是栓查的越多,耗費的時間也就越久,可是結構越精確(也就是Redis從內存中淘汰的對象未使用的時間也就越久~),設置多少,綜合權衡吧~~~
在redis.h中聲明的redisObj定義的以下:
#define REDIS_LRU_BITS 24 #define REDIS_LRU_CLOCK_MAX ((1<<REDIS_LRU_BITS)-1) /* Max value of obj->lru */ #define REDIS_LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */ typedef struct redisObject {
//存放的對象類型 unsigned type:4; //內容編碼 unsigned encoding:4; //與server.lruclock的時間差值 unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */\ //引用計數算法使用的引用計數器 int refcount; //數據指針 void *ptr; } robj;
從redisObject結構體的定義中能夠看出,在Redis中存放的對象不只會有一個引用計數器,還會存在一個server.lruclock,這個變量會在定時器中每次刷新時,調用getLRUClock獲取當前系統的毫秒數,做爲LRU時鐘數,該計數器總共佔用24位,最大能夠表示的值爲24個1即((1<<REDIS_LRU_BITS) - 1)=2^24 - 1,單位是毫秒,你能夠算一下這麼多毫秒,能夠表示多少年~~
server.lruclock在redis.c中運行的定時器中進行更新操做,代碼以下(redis.c中的定時器被配置中100ms執行一次)
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { ..... run_with_period(100) trackOperationsPerSecond(); /* We have just REDIS_LRU_BITS bits per object for LRU information. * So we use an (eventually wrapping) LRU clock. * * Note that even if the counter wraps it's not a big problem, * everything will still work but some object will appear younger * to Redis. However for this to happen a given object should never be * touched for all the time needed to the counter to wrap, which is * not likely. * * Note that you can change the resolution altering the * REDIS_LRU_CLOCK_RESOLUTION define. */ server.lruclock = getLRUClock(); .... return 1000/server.hz; }
看到這,再看看Redis中建立對象時,如何對redisObj中的unsigned lru進行賦值操做的,代碼位於object.c中,以下所示
robj *createObject(int type, void *ptr) { robj *o = zmalloc(sizeof(*o)); o->type = type; o->encoding = REDIS_ENCODING_RAW; o->ptr = ptr; o->refcount = 1; //很關鍵的一步,Redis中建立的每個對象,都記錄下該對象的LRU時鐘 /* Set the LRU to the current lruclock (minutes resolution). */ o->lru = LRU_CLOCK(); return o; }
該代碼中最爲關鍵的一句就是o->lru=LRU_CLOCK(),這是一個定義,看一下這個宏定義的實現,代碼以下所示
#define LRU_CLOCK() ((1000/server.hz <= REDIS_LRU_CLOCK_RESOLUTION) ? server.lruclock : getLRUClock())
其中REDIS_LRU_CLOCK_RESOLUTION爲1000,能夠自已在配置文件中進行配置,表示的是LRU算法的精度,在這裏咱們就能夠看到server.lruclock的用處了,若是定時器執行的頻率高於LRU算法的精度時,能夠直接將server.lruclock直接在對象建立時賦值過去,避免了函數調用的內存開銷以及時間開銷~
有了上述的基礎,下面就是最爲關鍵的部份了,REDIS中LRU算法,這裏以volatile-lru爲例(選擇有過時時間的數據集進行淘汰),在Redis中命令的處理時,會調用processCommand函數,在ProcessCommand函數中,當在配置文件中配置了maxmemory時,會調用freeMemoryIfNeeded函數,釋放不用的內存空間
如下是freeMemoryIfNeeded函數的關於LRU相關部份的源代碼,其餘代碼相似
//不一樣的策略,操做的數據集不一樣 if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU || server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM) { dict = server.db[j].dict; } else {//操做的是設置了過時時間的key集 dict = server.db[j].expires; } if (dictSize(dict) == 0) continue; /* volatile-random and allkeys-random policy */ //隨機選擇進行淘汰 if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM || server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_RANDOM) { de = dictGetRandomKey(dict); bestkey = dictGetKey(de); } /* volatile-lru and allkeys-lru policy */ //具體的LRU算法 else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU || server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU) { struct evictionPoolEntry *pool = db->eviction_pool; while(bestkey == NULL) { //選擇隨機樣式,並從樣本中做用LRU算法選擇須要淘汰的數據 evictionPoolPopulate(dict, db->dict, db->eviction_pool); /* Go backward from best to worst element to evict. */ for (k = REDIS_EVICTION_POOL_SIZE-1; k >= 0; k--) { if (pool[k].key == NULL) continue; de = dictFind(dict,pool[k].key); sdsfree(pool[k].key); //將pool+k+1以後的元素向前平移一個單位 memmove(pool+k,pool+k+1, sizeof(pool[0])*(REDIS_EVICTION_POOL_SIZE-k-1)); /* Clear the element on the right which is empty * since we shifted one position to the left. */ pool[REDIS_EVICTION_POOL_SIZE-1].key = NULL; pool[REDIS_EVICTION_POOL_SIZE-1].idle = 0; //選擇了須要淘汰的數據 if (de) { bestkey = dictGetKey(de); break; } else { /* Ghost... */ continue; } } } }
看了上面的代碼,也許你還在奇怪,說好的,LRU算法去哪去了呢,再看看這個函數evictionPoolPopulate的實現吧
#define EVICTION_SAMPLES_ARRAY_SIZE 16 void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) { int j, k, count; //EVICTION_SAMPLES_ARRAY_SIZE最大樣本數,默認16 dictEntry *_samples[EVICTION_SAMPLES_ARRAY_SIZE]; dictEntry **samples; //若是咱們在配置文件中配置的samples小於16,則直接使用EVICTION_SAMPLES_ARRAY_SIZE if (server.maxmemory_samples <= EVICTION_SAMPLES_ARRAY_SIZE) { samples = _samples; } else { samples = zmalloc(sizeof(samples[0])*server.maxmemory_samples); } #if 1 /* Use bulk get by default. */ //從樣本集中隨機獲取server.maxmemory_samples個數據,存放在 count = dictGetRandomKeys(sampledict,samples,server.maxmemory_samples); #else count = server.maxmemory_samples; for (j = 0; j < count; j++) samples[j] = dictGetRandomKey(sampledict); #endif for (j = 0; j < count; j++) { unsigned long long idle; sds key; robj *o; dictEntry *de; de = samples[j]; key = dictGetKey(de); if (sampledict != keydict) de = dictFind(keydict, key); o = dictGetVal(de); //計算LRU時間 idle = estimateObjectIdleTime(o); k = 0; //選擇de在pool中的正確位置,按升序進行排序,升序的依據是其idle時間 while (k < REDIS_EVICTION_POOL_SIZE && pool[k].key && pool[k].idle < idle) k++; if (k == 0 && pool[REDIS_EVICTION_POOL_SIZE-1].key != NULL) { /* Can't insert if the element is < the worst element we have * and there are no empty buckets. */ continue; } else if (k < REDIS_EVICTION_POOL_SIZE && pool[k].key == NULL) { /* Inserting into empty position. No setup needed before insert. */ } else { //移動元素,memmove,還有空間能夠插入新元素 if (pool[REDIS_EVICTION_POOL_SIZE-1].key == NULL) { memmove(pool+k+1,pool+k, sizeof(pool[0])*(REDIS_EVICTION_POOL_SIZE-k-1)); } else {//已經沒有空間插入新元素時,將第一個元素刪除 /* No free space on right? Insert at k-1 */ k--; /* Shift all elements on the left of k (included) to the * left, so we discard the element with smaller idle time. */ //如下操做突出了第K個位置 sdsfree(pool[0].key); memmove(pool,pool+1,sizeof(pool[0])*k); } } //在第K個位置插入 pool[k].key = sdsdup(key); pool[k].idle = idle; } //執行到此以後,pool中存放的就是按idle time升序排序 if (samples != _samples) zfree(samples); }
看了上面的代碼,LRU時鐘的計算並無包括在內,那麼在看一下LRU算法的時鐘計算代碼吧,LRU時鐘計算代碼在object.c中的estimateObjectIdleTime這個函數中,代碼以下~~
//精略估計LRU時間 unsigned long long estimateObjectIdleTime(robj *o) { unsigned long long lruclock = LRU_CLOCK(); if (lruclock >= o->lru) { return (lruclock - o->lru) * REDIS_LRU_CLOCK_RESOLUTION; } else {//這種狀況通常不會發生,發生時證實redis中鍵的保存時間已經wrap了 return (lruclock + (REDIS_LRU_CLOCK_MAX - o->lru)) * REDIS_LRU_CLOCK_RESOLUTION; } }
好了,先到此吧~~~