通過前兩篇的介紹,咱們對整個redis的動做流程已經有比較清晰的認識。node
接下來就是到具體的命令處理方式的理解了,想來咱們用這些工具的意義也是在此。雖然沒有人以爲,一個set/get方法會有難度,可是咱們畢竟不是很清楚,不然也不至於在談到深處就懵逼了。redis
我以爲本文的一個重要意義就是: 讓set/get還原成它原本樣子,和寫"hello world"同樣簡單。算法
框架性質的東西,咱們前面已經講解,就直接進入主題: set/get 的操做。sql
set/get 對應的兩個處理函數 (redisCommand) 定義是這樣的: 數據庫
// rF 表明 getCommand 是隻讀命令,又快又準,時間複雜度 O(1)或者O(log(n)) // wm 表明 setCommand 是個寫命令,小心空間問題 {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0} {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}
因此,咱們只要理解了, setCommand,getCommand 以後,就能夠徹底自信的說,set/get 就是和 "hello world" 同樣簡單了。緩存
零、hash 算法 app
很顯然,kv型的存儲必定是hash相關算法的實現。那麼redis中如何使用這個hash算法的呢?框架
redis 中許多不一樣場景的hash算法,其原型是在 dictType 中定義的。dom
typedef struct dictType { // hash 算法原型 unsigned int (*hashFunction)(const void *key); void *(*keyDup)(void *privdata, const void *key); void *(*valDup)(void *privdata, const void *obj); int (*keyCompare)(void *privdata, const void *key1, const void *key2); void (*keyDestructor)(void *privdata, void *key); void (*valDestructor)(void *privdata, void *obj); } dictType;
針對大部分場景,咱們的key通常都是 string 類型的,可是仍是會稍微有不同的。這裏咱們就兩個場景來講明下:異步
1. 命令集構建的hash算法
便是 server.commands 中的key的hash算法,這裏元素是有限的。其定義以下:
/* Command table. sds string -> command struct pointer. */ dictType commandTableDictType = { // 即 dictSdsCaseHash 是 command 的hash算法實現 dictSdsCaseHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCaseCompare, /* key compare */ dictSdsDestructor, /* key destructor */ NULL /* val destructor */ }; // server.c, 不區分大小寫的key hash unsigned int dictSdsCaseHash(const void *key) { return dictGenCaseHashFunction((unsigned char*)key, sdslen((char*)key)); } // dict.c, 不區分大小寫的key-hash算法: hash * 33 + c /* And a case insensitive hash function (based on djb hash) */ unsigned int dictGenCaseHashFunction(const unsigned char *buf, int len) { // static uint32_t dict_hash_function_seed = 5381; unsigned int hash = (unsigned int)dict_hash_function_seed; while (len--) hash = ((hash << 5) + hash) + (tolower(*buf++)); /* hash * 33 + c */ return hash; }
2. 針對普通kv查詢的hash算法
整個nosql就是kv的增刪改查,因此這是個重要的算法。
/* Db->dict, keys are sds strings, vals are Redis objects. */ dictType dbDictType = { // k 的hash算法實現 dictSdsHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ dictSdsDestructor, /* key destructor */ dictObjectDestructor /* val destructor */ }; // server.c, 調用 dict 的實現 unsigned int dictSdsHash(const void *key) { return dictGenHashFunction((unsigned char*)key, sdslen((char*)key)); } // dict.c, 數據key的hash算法,或者通用的 string 的hashCode 算法 /* MurmurHash2, by Austin Appleby * Note - This code makes a few assumptions about how your machine behaves - * 1. We can read a 4-byte value from any address without crashing * 2. sizeof(int) == 4 * * And it has a few limitations - * * 1. It will not work incrementally. * 2. It will not produce the same results on little-endian and big-endian * machines. */ unsigned int dictGenHashFunction(const void *key, int len) { /* 'm' and 'r' are mixing constants generated offline. They're not really 'magic', they just happen to work well. */ uint32_t seed = dict_hash_function_seed; const uint32_t m = 0x5bd1e995; const int r = 24; /* Initialize the hash to a 'random' value */ uint32_t h = seed ^ len; /* Mix 4 bytes at a time into the hash */ const unsigned char *data = (const unsigned char *)key; // 核心算法: step1. *m, ^k>>r, *m, *m, ^k, 每4位作一次運算 while(len >= 4) { uint32_t k = *(uint32_t*)data; k *= m; k ^= k >> r; k *= m; h *= m; h ^= k; data += 4; len -= 4; } /* Handle the last few bytes of the input array */ // step2. 倒數第三位 ^<<16, 第二位 ^<<8, 第一位 ^, 而後 *m switch(len) { case 3: h ^= data[2] << 16; case 2: h ^= data[1] << 8; case 1: h ^= data[0]; h *= m; }; /* Do a few final mixes of the hash to ensure the last few * bytes are well-incorporated. */ // step3. 再混合 ^>>13, *m, ^>>15 h ^= h >> 13; h *= m; h ^= h >> 15; return (unsigned int)h; }
能夠看到,針對普通的字符串的hash但是要複雜許多呢,由於這裏數據遠比 command 的數據多,狀況更復雜,這樣的算法惟一的目標就是儘可能避免hash衝突。(雖然不知道爲啥這麼幹,但它就是牛逼)
redis中還有其餘的hash算法,好比dictObjHash,dictEncObjHash, 後續有接觸咱們再聊。
接下來,咱們正式來看看 set/get 到底如何?
1、getCommand 解析
很顯然,get 會是個最簡單的命令,天然要檢軟柿子捏了。
// t_string.c void getCommand(client *c) { getGenericCommand(c); } int getGenericCommand(client *c) { robj *o; // 若是在kv裏找不到,則直接響應空,shared.nullbulk 做爲全局常量的優點就體現出來了 // shared.nullbulk = createObject(OBJ_STRING,sdsnew("$-1\r\n")); if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL) return C_OK; // 找到對應的數據,可是類型不匹配,說明不能使用 get 命令,響應錯誤信息 // shared.wrongtypeerr = "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n" if (o->type != OBJ_STRING) { addReply(c,shared.wrongtypeerr); return C_ERR; } else { // 正常狀況則直接響應結果便可 addReplyBulk(c,o); return C_OK; } }
整個處理流程果真是異常簡單,感受人生已經達到了巔峯!可是,咱們尚未看到關鍵,那就是查找 key 的過程。咱們經過以前的介紹,知道有個叫作 redisDb 的東西,看起來它是負責全部的數據管理。它應該不會由於簡單而不存儲某些數據吧。
// db.c, 查找某個key對應的元素或者直接響應客戶端 robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) { // 使用 c->db 對應的數據庫進行查詢,因此要求客戶端必須針對某db進行操做,且不能跨庫操做是原理決定 robj *o = lookupKeyRead(c->db, key); // 若是沒有查到數據就直接使用默認的 reply, 響應客戶端了 if (!o) addReply(c,reply); return o; } // db.c, 讀取key 對應值 robj *lookupKeyRead(redisDb *db, robj *key) { robj *val; // 檢查過時狀況,若是過時,則不用再查了 if (expireIfNeeded(db,key) == 1) { /* Key expired. If we are in the context of a master, expireIfNeeded() * returns 0 only when the key does not exist at all, so it's save * to return NULL ASAP. */ if (server.masterhost == NULL) return NULL; /* However if we are in the context of a slave, expireIfNeeded() will * not really try to expire the key, it only returns information * about the "logical" status of the key: key expiring is up to the * master in order to have a consistent view of master's data set. * * However, if the command caller is not the master, and as additional * safety measure, the command invoked is a read-only command, we can * safely return NULL here, and provide a more consistent behavior * to clients accessign expired values in a read-only fashion, that * will say the key as non exisitng. * * Notably this covers GETs when slaves are used to scale reads. */ if (server.current_client && server.current_client != server.master && server.current_client->cmd && server.current_client->cmd->flags & CMD_READONLY) { return NULL; } } // 而後從db中查找對應的key值,其實就是一個 hash 查找 // 緩存命中統計 val = lookupKey(db,key); if (val == NULL) server.stat_keyspace_misses++; else server.stat_keyspace_hits++; return val; } // db.c, 過時的處理有點複雜,咱們稍後再看,先看 db 的查找key過程 robj *lookupKey(redisDb *db, robj *key) { // 直接在 db->dict 中進行hash查找便可,前面已經介紹完成,關鍵優化點在增量rehash dictEntry *de = dictFind(db->dict,key->ptr); if (de) { robj *val = dictGetVal(de); /* Update the access time for the ageing algorithm. * Don't do it if we have a saving child, as this will trigger * a copy on write madness. */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) val->lru = LRU_CLOCK(); return val; } else { return NULL; } } // db.c, 接下來看下,檢查過時狀況 int expireIfNeeded(redisDb *db, robj *key) { mstime_t when = getExpire(db,key); mstime_t now; if (when < 0) return 0; /* No expire for this key */ /* Don't expire anything while loading. It will be done later. */ if (server.loading) return 0; /* If we are in the context of a Lua script, we claim that time is * blocked to when the Lua script started. This way a key can expire * only the first time it is accessed and not in the middle of the * script execution, making propagation to slaves / AOF consistent. * See issue #1525 on Github for more information. */ now = server.lua_caller ? server.lua_time_start : mstime(); /* If we are running in the context of a slave, return ASAP: * the slave key expiration is controlled by the master that will * send us synthesized DEL operations for expired keys. * * Still we try to return the right information to the caller, * that is, 0 if we think the key should be still valid, 1 if * we think the key is expired at this time. */ if (server.masterhost != NULL) return now > when; /* Return when this key has not expired */ // 若是還沒到期就直接返回 if (now <= when) return 0; /* Delete the key */ server.stat_expiredkeys++; // key過時,是一個寫動做,須要傳播到 AOF 或者 slaves... propagateExpire(db,key,server.lazyfree_lazy_expire); // pub/sub 監控通知 notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",key,db->id); // 同步刪除或者異步刪除, 稍後討論 return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : dbSyncDelete(db,key); } /* Return the expire time of the specified key, or -1 if no expire * is associated with this key (i.e. the key is non volatile) */ long long getExpire(redisDb *db, robj *key) { dictEntry *de; /* No expire? return ASAP */ // 查找過時隊列, 數據量小 if (dictSize(db->expires) == 0 || (de = dictFind(db->expires,key->ptr)) == NULL) return -1; /* The entry was found in the expire dict, this means it should also * be present in the main dict (safety check). */ serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL); // 返回到期時間戳, union 的應用 return dictGetSignedIntegerVal(de); } // 刪除過時數據key的兩種方式,同步+異步 // db.c, 同步刪除, 刪除 expires 隊列和 dict 數據 /* Delete a key, value, and associated expiration entry if any, from the DB */ int dbSyncDelete(redisDb *db, robj *key) { /* Deleting an entry from the expires dict will not free the sds of * the key, because it is shared with the main dictionary. */ if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); if (dictDelete(db->dict,key->ptr) == DICT_OK) { if (server.cluster_enabled) slotToKeyDel(key); return 1; } else { return 0; } } // lazyfree.c, 異步刪除過時數據, 一看就很複雜 /* Delete a key, value, and associated expiration entry if any, from the DB. * If there are enough allocations to free the value object may be put into * a lazy free list instead of being freed synchronously. The lazy free list * will be reclaimed in a different bio.c thread. */ #define LAZYFREE_THRESHOLD 64 int dbAsyncDelete(redisDb *db, robj *key) { /* Deleting an entry from the expires dict will not free the sds of * the key, because it is shared with the main dictionary. */ if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); /* If the value is composed of a few allocations, to free in a lazy way * is actually just slower... So under a certain limit we just free * the object synchronously. */ dictEntry *de = dictFind(db->dict,key->ptr); if (de) { robj *val = dictGetVal(de); // 判斷刪除的數據的影響範圍,與 數據類型有關,string爲1,hash/set則計算count,list計算length size_t free_effort = lazyfreeGetFreeEffort(val); /* If releasing the object is too much work, let's put it into the * lazy free list. */ if (free_effort > LAZYFREE_THRESHOLD) { // 將相關的數據放入隊列中,後臺任務慢慢刪除 atomicIncr(lazyfree_objects,1,&lazyfree_objects_mutex); bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL); // 自身則當即設置爲 NULL dictSetVal(db->dict,de,NULL); } } /* Release the key-val pair, or just the key if we set the val * field to NULL in order to lazy free it later. */ if (dictDelete(db->dict,key->ptr) == DICT_OK) { if (server.cluster_enabled) slotToKeyDel(key); return 1; } else { return 0; } }
怎麼樣?是否是有一首歌叫涼涼~
能夠說,get操做自己是至關簡單的,在無hash衝突前提下,O(1)的複雜度搞定。然而它還要處理過時的數據問題,就不那麼簡單了。
咱們用一個時序圖總體體會下get的流程:
2、setCommand
setCommand 是個寫操做,就不是 get 那麼簡單了。
// t_string.c, set 的全部用法都統一 setCommand, 多個參數共同解析爲 flags /* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */ void setCommand(client *c) { int j; robj *expire = NULL; int unit = UNIT_SECONDS; int flags = OBJ_SET_NO_FLAGS; for (j = 3; j < c->argc; j++) { char *a = c->argv[j]->ptr; robj *next = (j == c->argc-1) ? NULL : c->argv[j+1]; // NX 與 XX 互斥 if ((a[0] == 'n' || a[0] == 'N') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && !(flags & OBJ_SET_XX)) { flags |= OBJ_SET_NX; } else if ((a[0] == 'x' || a[0] == 'X') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && !(flags & OBJ_SET_NX)) { flags |= OBJ_SET_XX; } // PX 與 EX 互斥 else if ((a[0] == 'e' || a[0] == 'E') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && !(flags & OBJ_SET_PX) && next) { flags |= OBJ_SET_EX; unit = UNIT_SECONDS; expire = next; j++; } else if ((a[0] == 'p' || a[0] == 'P') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && !(flags & OBJ_SET_EX) && next) { flags |= OBJ_SET_PX; unit = UNIT_MILLISECONDS; expire = next; j++; } else { addReply(c,shared.syntaxerr); return; } } // 嘗試壓縮 value 值以節省空間 (原始命令: set key value) c->argv[2] = tryObjectEncoding(c->argv[2]); setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL); } // object.c, 壓縮字符串 /* Try to encode a string object in order to save space */ robj *tryObjectEncoding(robj *o) { long value; sds s = o->ptr; size_t len; /* Make sure this is a string object, the only type we encode * in this function. Other types use encoded memory efficient * representations but are handled by the commands implementing * the type. */ serverAssertWithInfo(NULL,o,o->type == OBJ_STRING); /* We try some specialized encoding only for objects that are * RAW or EMBSTR encoded, in other words objects that are still * in represented by an actually array of chars. */ if (!sdsEncodedObject(o)) return o; /* It's not safe to encode shared objects: shared objects can be shared * everywhere in the "object space" of Redis and may end in places where * they are not handled. We handle them only as values in the keyspace. */ if (o->refcount > 1) return o; /* Check if we can represent this string as a long integer. * Note that we are sure that a string larger than 21 chars is not * representable as a 32 nor 64 bit integer. */ len = sdslen(s); // 針對小於21個字符串的字符,嘗試轉爲 long 型 if (len <= 21 && string2l(s,len,&value)) { /* This object is encodable as a long. Try to use a shared object. * Note that we avoid using shared integers when maxmemory is used * because every object needs to have a private LRU field for the LRU * algorithm to work well. */ if ((server.maxmemory == 0 || (server.maxmemory_policy != MAXMEMORY_VOLATILE_LRU && server.maxmemory_policy != MAXMEMORY_ALLKEYS_LRU)) && value >= 0 && value < OBJ_SHARED_INTEGERS) { decrRefCount(o); incrRefCount(shared.integers[value]); return shared.integers[value]; } else { if (o->encoding == OBJ_ENCODING_RAW) sdsfree(o->ptr); o->encoding = OBJ_ENCODING_INT; o->ptr = (void*) value; return o; } } /* If the string is small and is still RAW encoded, * try the EMBSTR encoding which is more efficient. * In this representation the object and the SDS string are allocated * in the same chunk of memory to save space and cache misses. */ // 44 if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) { robj *emb; if (o->encoding == OBJ_ENCODING_EMBSTR) return o; // 使用 EMBSTR 編碼轉換,實際就是同一個 s 返回 emb = createEmbeddedStringObject(s,sdslen(s)); decrRefCount(o); return emb; } /* We can't encode the object... * * Do the last try, and at least optimize the SDS string inside * the string object to require little space, in case there * is more than 10% of free space at the end of the SDS string. * * We do that only for relatively large strings as this branch * is only entered if the length of the string is greater than * OBJ_ENCODING_EMBSTR_SIZE_LIMIT. */ if (o->encoding == OBJ_ENCODING_RAW && sdsavail(s) > len/10) { o->ptr = sdsRemoveFreeSpace(o->ptr); } /* Return the original object. */ return o; } // sds.c, 去除無用空間佔用 /* Reallocate the sds string so that it has no free space at the end. The * contained string remains not altered, but next concatenation operations * will require a reallocation. * * After the call, the passed sds string is no longer valid and all the * references must be substituted with the new pointer returned by the call. */ sds sdsRemoveFreeSpace(sds s) { void *sh, *newsh; // s[-1] 指針不越界, 它是在新建一個 sds 對象時,在該指針前一位寫入的值,肯定sds類型 char type, oldtype = s[-1] & SDS_TYPE_MASK; int hdrlen; size_t len = sdslen(s); // sdsHdrSize: sds頭部大小 sh = (char*)s-sdsHdrSize(oldtype); type = sdsReqType(len); hdrlen = sdsHdrSize(type); if (oldtype==type) { newsh = s_realloc(sh, hdrlen+len+1); if (newsh == NULL) return NULL; s = (char*)newsh+hdrlen; } else { newsh = s_malloc(hdrlen+len+1); if (newsh == NULL) return NULL; memcpy((char*)newsh+hdrlen, s, len+1); s_free(sh); s = (char*)newsh+hdrlen; s[-1] = type; sdssetlen(s, len); } sdssetalloc(s, len); return s; } // t_string.c, expire 爲超時時間設置 void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) { long long milliseconds = 0; /* initialized to avoid any harmness warning */ if (expire) { // 解析 expire 到 milliseconds 中 if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK) return; if (milliseconds <= 0) { addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name); return; } if (unit == UNIT_SECONDS) milliseconds *= 1000; } // 語法限制檢測, NX 要求不存在, XX 要求存在 if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) || (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL)) { addReply(c, abort_reply ? abort_reply : shared.nullbulk); return; } // 切實存儲 kv setKey(c->db,key,val); server.dirty++; // 設置超時 if (expire) setExpire(c->db,key,mstime()+milliseconds); // 通知pub/sub變動 notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); // 通知expire事件 if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire",key,c->db->id); addReply(c, ok_reply ? ok_reply : shared.ok); } // object.c, int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const char *msg) { long long value; if (getLongLongFromObject(o, &value) != C_OK) { if (msg != NULL) { addReplyError(c,(char*)msg); } else { addReplyError(c,"value is not an integer or out of range"); } return C_ERR; } *target = value; return C_OK; } // object.c, int getLongLongFromObject(robj *o, long long *target) { long long value; if (o == NULL) { value = 0; } else { serverAssertWithInfo(NULL,o,o->type == OBJ_STRING); // 將字符串轉換爲 long 型,獲得超時時間 if (sdsEncodedObject(o)) { if (strict_strtoll(o->ptr,&value) == C_ERR) return C_ERR; } else if (o->encoding == OBJ_ENCODING_INT) { value = (long)o->ptr; } else { serverPanic("Unknown string encoding"); } } if (target) *target = value; return C_OK; }
看完了超時及各標識位的解析,及set框架流程,咱們來看下具體核心的kv存儲: setKey(), setExpire();
// db.c, set kv /* High level Set operation. This function can be used in order to set * a key, whatever it was existing or not, to a new object. * * 1) The ref count of the value object is incremented. * 2) clients WATCHing for the destination key notified. * 3) The expire time of the key is reset (the key is made persistent). */ void setKey(redisDb *db, robj *key, robj *val) { // 先查找,再更新 if (lookupKeyWrite(db,key) == NULL) { // 新增 kv dbAdd(db,key,val); } else { // 覆蓋 kv dbOverwrite(db,key,val); } // 增長 value 的引用計數 incrRefCount(val); // 新增的元素,移出過時隊列 removeExpire(db,key); signalModifiedKey(db,key); } robj *lookupKeyWrite(redisDb *db, robj *key) { // 先嚐試過時處理,再查找db (hash 查找 db->dic) expireIfNeeded(db,key); return lookupKey(db,key); } // db.c, 添加kv /* Add the key to the DB. It's up to the caller to increment the reference * counter of the value if needed. * * The program is aborted if the key already exists. */ void dbAdd(redisDb *db, robj *key, robj *val) { sds copy = sdsdup(key->ptr); // 添加到 db->dict 中 int retval = dictAdd(db->dict, copy, val); serverAssertWithInfo(NULL,key,retval == C_OK); // list 類型的數據,進行特殊處理(阻塞) if (val->type == OBJ_LIST) signalListAsReady(db, key); // 集羣添加 if (server.cluster_enabled) slotToKeyAdd(key); } // db.c /* Slot to Key API. This is used by Redis Cluster in order to obtain in * a fast way a key that belongs to a specified hash slot. This is useful * while rehashing the cluster. */ void slotToKeyAdd(robj *key) { // hash 定位 slot, 下面咱們簡單看下該算法 unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr)); sds sdskey = sdsdup(key->ptr); // 添加key 到 server.cluster->slots_to_keys 的 跳錶中 zslInsert(server.cluster->slots_to_keys,hashslot,sdskey); } // cluster.c, slot 定位算法, 其實就是 crc16算法與上 0x3FFF,該算法決定了 slot 最多隻能有 16383 個 /* We have 16384 hash slots. The hash slot of a given key is obtained * as the least significant 14 bits of the crc16 of the key. * * However if the key contains the {...} pattern, only the part between * { and } is hashed. This may be useful in the future to force certain * keys to be in the same node (assuming no resharding is in progress). */ unsigned int keyHashSlot(char *key, int keylen) { int s, e; /* start-end indexes of { and } */ for (s = 0; s < keylen; s++) if (key[s] == '{') break; /* No '{' ? Hash the whole key. This is the base case. */ if (s == keylen) return crc16(key,keylen) & 0x3FFF; /* '{' found? Check if we have the corresponding '}'. */ for (e = s+1; e < keylen; e++) if (key[e] == '}') break; /* No '}' or nothing betweeen {} ? Hash the whole key. */ if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF; /* If we are here there is both a { and a } on its right. Hash * what is in the middle between { and }. */ return crc16(key+s+1,e-s-1) & 0x3FFF; } // db.c, 設置key的超時標識 void setExpire(redisDb *db, robj *key, long long when) { dictEntry *kde, *de; /* Reuse the sds from the main dict in the expire dict */ kde = dictFind(db->dict,key->ptr); serverAssertWithInfo(NULL,key,kde != NULL); // 將須要超時檢測的 key 添加到 db->expires 隊列中 de = dictReplaceRaw(db->expires,dictGetKey(kde)); // 設置超時時間爲 when dictSetSignedIntegerVal(de,when); } // dict.h #define dictSetSignedIntegerVal(entry, _val_) \ do { entry->v.s64 = _val_; } while(0)
整體來講,set操做會分爲幾步:
1. 判斷出多重參數,如是不是NX/EX/PX/XX, 是否超時設置;
2. 編碼轉換數據, 如將字符串轉換爲long型;
3. 解析超時字段;
4. set kv, 添加或者覆蓋數據庫值, 同時清理過時隊列;
5. 設置超時時間;
6. 觸發事件監聽;
7. 響應客戶端;
最後,咱們以set的整個時序圖做爲結尾,也讓咱們明白一點,不是每一個hello world 都很簡單: