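Codis is a proxy-based Redis clustering solution. As shown in Figure 1, client requests go to a proxy first; the proxy shards them and forwards them to the backend Redis instances. The sharding rules (commonly called the routing table, forwarding table, or slot table) are stored in a centralized component (such as ZooKeeper or a file system) and are pushed to all proxies by the Dashboard. By contrast, Redis's own clustering solution, redis-cluster, is decentralized. As shown in Figure 2, it has no centralized control component and no proxy: a client can send a request to any node in the cluster and then follow the redirection (MOVED or ASK) in that node's reply. The client also caches the slot table locally and updates it from the redirection information. Because no central component stores or configures the routing table, redis-cluster uses gossip to synchronize the routing table and the cluster topology between nodes; after some time, ideally every node knows the routing information of the whole cluster.
Figure 1: Codis architecture
Figure 2: redis-cluster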
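The client-side redirection handling described above can be sketched roughly as follows. This is a minimal illustration rather than a real client: `slot_table`, `parse_redirect`, and `apply_redirect` are hypothetical names, and the only detail taken from Redis is the error format `MOVED <slot> <host>:<port>` / `ASK <slot> <host>:<port>` (assumed here to arrive without the leading `-`). The sketch updates the cached table only on MOVED, since the Redis Cluster specification treats ASK as a one-time redirect.

```c
#include <stdio.h>
#include <string.h>

#define SLOT_NUM 16384   /* redis-cluster uses 16384 hash slots */
#define ADDR_LEN 64

typedef enum { REDIR_NONE, REDIR_MOVED, REDIR_ASK } redir_kind;

typedef struct {
    redir_kind kind;
    int slot;
    char addr[ADDR_LEN];   /* "host:port" of the node to contact next */
} redirect;

/* Hypothetical client-side cache: slot number -> "host:port". */
static char slot_table[SLOT_NUM][ADDR_LEN];

/* Parse an error line such as "MOVED 3999 127.0.0.1:6381".
 * Returns 0 on success, -1 if the line is not a redirection. */
static int parse_redirect(const char *err, redirect *out) {
    char kind[8];
    out->kind = REDIR_NONE;
    if (sscanf(err, "%7s %d %63s", kind, &out->slot, out->addr) != 3) return -1;
    if (out->slot < 0 || out->slot >= SLOT_NUM) return -1;
    if (strcmp(kind, "MOVED") == 0) out->kind = REDIR_MOVED;
    else if (strcmp(kind, "ASK") == 0) out->kind = REDIR_ASK;
    else return -1;
    return 0;
}

/* Returns the address the request should be retried against.
 * MOVED is permanent, so the cached table is updated.
 * ASK applies to this one request only, so the table is left alone. */
static const char *apply_redirect(const redirect *r) {
    if (r->kind == REDIR_MOVED) {
        snprintf(slot_table[r->slot], ADDR_LEN, "%s", r->addr);
        return slot_table[r->slot];
    }
    return r->addr;
}
```

A real client would also send `ASKING` to the target node before retrying after an ASK redirect; that network step is left out of the sketch.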
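For a NoSQL database, horizontal scaling (scale in/out) is a basic capability. Scale in/out means that nodes can be added to or removed from the cluster dynamically to grow or shrink its capacity and CPU power horizontally, as opposed to vertical scaling (scale up/down). Because NoSQL stores are schemaless and usually hold simple KV (or KKV) structures, scale in/out is comparatively easy to implement. Keys are sharded by slot (a common formula is crc16(key) % slot_num, see Figure 3), so it is enough to migrate some slots of one instance to other nodes and then update the routing table (the mapping between slots and nodes). Both Codis and redis-cluster support this slot-migration style of scale in/out, but their implementations differ in many ways, which the rest of this article describes.
Figure 3: key-slot-node mapping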
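As a concrete illustration of the key-slot-node mapping, here is a minimal sketch of `crc16(key) % slot_num` with a slot-to-node table. It assumes the CRC16-CCITT (XMODEM) variant and 16384 slots, as used by redis-cluster; `slot_to_node`, `node_for_key`, and `reassign_slot` are hypothetical names, and hash tags are ignored for brevity.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SLOT_NUM 16384

/* CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0.
 * Bitwise version for clarity; real implementations use a lookup table. */
static uint16_t crc16(const char *buf, size_t len) {
    uint16_t crc = 0;
    for (size_t i = 0; i < len; i++) {
        crc ^= (uint16_t)((uint16_t)(unsigned char)buf[i] << 8);
        for (int bit = 0; bit < 8; bit++) {
            if (crc & 0x8000) crc = (uint16_t)((crc << 1) ^ 0x1021);
            else crc = (uint16_t)(crc << 1);
        }
    }
    return crc;
}

/* key -> slot */
static int key_slot(const char *key) {
    return crc16(key, strlen(key)) % SLOT_NUM;
}

/* Hypothetical routing table: slot -> node id. */
static int slot_to_node[SLOT_NUM];

/* key -> slot -> node */
static int node_for_key(const char *key) {
    return slot_to_node[key_slot(key)];
}

/* At the routing level, finishing a slot migration is just repointing
 * the slot; the key data has to be moved separately beforehand. */
static void reassign_slot(int slot, int node) {
    slot_to_node[slot] = node;
}
```

Keeping the key-to-slot hash fixed and changing only the slot-to-node table is what lets a cluster rebalance without rehashing every key.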
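Migrating all keys of a given slot from one Redis to another is not hard. It takes two steps: first get all keys in the slot, then send a migration command for each key. Stock Redis has no notion of slots and does not maintain a key-to-slot mapping, so the first step requires modifying the Redis engine so that it maintains this mapping. Both redis-cluster and Codis do this (for example with a separate array of dicts as the index: the array subscript is the slot number, and each element is a dict holding <key, crc> pairs). The second step is simpler, because Redis natively supports a command for migrating a set of keys, MIGRATE: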
MIGRATE host port "" dbid timeout [COPY] [REPLACE] [AUTH password] KEYS key1 key2 ... keyN
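redis-cluster does use the MIGRATE command directly to migrate keys, but this command is synchronous and blocking. Because Redis is single-threaded, a MIGRATE that takes too long (for example over a slow network or when migrating a big key) prevents the main thread from serving user requests, so user RT grows or requests time out. Using MIGRATE directly is convenient but has many limitations. Codis modified the Redis engine itself and added both synchronous and asynchronous slot migration (synchronous migration is the simpler of the two).
Therefore, for a smooth scale in/out that users barely notice, slot migration has to solve the following difficulties:
Figure 4: redis-cluster slot migration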
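As Figure 4 shows, to support slot migration redis-cluster modified the engine so that it tracks the mapping between keys and slots. redis-cluster uses a rax (radix) tree to maintain this mapping. Creating a cluster and scaling it in or out involve operations such as assigning and deleting slots, which are mainly done with the following commands:
Once the mapping is established, key-related slot commands can be executed. redis-cluster provides the following commands:
The detailed flow when redis-cluster migrates a slot is:
To cancel a migration midway, send CLUSTER SETSLOT <slot> STABLE to the node to clear the importing or migrating state of the slot.
Because MIGRATE is synchronous and blocking (it sends and waits for the reply synchronously), the migration blocks reads and writes of all keys on that engine, and the local key is deleted only after the migration response reports success, so the migration is atomic.
Because MIGRATE is synchronous and blocking, a key is never read or written while it is being migrated. Within one slot, however, some keys may already have been migrated while others are still waiting. So when a request touches a key whose slot is being migrated, redis-cluster handles it as follows:
redis-cluster turns Redis into a cluster, and its scaling ability adds the flexibility of a distributed system. It also brings some restrictions, most of which are shared by other Redis clustering solutions. For example, because Redis aims for simplicity and high performance, it does not support cross-node (distributed) transactions, so operations that may span nodes are restricted, mainly: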
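Unlike redis-cluster, a Redis instance under Codis does not maintain slot table information; every Redis assumes by default that it is responsible for all 1024 slots. The slot table is maintained in the Dashboard and known to the proxies, which is a distinctive feature of the Codis architecture.
Codis provides only one key-related slot command: slotshashkey [key1 key2 ...], which returns the hash slot of each key.
See Figure 5 for the detailed flow.
Figure 5: Codis slot migration flow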
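Because Codis migrates asynchronously with the slotsmgrttagslot-async command, it cannot rely on the synchronous, blocking nature of MIGRATE to make key migration atomic, as redis-cluster does. Codis therefore takes the following measures to keep key migration atomic:
Unlike redis-cluster's synchronous migration, with asynchronous migration a key that is in the migrating state (already sent or partially sent, but without a final response yet) can still be read or written by users. So besides handling slots in migration as redis-cluster does, Codis also has to resolve read/write conflicts on keys in migration.
For a read or write request whose key belongs to a slot being migrated, the proxy wraps the original request with slotsmgrt-exec-wrapper $hashkey $command [$arg1 ...] before sending it to Redis. If the original command is a read, it is answered normally; if it is a write, Redis returns a TRYAGAIN error and the proxy retries. If the key has already been migrated away, the engine returns a MOVED error and the proxy has to update its routing table. Figure 6 shows the process.
Figure 6: how Codis handles reads and writes of keys in migration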
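The proxy-side decision described above can be sketched as a small classifier plus a bounded retry loop. This is only an illustration under assumptions: the reply prefixes `-TRYAGAIN` and `-MOVED` follow the description in this article, while `classify_reply`, `dispatch`, the `send_fn` callback, and the `fake_backend` stub are hypothetical names that do not come from the Codis proxy.

```c
#include <string.h>

typedef enum {
    ACT_FORWARD_REPLY,   /* command was executed: return the reply to the user */
    ACT_RETRY,           /* write hit a key in migration: send it again */
    ACT_UPDATE_ROUTE     /* key already moved: refresh the routing table */
} proxy_action;

static int starts_with(const char *s, const char *prefix) {
    return strncmp(s, prefix, strlen(prefix)) == 0;
}

/* Map a backend reply line to the action the proxy should take. */
static proxy_action classify_reply(const char *reply) {
    if (starts_with(reply, "-TRYAGAIN")) return ACT_RETRY;
    if (starts_with(reply, "-MOVED")) return ACT_UPDATE_ROUTE;
    return ACT_FORWARD_REPLY;
}

/* Hypothetical transport callback: sends the wrapped command once and
 * returns the reply line. */
typedef const char *(*send_fn)(void *ctx);

/* Send the command, retrying while the backend answers TRYAGAIN,
 * up to max_retries extra attempts. Returns the final action. */
static proxy_action dispatch(send_fn send, void *ctx, int max_retries) {
    for (int attempt = 0; ; attempt++) {
        proxy_action action = classify_reply(send(ctx));
        if (action != ACT_RETRY || attempt >= max_retries) return action;
    }
}

/* Stub backend for illustration: answers TRYAGAIN until the counter in
 * ctx reaches zero, then reports success. */
static const char *fake_backend(void *ctx) {
    int *remaining = ctx;
    if (*remaining > 0) {
        (*remaining)--;
        return "-TRYAGAIN key is being migrated";
    }
    return "+OK";
}
```

Bounding the retries matters in practice: a big key can stay in the migrating state for a while, and the proxy should eventually surface an error instead of spinning.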
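This article describes in detail how synchronous and asynchronous migration are implemented.
Figure 7 shows the synchronous migration flow. The source serializes the key and sends the data to the target Redis over a socket (in effect it invokes the RESTORE command). On receiving RESTORE, the target Redis deserializes the key, stores it in the DB, and replies with an ACK; the source Redis deletes its local key after receiving the ACK. The source Redis is blocked throughout. If the key is a big key, serialization on the source, network transfer, deserialization on the target, and the synchronous delete on the source all take a long time. Because Redis is single-threaded, the event loop cannot process user read/write events in time, so user RT rises or requests time out.
Figure 7: synchronous migration flow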
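The ordering that makes synchronous migration atomic (serialize, restore on the target, delete locally only after a successful acknowledgement) can be sketched with two toy in-memory stores. Everything here is a hypothetical stand-in: `store`, `store_set`, `store_del`, and `migrate_sync` are illustration names, the "serialization" is a plain string copy rather than the RDB format, and there is no network involved.

```c
#include <stdio.h>
#include <string.h>

#define MAX_KEYS 8
#define STR_LEN 32

/* Toy key-value store standing in for one Redis instance. */
typedef struct {
    char keys[MAX_KEYS][STR_LEN];
    char vals[MAX_KEYS][STR_LEN];
    int used[MAX_KEYS];
} store;

static int store_find(const store *s, const char *key) {
    for (int i = 0; i < MAX_KEYS; i++)
        if (s->used[i] && strcmp(s->keys[i], key) == 0) return i;
    return -1;
}

/* Returns 0 on success, -1 when the store is full (a failed RESTORE). */
static int store_set(store *s, const char *key, const char *val) {
    int i = store_find(s, key);
    if (i < 0) {
        for (i = 0; i < MAX_KEYS && s->used[i]; i++) {}
        if (i == MAX_KEYS) return -1;
    }
    snprintf(s->keys[i], STR_LEN, "%s", key);
    snprintf(s->vals[i], STR_LEN, "%s", val);
    s->used[i] = 1;
    return 0;
}

static void store_del(store *s, const char *key) {
    int i = store_find(s, key);
    if (i >= 0) s->used[i] = 0;
}

/* Synchronous migration of one key. The caller is "blocked" for the
 * whole call, which is what the main thread experiences with MIGRATE.
 * Returns 0 if the key was migrated, -1 if it is missing or the target
 * refused it; on failure the source copy is left untouched. */
static int migrate_sync(store *src, store *dst, const char *key) {
    int i = store_find(src, key);
    if (i < 0) return -1;
    char payload[STR_LEN];
    snprintf(payload, STR_LEN, "%s", src->vals[i]);    /* serialize */
    if (store_set(dst, key, payload) != 0) return -1;  /* restore + ack */
    store_del(src, key);                               /* delete after ack */
    return 0;
}
```

Because the delete happens strictly after the acknowledgement, a failure at any earlier step leaves the key fully on the source.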
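Because Redis supports composite data structures such as list, set, zset, and hash, big keys are a real concern. Figure 8 shows how the MIGRATE command works. In MIGRATE, serialization means encoding the value of the key in RDB format, and the target Redis loads it according to the RDB format. If a list, set, zset, or hash has many members (thousands or even tens of thousands), RDB encoding and loading become very slow.
Figure 8: how the MIGRATE command works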
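Since synchronous migration blocks the main thread, an obvious solution is to do the migration in a separate thread, as Figure 9 shows. Multithreading involves access to shared data (such as the DB) and therefore needs synchronization primitives; for Redis's single-threaded, almost lock-free architecture this is a fairly complex change.
Figure 9: asynchronous migration with a separate thread
Another approach to asynchronous migration keeps the single-threaded model: serialization (on the source Redis) and deserialization (on the target Redis) still block the main thread, but unlike synchronous MIGRATE, asynchronous migration does not wait synchronously for the restore to return. When the restore finishes, the target Redis sends a restore-ack command to the source Redis (much like a callback) to report the migration status. This greatly reduces how long the source Redis is blocked and lets the event loop move on to the next ready event sooner.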
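Because this approach still relies on the main thread for serialization and deserialization, Codis further reduces their cost by migrating big keys as split (chunked) commands. As Figure 10 shows, for a list with a very large number of elements, serializing all of it at once is slow, whereas splitting it into an equivalent sequence of RPUSH commands makes each individual command very light.
Figure 10: command splitting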
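To make command splitting concrete, here is a minimal sketch that computes how many commands a collection needs and prints a list as batches of RPUSH. The ceiling formula mirrors `numberOfRestoreCommandsFromObject` in the source listing at the end of this article; `emit_rpush_chunks` is a hypothetical helper, and writing plain RPUSH text is a simplification of the `SLOTSRESTORE-ASYNC list ...` messages Codis actually builds. `maxbulks` is assumed to be greater than zero.

```c
#include <stdio.h>

/* Number of restore commands needed when one command may carry at most
 * maxbulks bulk arguments. Anything that fits in one command (including
 * an empty collection) needs exactly one. */
static long restore_commands_needed(long numbulks, long maxbulks) {
    if (numbulks <= maxbulks) return 1;
    return (numbulks + maxbulks - 1) / maxbulks;   /* ceiling division */
}

/* Print a list as a sequence of RPUSH commands with at most maxbulks
 * elements each. Returns the number of commands written. */
static long emit_rpush_chunks(const char *key, const char **elems, long n,
                              long maxbulks, FILE *out) {
    long commands = 0;
    for (long start = 0; start < n; start += maxbulks) {
        fprintf(out, "RPUSH %s", key);
        for (long j = start; j < n && j < start + maxbulks; j++) {
            fprintf(out, " %s", elems[j]);
        }
        fputc('\n', out);
        commands++;
    }
    return commands;
}
```

Each chunk costs the event loop only a bounded amount of work, so user requests can be served between chunks instead of waiting behind one large serialization.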
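With command splitting, a key that needed a single restore command now needs many. To keep the migration atomic (so that there is never a state where some elements migrated and others failed), Codis adds a temporary TTL to every split command. The local key is deleted only after everything has migrated successfully, so if the migration fails midway, the elements that were already migrated expire and are deleted automatically, and the net effect is as if the migration never happened. After all elements have migrated successfully, Codis sends a separate command to fix the TTL and then deletes the local key.
Figure 11: temporary TTL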
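The TTL choice can be sketched as a small pure function. It follows the logic visible in `singleObjectIteratorNext` in the source listing at the end of this article (a temporary TTL of `timeout * 3` while chunks are in flight; otherwise the remaining lifetime of the key, clamped to at least 1 ms, or 0 when the key has no expiry). The function name, the enum, and passing the clock in as a parameter are assumptions made for the illustration.

```c
typedef enum {
    TTL_STAGE_CHUNKED,   /* a chunk of a big key is being sent */
    TTL_STAGE_FINAL      /* whole-object payload, or the final TTL fix */
} ttl_stage;

/* Returns the TTL in milliseconds to attach to a migration message.
 * expire_at_ms is the absolute expiry time of the key, or -1 if the
 * key never expires. A result of 0 means "no expiry". */
static long long migration_ttl_ms(ttl_stage stage, long long timeout_ms,
                                  long long expire_at_ms, long long now_ms) {
    if (stage == TTL_STAGE_CHUNKED) {
        /* Temporary TTL: partial data on the target disappears on its
         * own if the migration is abandoned. */
        return timeout_ms * 3;
    }
    if (expire_at_ms == -1) {
        return 0;
    }
    long long ttl = expire_at_ms - now_ms;
    return ttl < 1 ? 1 : ttl;   /* already due: let it expire right away */
}
```

With the default timeout of 30 seconds seen in the listing, a partially migrated key would linger on the target for at most 90 seconds after a failed migration.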
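The first step of asynchronous migration is to send a delete command (sent as SLOTSRESTORE-ASYNC delete $key) that removes the key on the target Redis, as Figure 12 shows.
Figure 12: delete the target key first
As Figure 13 shows, after receiving the ACK from the target Redis, the source goes on to send the next split commands. The number of split commands sent each time can be controlled by a parameter.
Figure 13: temporary TTL
After all split commands have been sent, one more command is sent to fix the TTL, and finally the local key is deleted.
Figure 14: delete the local key once the migration completes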
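The steps above form a small state machine, which can be sketched as follows. The stage names follow the `STAGE_*` constants in the source listing at the end of this article (PREPARE, PAYLOAD, CHUNKED, FILLTTL, DONE), but the `migrate_iter` struct and its functions are hypothetical simplifications: one bulk per element is assumed, no data is actually sent, and ACK handling is left out.

```c
typedef enum { ST_PREPARE, ST_PAYLOAD, ST_CHUNKED, ST_FILLTTL, ST_DONE } stage;

typedef struct {
    stage st;
    long remaining;   /* elements not yet sent to the target */
    long maxbulks;    /* maximum elements per split command, must be > 0 */
} migrate_iter;

static void iter_init(migrate_iter *it, long elems, long maxbulks) {
    it->st = ST_PREPARE;
    it->remaining = elems;
    it->maxbulks = maxbulks;
}

/* Advance by one message. Returns the number of commands emitted
 * (1 while work remains, 0 once the iterator is done). */
static int iter_next(migrate_iter *it) {
    switch (it->st) {
    case ST_PREPARE:
        /* Step 1: delete the key on the target, then pick a strategy. */
        it->st = (it->remaining > it->maxbulks) ? ST_CHUNKED : ST_PAYLOAD;
        return 1;
    case ST_PAYLOAD:
        /* Small object: one message carries the whole value. */
        it->remaining = 0;
        it->st = ST_DONE;
        return 1;
    case ST_CHUNKED: {
        /* Big key: send up to maxbulks elements with a temporary TTL. */
        long n = it->remaining < it->maxbulks ? it->remaining : it->maxbulks;
        it->remaining -= n;
        if (it->remaining == 0) it->st = ST_FILLTTL;
        return 1;
    }
    case ST_FILLTTL:
        /* All chunks sent: replace the temporary TTL with the real one. */
        it->st = ST_DONE;
        return 1;
    default:
        return 0;
    }
}

/* Drive the iterator to completion and count the messages. The local
 * key would be deleted only after this returns and the target has
 * acknowledged every message. */
static long migrate_all(migrate_iter *it) {
    long total = 0;
    while (it->st != ST_DONE) total += iter_next(it);
    return total;
}
```

For example, a 10-element list with `maxbulks = 3` produces six messages: one delete, four chunks, and one TTL fix.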
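Not every key is migrated in chunks. String objects and small objects can still be serialized directly in RDB format; only large objects (big keys) trigger chunked migration.
Figure 15: different migration methods for different objects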
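The preceding sections covered the principles of, and the differences between, redis-cluster's synchronous migration and Codis's asynchronous migration. For redis-cluster's synchronous migration, see the implementations of migrateCommand and restoreCommand in cluster.c in the Redis source, which is quite simple. Codis provides both synchronous and asynchronous slot migration. The synchronous code is in slots.c and is essentially the same as Redis's native migrateCommand, so reading either one is enough. The asynchronous code is in slots_async.c and is much more original. The original author left almost no comments in the code, so to make it easier to follow I added some brief Chinese comments while reading the source, and I include the annotated code here. The principles are as described above; readers who want to see the implementation can read the code below. I will not walk through it piece by piece, because there is too much of it.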
#include "server.h" /* ============================ Worker Thread for Lazy Release ============================= */ typedef struct { pthread_t thread;/* lazy工做線程 */ pthread_mutex_t mutex;/* 互斥信號量 */ pthread_cond_t cond;/* 條件變量 */ list *objs; /* 要被lazy釋放的對象鏈表 */ } lazyReleaseWorker; /* lazy釋放主線程 */ static void * lazyReleaseWorkerMain(void *args) { lazyReleaseWorker *p = args; while (1) { /* 等待在條件變量上,條件爲待釋放對象鏈表長度爲0 */ pthread_mutex_lock(&p->mutex); while (listLength(p->objs) == 0) { pthread_cond_wait(&p->cond, &p->mutex); } /* 取出鏈表的第一個節點 */ listNode *head = listFirst(p->objs); /* 節點值爲要釋放的對象 */ robj *o = listNodeValue(head); /* 從鏈表中刪除這個節點 */ listDelNode(p->objs, head); pthread_mutex_unlock(&p->mutex); /* 釋放對象 */ decrRefCount(o); } return NULL; } /* lazy釋放一個對象 */ static void lazyReleaseObject(robj *o) { /* 對象當前的refcount必須已經爲1,即已經沒有任何人引用這個對象 */ serverAssert(o->refcount == 1); /* 獲取lazyReleaseWorker */ lazyReleaseWorker *p = server.slotsmgrt_lazy_release; /* 上鎖 */ pthread_mutex_lock(&p->mutex); if (listLength(p->objs) == 0) { /* 若是待釋放隊列長度爲0,則喚醒釋放線程 */ pthread_cond_broadcast(&p->cond); } /* 將待釋放對象加入釋放鏈表 */ listAddNodeTail(p->objs, o); /* 解鎖 */ pthread_mutex_unlock(&p->mutex); } /* 建立lazy釋放工做線程 */ static lazyReleaseWorker * createLazyReleaseWorkerThread() { lazyReleaseWorker *p = zmalloc(sizeof(lazyReleaseWorker)); pthread_mutex_init(&p->mutex, NULL); pthread_cond_init(&p->cond, NULL); p->objs = listCreate(); /* 建立線程 */ if (pthread_create(&p->thread, NULL, lazyReleaseWorkerMain, p) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize Worker Thread for Lazy Release Jobs."); exit(1); } return p; } /* 初始化Lazy釋放工做線程 */ void slotsmgrtInitLazyReleaseWorkerThread() { server.slotsmgrt_lazy_release = createLazyReleaseWorkerThread(); } /* ============================ Iterator for Data Migration ================================ */ #define STAGE_PREPARE 0 #define STAGE_PAYLOAD 1 #define STAGE_CHUNKED 2 #define STAGE_FILLTTL 3 #define STAGE_DONE 4 /* 單對象迭代器 */ typedef struct { int 
stage; robj *key;/* 單對象對應的的key */ robj *val;/* 單對象對應的的值 */ long long expire;/* 該對象對應的過時設置 */ unsigned long cursor;/* 遊標,用於dictScan */ unsigned long lindex;/* 索引,listTypeInitIterator時用到 */ unsigned long zindex;/* 索引,遍歷zset時用到 */ unsigned long chunked_msgs;/* 該對象chunked消息個數 */ } singleObjectIterator; /* 建立單對象迭代 */ static singleObjectIterator * createSingleObjectIterator(robj *key) { /* 分配空間 */ singleObjectIterator *it = zmalloc(sizeof(singleObjectIterator)); /* 初始化階段 */ it->stage = STAGE_PREPARE; /* 設置key */ it->key = key; /* 引用計數 */ incrRefCount(it->key); it->val = NULL; it->expire = 0; it->cursor = 0; it->lindex = 0; it->zindex = 0; it->chunked_msgs = 0; return it; } /* 釋放SingleObjectIterator */ static void freeSingleObjectIterator(singleObjectIterator *it) { if (it->val != NULL) { /* 對val解引用 */ decrRefCount(it->val); } /* 對key解引用 */ decrRefCount(it->key); /* 釋放結構 */ zfree(it); } static void freeSingleObjectIteratorVoid(void *it) { freeSingleObjectIterator(it); } /* 判斷單個對象是否還有下一個階段須要處理 */ static int singleObjectIteratorHasNext(singleObjectIterator *it) { /* 只要狀態不是STAGE_DONE就還須要繼續處理 */ return it->stage != STAGE_DONE; } /* 若是是sds編碼的字符串對象就返回sds底層字符換的長度,不然返回默認長度len */ static size_t sdslenOrElse(robj *o, size_t len) { return sdsEncodedObject(o) ? 
sdslen(o->ptr) : len; } /* 若是val類型爲dict時執行dictScan操做的回調 */ static void singleObjectIteratorScanCallback(void *data, const dictEntry *de) { /* 提取privdata {ll, val, &len}*/ void **pd = (void **)data; list *l = pd[0];/* 鏈表,用於存放scan出來的元素 */ robj *o = pd[1];/* 被迭代的對象值val */ long long *n = pd[2];/* 返回字節數的指針 */ robj *objs[2] = {NULL, NULL}; switch (o->type) { case OBJ_HASH: /* 若是原對象是hash,則分別將hash的key和value按順序方式鏈表 */ objs[0] = dictGetKey(de); objs[1] = dictGetVal(de); break; case OBJ_SET: /* 若是原對象是set,則只將hash的key放入鏈表 */ objs[0] = dictGetKey(de); break; } /* 將掃出來的對象添加到鏈表 */ for (int i = 0; i < 2; i ++) { if (objs[i] != NULL) { /* 引用計數 */ incrRefCount(objs[i]); /* 這個對象的大小,對於string對象就是string長度,其餘對象就按8字節算 */ *n += sdslenOrElse(objs[i], 8); listAddNodeTail(l, objs[i]); } } } /* 將double轉爲內存二進制表示 */ static uint64_t convertDoubleToRawBits(double value) { union { double d; uint64_t u; } fp; fp.d = value; return fp.u; } /* 將內存二進制表示轉爲double值 */ static double convertRawBitsToDouble(uint64_t value) { union { double d; uint64_t u; } fp; fp.u = value; return fp.d; } /* 從Uint64建立RawString對象 */ static robj * createRawStringObjectFromUint64(uint64_t v) { uint64_t p = intrev64ifbe(v); return createRawStringObject((char *)&p, sizeof(p)); } /* 從RawString獲取Uint64 */ static int getUint64FromRawStringObject(robj *o, uint64_t *p) { if (sdsEncodedObject(o) && sdslen(o->ptr) == sizeof(uint64_t)) { *p = intrev64ifbe(*(uint64_t *)(o->ptr)); return C_OK; } return C_ERR; } /* 計算一個對象須要的restore命令的個數,單個restore上只能攜帶maxbulks個Bulk Bulk:$6\r\nfoobar\r\n Multi-bulk :"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n" */ static long numberOfRestoreCommandsFromObject(robj *val, long long maxbulks) { long long numbulks = 0; switch (val->type) { case OBJ_LIST: if (val->encoding == OBJ_ENCODING_QUICKLIST) { /* list的長度就是須要的Bulk的數目 */ numbulks = listTypeLength(val); } break; case OBJ_HASH: if (val->encoding == OBJ_ENCODING_HT) { /* hash表中每一個元素須要2個Bulk */ numbulks = hashTypeLength(val) * 2; } break; case OBJ_SET: if 
(val->encoding == OBJ_ENCODING_HT) { /* set中每一個元素須要1個Bulk */ numbulks = setTypeSize(val); } break; case OBJ_ZSET: if (val->encoding == OBJ_ENCODING_SKIPLIST) { /* zset中每一個元素須要2個Bulk */ numbulks = zsetLength(val) * 2; } break; } /* 若是實際的numbulks比要求的maxbulks小,則使用一條restore命令 */ if (numbulks <= maxbulks) { return 1; } /* 計算須要的restore命令個數 */ return (numbulks + maxbulks - 1) / maxbulks; } /* 估計Restore命令的個數 */ static long estimateNumberOfRestoreCommands(redisDb *db, robj *key, long long maxbulks) { /* 查找key對應的val */ robj *val = lookupKeyWrite(db, key); if (val != NULL) { return numberOfRestoreCommandsFromObject(val, maxbulks); } return 0; } extern void createDumpPayload(rio *payload, robj *o); extern zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank); static slotsmgrtAsyncClient *getSlotsmgrtAsyncClient(int db); /* 單對象迭代,返回值爲命令個數(Bulks) */ static int singleObjectIteratorNext(client *c, singleObjectIterator *it, long long timeout, unsigned int maxbulks, unsigned int maxbytes) { /* * * STAGE_PREPARE ---> STAGE_PAYLOAD ---> STAGE_DONE * | A * V | * +------------> STAGE_CHUNKED ---> STAGE_FILLTTL * A | * | V * +-------+ * */ /* 本次迭代的key */ robj *key = it->key; /* 但對象遷移的準備階段 */ if (it->stage == STAGE_PREPARE) { /* 以寫的方式查找key,與lookupKeyRead區別是沒有命中率更新 */ robj *val = lookupKeyWrite(c->db, key); if (val == NULL) { /* 若是key沒有找到,則結束 */ it->stage = STAGE_DONE; return 0; } /* 設置值 */ it->val = val; /* 增長引用 */ incrRefCount(it->val); /* 設置過時時間 */ it->expire = getExpire(c->db, key); /* 前導消息 */ int leading_msgs = 0; /* 獲取db對應的slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id); if (ac->c == c) { /* 只有slotsmgrtAsyncClient未被使用的時候 */ if (ac->used == 0) { /* 表示已經被使用 */ ac->used = 1; /* 若是須要驗證 */ if (server.requirepass != NULL) { /* SLOTSRESTORE-ASYNC-AUTH $password */ addReplyMultiBulkLen(c, 2); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC-AUTH"); addReplyBulkCString(c, server.requirepass); leading_msgs += 1; } /* SELECT DB操做 */ do { /* 
SLOTSRESTORE-ASYNC-SELECT $db */ addReplyMultiBulkLen(c, 2); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC-SELECT"); addReplyBulkLongLong(c, c->db->id); leading_msgs += 1; } while (0); } } /* SLOTSRESTORE-ASYNC delete $key */ addReplyMultiBulkLen(c, 3); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC"); addReplyBulkCString(c, "delete"); addReplyBulk(c, key); /* 計算須要的restore命令個數,maxbulks表示一個restore命令可承載的bulk最大數目 */ long n = numberOfRestoreCommandsFromObject(val, maxbulks); if (n >= 2) { /* 若是須要2個及以上,則進入CHUNKED階段,即啓用分塊傳輸 */ it->stage = STAGE_CHUNKED; /* chunked消息個數 */ it->chunked_msgs = n; } else { /* 不然一個restore能夠承載,則直接進入PAYLOAD階段 */ it->stage = STAGE_PAYLOAD; it->chunked_msgs = 0; } /* 這裏的1爲delete命令,再加上其餘的前導命令(若是有),做爲命令個數返回 */ return 1 + leading_msgs; } /* 取出key對應的值 */ robj *val = it->val; long long ttl = 0; if (it->stage == STAGE_CHUNKED) { /* 若是是CHUNKED階段,則設置一個臨時ttl */ ttl = timeout * 3; } else if (it->expire != -1) { /* 不然若是val上有過時時間,則從新計算ttl */ ttl = it->expire - mstime(); if (ttl < 1) { ttl = 1; } } /* 當一個CHUNKED對象所有序列化完成以後會到這個階段 */ if (it->stage == STAGE_FILLTTL) { /* SLOTSRESTORE-ASYNC expire $key $ttl */ addReplyMultiBulkLen(c, 4); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC"); addReplyBulkCString(c, "expire"); addReplyBulk(c, key); /* 設置真實的ttl */ addReplyBulkLongLong(c, ttl); /* 迭代結束 */ it->stage = STAGE_DONE; /* 該階段只有一個命令 */ return 1; } /* 若是是PAYLOAD階段切val類型不是OBJ_STRING */ if (it->stage == STAGE_PAYLOAD && val->type != OBJ_STRING) { /* 負載緩衝區 */ rio payload; /* 將val序列化爲RDB格式 */ createDumpPayload(&payload, val); /* SLOTSRESTORE-ASYNC object $key $ttl $payload */ addReplyMultiBulkLen(c, 5); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC"); /* 對象類型 */ addReplyBulkCString(c, "object"); addReplyBulk(c, key); addReplyBulkLongLong(c, ttl); /* 添加payload */ addReplyBulkSds(c, payload.io.buffer.ptr); /* 迭代結束 */ it->stage = STAGE_DONE; /* 該階段只有一個命令 */ return 1; } /* 若是是PAYLOAD階段切val類型爲OBJ_STRING */ if (it->stage == STAGE_PAYLOAD && val->type == OBJ_STRING) { /* SLOTSRESTORE-ASYNC 
string $key $ttl $payload */ addReplyMultiBulkLen(c, 5); addReplyBulkCString(c, "SLOTSRESTORE-ASYNC"); addReplyBulkCString(c, "string"); addReplyBulk(c, key); addReplyBulkLongLong(c, ttl); addReplyBulk(c, val); /* 迭代結束 */ it->stage = STAGE_DONE; /* 該階段只有一個命令 */ return 1; } /* 若是是CHUNKED類型 */ if (it->stage == STAGE_CHUNKED) { const char *cmd = NULL; /* 根據val的類型使用不一樣的子命令 */ switch (val->type) { case OBJ_LIST: cmd = "list"; break; case OBJ_HASH: cmd = "hash"; break; case OBJ_SET: cmd = "dict"; break; case OBJ_ZSET: cmd = "zset"; break; default: serverPanic("unknown object type"); } /* 是否還有更多須要序列化 */ int more = 1; /* ll鏈表用於存放本次SLOTSRESTORE-ASYNC命令攜帶的args */ list *ll = listCreate(); /* 設置是否函數,本質就是調用decrRefCount */ listSetFreeMethod(ll, decrRefCountVoid); long long hint = 0, len = 0; if (val->type == OBJ_LIST) { /* 若是val類型爲OBJ_LIST,則建立list迭代 */ listTypeIterator *li = listTypeInitIterator(val, it->lindex, LIST_TAIL); do { /* 表示list每一項 */ listTypeEntry entry; /* 遍歷 */ if (listTypeNext(li, &entry)) { quicklistEntry *e = &(entry.entry); robj *obj; if (e->value) { /* */ obj = createStringObject((const char *)e->value, e->sz); } else { /* */ obj = createStringObjectFromLongLong(e->longval); } /* 累計字節數 */ len += sdslenOrElse(obj, 8); /* 添加到ll */ listAddNodeTail(ll, obj); /* 索引加1 */ it->lindex ++; } else { /* 沒有更多了 */ more = 0; } /* 當還有更多要發送且ll現有元素個數小於maxbulks且字節數小於 maxbytes */ } while (more && listLength(ll) < maxbulks && len < maxbytes); /* 釋放迭代器 */ listTypeReleaseIterator(li); /* 原list的總長度 */ hint = listTypeLength(val); } if (val->type == OBJ_HASH || val->type == OBJ_SET) { /* 控制循環次數 */ int loop = maxbulks * 10; /* 默認最大循環次數 */ if (loop < 100) { loop = 100; } dict *ht = val->ptr; void *pd[] = {ll, val, &len}; do { it->cursor = dictScan(ht, it->cursor, singleObjectIteratorScanCallback, pd); if (it->cursor == 0) { /* 沒有更多了 */ more = 0; } /* 若是還有更多且ll現有元素個數小於maxbulks且本次發送字節數小於maxbytes且loop不爲0 */ } while (more && listLength(ll) < maxbulks && len < maxbytes && (-- loop) >= 0); /* 
原hash的總大小 */ hint = dictSize(ht); } if (val->type == OBJ_ZSET) { /* 若是是ZSET類型 */ zset *zs = val->ptr; dict *ht = zs->dict; long long rank = (long long)zsetLength(val) - it->zindex; zskiplistNode *node = (rank >= 1) ? zslGetElementByRank(zs->zsl, rank) : NULL; do { if (node != NULL) { robj *field = node->obj; incrRefCount(field); len += sdslenOrElse(field, 8); listAddNodeTail(ll, field); uint64_t bits = convertDoubleToRawBits(node->score); robj *score = createRawStringObjectFromUint64(bits); len += sdslenOrElse(score, 8); listAddNodeTail(ll, score); node = node->backward; it->zindex ++; } else { /* 沒有更多了 */ more = 0; } /* 若是還有更多元素且bulks沒有超過maxbulks且產生的字節數沒有超過maxbytes */ } while (more && listLength(ll) < maxbulks && len < maxbytes); /* 原hash總大小 */ hint = dictSize(ht); } /* SLOTSRESTORE-ASYNC list/hash/zset/dict $key $ttl $hint [$arg1 ...] */ addReplyMultiBulkLen(c, 5 + listLength(ll));/* MultiBulk總長度 */ addReplyBulkCString(c, "SLOTSRESTORE-ASYNC"); addReplyBulkCString(c, cmd);/* list?hash? 
*/ addReplyBulk(c, key); addReplyBulkLongLong(c, ttl);/* ttl */ addReplyBulkLongLong(c, hint);/* 總大小 */ /* 遍歷ll,ll裏面存放了本地要發送的args */ while (listLength(ll) != 0) { /* 取出頭結點 */ listNode *head = listFirst(ll); /* 取出值對象 */ robj *obj = listNodeValue(head); /* 添加回復 */ addReplyBulk(c, obj); /* 刪除該節點 */ listDelNode(ll, head); } /* 釋放ll */ listRelease(ll); if (!more) { /* 若是對象全部元素都被序列換完畢,則進入FILLTTL階段 */ it->stage = STAGE_FILLTTL; } /* 該階段只有一個命令 */ return 1; } if (it->stage != STAGE_DONE) { serverPanic("invalid iterator stage"); } serverPanic("use of empty iterator"); } /* ============================ Iterator for Data Migration (batched) ====================== */ typedef struct { struct zskiplist *tags;/* 標識一個hashtag有沒有被添加過 */ dict *keys;/* 批處理的Keys */ list *list; /* 每一個節點的值都是singleObjectIterator */ dict *hash_slot;/* hash數組,數組的下標爲slot_num,每一個數組元素的字典爲key、crc對 */ struct zskiplist *hash_tags;/* 用於保存具備hashtag的key,score爲key的crc,值爲key */ long long timeout;/* 進程chunked restore時會指定臨時ttl,值爲timeout*3 */ unsigned int maxbulks;/* 單次restore最多發送多少個bulks */ unsigned int maxbytes;/* 單次發送最多發送多少字節數 */ list *removed_keys;/* 一個key被髮送完成以後會加入這個鏈表 */ list *chunked_vals;/* 用於存放使用chunked方式發生的val */ long estimate_msgs;/* 估算的restore命令的個數 */ } batchedObjectIterator; /* 建立batchedObjectIterator */ static batchedObjectIterator * createBatchedObjectIterator(dict *hash_slot, struct zskiplist *hash_tags, long long timeout, unsigned int maxbulks, unsigned int maxbytes) { batchedObjectIterator *it = zmalloc(sizeof(batchedObjectIterator)); it->tags = zslCreate(); it->keys = dictCreate(&setDictType, NULL); it->list = listCreate(); listSetFreeMethod(it->list, freeSingleObjectIteratorVoid); it->hash_slot = hash_slot; it->hash_tags = hash_tags; it->timeout = timeout; it->maxbulks = maxbulks; it->maxbytes = maxbytes; it->removed_keys = listCreate(); listSetFreeMethod(it->removed_keys, decrRefCountVoid); it->chunked_vals = listCreate(); listSetFreeMethod(it->chunked_vals, decrRefCountVoid); it->estimate_msgs = 0; 
return it; } /* 釋放BatchedObjectIterator */ static void freeBatchedObjectIterator(batchedObjectIterator *it) { zslFree(it->tags); dictRelease(it->keys); listRelease(it->list); listRelease(it->removed_keys); listRelease(it->chunked_vals); zfree(it); } /* 批處理迭代(即一次處理多個key) */ static int batchedObjectIteratorHasNext(batchedObjectIterator *it) { /* list鏈表不爲空,每一個節點的值都是singleObjectIterator */ while (listLength(it->list) != 0) { /* 每一個節點的值都是singleObjectIterator */ listNode *head = listFirst(it->list); /* 每一個節點的值都是singleObjectIterator */ singleObjectIterator *sp = listNodeValue(head); /* 判斷單個對象是否已經處於STAGE_DONE */ if (singleObjectIteratorHasNext(sp)) { /* 不處於STAGE_DONE,即單對象迭代還沒結束,則直接返回1,下次還會迭代這個對象 */ return 1; } /* 不然當前單對象已經迭代結束 */ if (sp->val != NULL) { /* 若是當前單對象的value不爲空,就把單對象的key添加到removed_keys鏈表 */ incrRefCount(sp->key); listAddNodeTail(it->removed_keys, sp->key); if (sp->chunked_msgs != 0) { /* 若是chunked的消息個數不爲0 */ incrRefCount(sp->val); /* 就把val加入到chunked_vals鏈表 */ listAddNodeTail(it->chunked_vals, sp->val); } } /* 刪除這個節點 */ listDelNode(it->list, head); } return 0; } /* 批處理對象迭代,返回值爲本地迭代產生的SLOTSRESTORE系列命令的個數 */ static int batchedObjectIteratorNext(client *c, batchedObjectIterator *it) { /* 遍歷鏈表 */ if (listLength(it->list) != 0) { /* 取出頭結點 */ listNode *head = listFirst(it->list); /* 節點值爲singleObjectIterator */ singleObjectIterator *sp = listNodeValue(head); /* maxbytes減去客戶端輸出緩衝區當前已有的大小就是本次能發送的最大字節數 */ long long maxbytes = (long long)it->maxbytes - getClientOutputBufferMemoryUsage(c); /* 單對象迭代,迭代超時timeout,迭代單詞最大maxbulks,單次最大maxbytes */ return singleObjectIteratorNext(c, sp, it->timeout, it->maxbulks, maxbytes > 0 ? 
maxbytes : 0); } serverPanic("use of empty iterator"); } /* 批處理裏面是否包含key,返回1表示存在,返回0表示不存在 */ static int batchedObjectIteratorContains(batchedObjectIterator *it, robj *key, int usetag) { /* 若是在keys中找到,則存在 */ if (dictFind(it->keys, key) != NULL) { return 1; } /* 若是沒有使用hashtag則結束查找 */ if (!usetag) { return 0; } uint32_t crc; int hastag; /* 計算key的crc和hashtag */ slots_num(key->ptr, &crc, &hastag); if (!hastag) { /* 若是key沒有hashtag則結束查找 */ return 0; } /* 不然填充range */ zrangespec range; range.min = (double)crc; range.minex = 0; range.max = (double)crc; range.maxex = 0; /* 以crc爲範圍在跳錶tags中查找,每個hashtag被添加都會在tags跳錶中添加一個節點 */ return zslFirstInRange(it->tags, &range) != NULL; } /* 向批處理添加一個key,返回值爲本次新添加的key的個數 */ static int batchedObjectIteratorAddKey(redisDb *db, batchedObjectIterator *it, robj *key) { /* 添加到keys字典 */ if (dictAdd(it->keys, key, NULL) != C_OK) { return 0; } /* 引用計數 */ incrRefCount(key); /* 建立createSingleObjectIterator */ listAddNodeTail(it->list, createSingleObjectIterator(key)); /* 對該對象須要的restore命令個數進行預估 */ it->estimate_msgs += estimateNumberOfRestoreCommands(db, key, it->maxbulks); /* 當前批處理的key個數 */ int size = dictSize(it->keys); uint32_t crc; int hastag; /* 該key對應的slot num */ slots_num(key->ptr, &crc, &hastag); if (!hastag) { /* 若是key不含有hashtag則跳出 */ goto out; } /* 知道score爲crc */ zrangespec range; range.min = (double)crc; range.minex = 0; range.max = (double)crc; range.maxex = 0; /* 尋找第一個score知足 range範圍的節點*/ if (zslFirstInRange(it->tags, &range) != NULL) { /* 找到則跳出,所以是該hashtag的key已經被添加過,無需重複添加 */ goto out; } /* 引用計數 */ incrRefCount(key); /* 沒找到則插入,score爲crc,節點的值爲key */ zslInsert(it->tags, (double)crc, key); /* 若是hash_tags跳錶指針爲NULL */ if (it->hash_tags == NULL) { goto out; } /* 在hash_tags中尋找score知足range範圍的第一個節點 */ zskiplistNode *node = zslFirstInRange(it->hash_tags, &range); /* 若是score不一樣就跳出 */ while (node != NULL && node->score == (double)crc) { /* 結點值就是key */ robj *key = node->obj; /* score相同的節點都是連續排列的,所以直接從level[0]向後遍歷就好 */ node = node->level[0].forward; /* 
添加到批處理keys */ if (dictAdd(it->keys, key, NULL) != C_OK) { continue; } /* 引用計數 */ incrRefCount(key); /* 爲該key添加但對象迭代器SingleObjectIterator */ listAddNodeTail(it->list, createSingleObjectIterator(key)); /* 對該對象須要的restore命令個數進行預估 */ it->estimate_msgs += estimateNumberOfRestoreCommands(db, key, it->maxbulks); } out: /* 本次新加如的key的個數,注意最開始的1個key也要加上 */ return 1 + dictSize(it->keys) - size; } /* ============================ Clients ==================================================== */ /* 獲取異步遷移客戶端,每一個db一個 */ static slotsmgrtAsyncClient * getSlotsmgrtAsyncClient(int db) { return &server.slotsmgrt_cached_clients[db]; } /* 通知被阻塞的 SlotsmgrtAsyncClient */ static void notifySlotsmgrtAsyncClient(slotsmgrtAsyncClient *ac, const char *errmsg) { /* 獲取當前迭代器 */ batchedObjectIterator *it = ac->batched_iter; /* 獲取阻塞鏈表 */ list *ll = ac->blocked_list; /* 遍歷 */ while (listLength(ll) != 0) { /* 取出頭節點 */ listNode *head = listFirst(ll); /* 取出節點值,就是client */ client *c = listNodeValue(head); if (errmsg != NULL) { /* 錯誤信息不爲空,則將錯誤信息返回給client */ addReplyError(c, errmsg); } else if (it == NULL) { /* 迭代器非法 */ addReplyError(c, "invalid iterator (NULL)"); } else if (it->hash_slot == NULL) { addReplyLongLong(c, listLength(it->removed_keys)); } else { /* 返回兩個值,一個是本次moved一個是hash_slot如今的大小 */ addReplyMultiBulkLen(c, 2); addReplyLongLong(c, listLength(it->removed_keys)); addReplyLongLong(c, dictSize(it->hash_slot)); } /* 清除CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT標誌,表示這個客戶端不是一個正在被使用、正常服務的客戶端 */ c->slotsmgrt_flags &= ~CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT; /* 清空客戶端阻塞鏈表 */ c->slotsmgrt_fenceq = NULL; /* 刪除當前節點 */ listDelNode(ll, head); } } /* 釋放slotsmgrtAsyncClient裏面的結構 */ static void unlinkSlotsmgrtAsyncCachedClient(client *c, const char *errmsg) { slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id); /* 必須有CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT標誌,表示這是一個已經被cached的客戶端 */ serverAssert(c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT); serverAssert(ac->c == c); /* 通知被阻塞的客戶端,消息爲errmsg */ 
notifySlotsmgrtAsyncClient(ac, errmsg); batchedObjectIterator *it = ac->batched_iter; /* 空閒時間 */ long long elapsed = mstime() - ac->lastuse; serverLog(LL_WARNING, "slotsmgrt_async: unlink client %s:%d (DB=%d): " "sending_msgs = %ld, batched_iter = %ld, blocked_list = %ld, " "timeout = %lld(ms), elapsed = %lld(ms) (%s)", ac->host, ac->port, c->db->id, ac->sending_msgs, it != NULL ? (long)listLength(it->list) : -1, (long)listLength(ac->blocked_list), ac->timeout, elapsed, errmsg); sdsfree(ac->host); if (it != NULL) { /* 釋放批處理迭代器 */ freeBatchedObjectIterator(it); } /* 釋放阻塞鏈表 */ listRelease(ac->blocked_list); /* 取消CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT,表示不是被緩存的slotsmgrtAsyncClient */ c->slotsmgrt_flags &= ~CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT; /* 狀況結構,以備下一次使用(注意不須要free ac,由於這是每一個db私有的) */ memset(ac, 0, sizeof(*ac)); } /* 釋放一個db相關的SlotsmgrtAsyncClient */ static int releaseSlotsmgrtAsyncClient(int db, const char *errmsg) { slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(db); if (ac->c == NULL) { /* 爲NULL無需釋放 */ return 0; } client *c = ac->c; /* 釋放slotsmgrtAsyncClient裏面的結構 */ unlinkSlotsmgrtAsyncCachedClient(c, errmsg); /* 釋放client結構 */ freeClient(c); return 1; } /* 新建一個slotsmgrtAsyncClient */ static int createSlotsmgrtAsyncClient(int db, char *host, int port, long timeout) { /* 新建鏈接 */ int fd = anetTcpNonBlockConnect(server.neterr, host, port); if (fd == -1) { serverLog(LL_WARNING, "slotsmgrt_async: create socket %s:%d (DB=%d) failed, %s", host, port, db, server.neterr); return C_ERR; } /* 禁用nagel算法 */ anetEnableTcpNoDelay(server.neterr, fd); int wait = 100; if (wait > timeout) { wait = timeout; } /* 等待可寫狀態 */ if ((aeWait(fd, AE_WRITABLE, wait) & AE_WRITABLE) == 0) { serverLog(LL_WARNING, "slotsmgrt_async: create socket %s:%d (DB=%d) failed, io error or timeout (%d)", host, port, db, wait); close(fd); return C_ERR; } /* 建立redis客戶端,內部會將fd讀事件添加到主線程eventloop */ client *c = createClient(fd); if (c == NULL) { serverLog(LL_WARNING, "slotsmgrt_async: create client %s:%d 
(DB=%d) failed, %s", host, port, db, server.neterr); return C_ERR; } /* 選擇客戶端綁定的db */ if (selectDb(c, db) != C_OK) { serverLog(LL_WARNING, "slotsmgrt_async: invalid DB index (DB=%d)", db); freeClient(c); return C_ERR; } /* 添加設置標誌CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT,表示這是一個已經被CACHED的客戶端 */ c->slotsmgrt_flags |= CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT; /* 已認證 */ c->authenticated = 1; /* 釋放一個db相關的SlotsmgrtAsyncClient(清空裏面的成員結構) */ releaseSlotsmgrtAsyncClient(db, "interrupted: build new connection"); serverLog(LL_WARNING, "slotsmgrt_async: create client %s:%d (DB=%d) OK", host, port, db); /* 根據db獲取slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(db); /* 設置綁定的client */ ac->c = c; /* 沒有被使用 */ ac->used = 0; /* ip */ ac->host = sdsnew(host); /* port */ ac->port = port; /* 空閒時間 */ ac->timeout = timeout; /* 更新最後一次使用時間 */ ac->lastuse = mstime(); /* 飛行中的消息計數 */ ac->sending_msgs = 0; /* 批處理迭代器 */ ac->batched_iter = NULL; /* 建立阻塞鏈表 */ ac->blocked_list = listCreate(); return C_OK; } /* 獲取或建立一個slotsmgrtAsyncClient */ static slotsmgrtAsyncClient * getOrCreateSlotsmgrtAsyncClient(int db, char *host, int port, long timeout) { /* 根據要操做的db獲取緩存的slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(db); if (ac->c != NULL) { /* 不爲NULL,在比較下host和port,只有徹底線條才返回 */ if (ac->port == port && !strcmp(ac->host, host)) { return ac; } } /* 不然新建一個slotsmgrtAsyncClient */ return createSlotsmgrtAsyncClient(db, host, port, timeout) != C_OK ? 
NULL : ac; } static void unlinkSlotsmgrtAsyncNormalClient(client *c) { /* 釋放一個正在被使用的、正常的client */ serverAssert(c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT); /* 阻塞鏈表不能爲NULL */ serverAssert(c->slotsmgrt_fenceq != NULL); /* 該客戶端阻塞的鏈表 */ list *ll = c->slotsmgrt_fenceq; /* 在阻塞鏈表中搜索該客戶端 */ listNode *node = listSearchKey(ll, c); /* 必須能搜索到 */ serverAssert(node != NULL); /* 再也不是一個正在被使用的、正常的client */ c->slotsmgrt_flags &= ~CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT; /* 再也不阻塞也就沒有阻塞鏈表 */ c->slotsmgrt_fenceq = NULL; /* 從阻塞鏈表中刪除該客戶端 */ listDelNode(ll, node); } void slotsmgrtAsyncUnlinkClient(client *c) { /* 針對CACHED類型客戶端 */ if (c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT) { unlinkSlotsmgrtAsyncCachedClient(c, "interrupted: connection closed"); } /* 針對NORMAL類型客戶端 */ if (c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT) { unlinkSlotsmgrtAsyncNormalClient(c); } } /* 會被按期執行 */ void slotsmgrtAsyncCleanup() { /* 遍歷全部db */ for (int i = 0; i < server.dbnum; i ++) { /* 獲取每一個db對應的 slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(i); if (ac->c == NULL) { continue; } /* 計算空閒時間 */ long long elapsed = mstime() - ac->lastuse; /* 提取客戶端timeout */ long long timeout = ac->batched_iter != NULL ? ac->timeout : 1000LL * 60; if (elapsed <= timeout) { /* 若是空閒時間小於timeout則繼續遍歷 */ continue; } /* 不然就釋放這個客戶端 */ releaseSlotsmgrtAsyncClient(i, ac->batched_iter != NULL ? 
"interrupted: migration timeout" : "interrupted: idle timeout"); } } /* 獲取異步遷移狀態或者阻塞一個client */ static int getSlotsmgrtAsyncClientMigrationStatusOrBlock(client *c, robj *key, int block) { /* 獲取當前db上的slotsmgrtAsyncClient */ slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id); if (ac->c == NULL || ac->batched_iter == NULL) { /* 沒有遷移或遷移完成 */ return 0; } /* 獲取當前的batched_iter */ batchedObjectIterator *it = ac->batched_iter; if (key != NULL && !batchedObjectIteratorContains(it, key, 1)) { /* 若是key不爲NULL且key不在batched中則直接返回0,表示該key沒有遷移或者遷移完成 */ return 0; } if (!block) { /* 若是不容許阻塞則直接返回 */ return 1; } if (c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT) { /* 若是這個客戶端是一個CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT,便是一個 正在服務的slotsmgrtAsyncClient */ return -1; } /* 獲取阻塞鏈表 */ list *ll = ac->blocked_list; /* 設置CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT標誌,表示這是一個正常的被阻塞的客戶端 */ c->slotsmgrt_flags |= CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT; /* 設置客戶端阻塞在哪一個鏈表上 */ c->slotsmgrt_fenceq = ll; /* 添加到阻塞隊列 */ listAddNodeTail(ll, c); return 1; } /* ============================ Slotsmgrt{One,TagOne}AsyncDumpCommand ====================== */ /* SLOTSMGRTONE-ASYNC-DUMP $timeout $maxbulks $key1 [$key2 ...] */ /* SLOTSMGRTTAGONE-ASYNC-DUMP $timeout $maxbulks $key1 [$key2 ...] 
*/ static void slotsmgrtAsyncDumpGenericCommand(client *c, int usetag) { long long timeout; /* 獲取timeout */ if (getLongLongFromObject(c->argv[1], &timeout) != C_OK || !(timeout >= 0 && timeout <= INT_MAX)) { addReplyErrorFormat(c, "invalid value of timeout (%s)", (char *)c->argv[1]->ptr); return; } /* 若是timeout爲0就修正爲30s */ if (timeout == 0) { timeout = 1000 * 30; } /* 獲取maxbulks */ long long maxbulks; if (getLongLongFromObject(c->argv[2], &maxbulks) != C_OK || !(maxbulks >= 0 && maxbulks <= INT_MAX)) { addReplyErrorFormat(c, "invalid value of maxbulks (%s)", (char *)c->argv[2]->ptr); return; } /* 若是maxbulks就修正爲默認值3000 */ if (maxbulks == 0) { maxbulks = 1000; } /* 建立批處理迭代器,若是使用hashtag則提供 tagged_keys */ batchedObjectIterator *it = createBatchedObjectIterator(NULL, usetag ? c->db->tagged_keys : NULL, timeout, maxbulks, INT_MAX); /* 向批處理添加keys */ for (int i = 3; i < c->argc; i ++) { batchedObjectIteratorAddKey(c->db, it, c->argv[i]); } /* 添加一個空對象節點到復鏈表reply中,用於存放MultiBulk的長度 */ void *ptr = addDeferredMultiBulkLength(c); int total = 0; /* batched迭代 */ while (batchedObjectIteratorHasNext(it)) { /* batchedObjectIteratorNext返回本次迭代產生的SLOTSRESTORE系列命令的個數 */ total += batchedObjectIteratorNext(c, it); } /* 把真實的長度寫進去 */ setDeferredMultiBulkLength(c, ptr, total); /* 釋放批處理迭代器 */ freeBatchedObjectIterator(it); } /* * * SLOTSMGRTONE-ASYNC-DUMP $timeout $maxbulks $key1 [$key2 ...] * */ void slotsmgrtOneAsyncDumpCommand(client *c) { if (c->argc <= 3) { addReplyError(c, "wrong number of arguments for SLOTSMGRTONE-ASYNC-DUMP"); return; } slotsmgrtAsyncDumpGenericCommand(c, 0); } /* * * SLOTSMGRTTAGONE-ASYNC-DUMP $timeout $maxbulks $key1 [$key2 ...] 
 * */
void slotsmgrtTagOneAsyncDumpCommand(client *c) {
    if (c->argc <= 3) {
        addReplyError(c, "wrong number of arguments for SLOTSMGRTTAGONE-ASYNC-DUMP");
        return;
    }
    slotsmgrtAsyncDumpGenericCommand(c, 1);
}

/* ============================ Slotsmgrt{One,TagOne,Slot,TagSlot}AsyncCommand ============= */

/* Clamp maxbytes to the output buffer limits configured for normal clients. */
static unsigned int slotsmgrtAsyncMaxBufferLimit(unsigned int maxbytes) {
    clientBufferLimitsConfig *config = &server.client_obuf_limits[CLIENT_TYPE_NORMAL];
    if (config->soft_limit_bytes != 0 && config->soft_limit_bytes < maxbytes) {
        /* maxbytes exceeds the soft limit: use the soft limit instead. */
        maxbytes = config->soft_limit_bytes;
    }
    if (config->hard_limit_bytes != 0 && config->hard_limit_bytes < maxbytes) {
        /* maxbytes exceeds the hard limit: use the hard limit instead. */
        maxbytes = config->hard_limit_bytes;
    }
    return maxbytes;
}

/* Generate at least `atleast` messages (one message is one SLOTSRESTORE
 * command), then keep generating until `usecs` microseconds have elapsed,
 * the batch is exhausted, or the output buffer reaches maxbytes. */
static long slotsmgrtAsyncNextMessagesMicroseconds(slotsmgrtAsyncClient *ac, long atleast, long long usecs) {
    /* The batched iterator. */
    batchedObjectIterator *it = ac->batched_iter;
    /* Deadline of this round. */
    long long deadline = ustime() + usecs;
    long msgs = 0;
    /* Continue while the batch still has objects and the client output
     * buffer uses fewer than maxbytes bytes. */
    while (batchedObjectIteratorHasNext(it) &&
            getClientOutputBufferMemoryUsage(ac->c) < it->maxbytes) {
        /* The return value is the number of SLOTSRESTORE-family commands
         * produced by this step. */
        if ((msgs += batchedObjectIteratorNext(ac->c, it)) < atleast) {
            continue;
        }
        /* Stop once the time budget is used up. */
        if (ustime() >= deadline) {
            return msgs;
        }
    }
    /* Number of messages generated. */
    return msgs;
}

/* dictScan callback used on hash_slot. */
static void slotsScanSdsKeyCallback(void *l, const dictEntry *de) {
    sds skey = dictGetKey(de);
    robj *key = createStringObject(skey, sdslen(skey));
    /* Append the key to the list. */
    listAddNodeTail((list *)l, key);
}

/* SLOTSMGRTONE-ASYNC     $host $port $timeout $maxbulks $maxbytes $key1 [$key2 ...] */
/* SLOTSMGRTTAGONE-ASYNC  $host $port $timeout $maxbulks $maxbytes $key1 [$key2 ...]
 */
/* SLOTSMGRTSLOT-ASYNC    $host $port $timeout $maxbulks $maxbytes $slot $numkeys */
/* SLOTSMGRTTAGSLOT-ASYNC $host $port $timeout $maxbulks $maxbytes $slot $numkeys */
static void slotsmgrtAsyncGenericCommand(client *c, int usetag, int usekey) {
    /* Parse host and port. */
    char *host = c->argv[1]->ptr;
    long long port;
    if (getLongLongFromObject(c->argv[2], &port) != C_OK ||
            !(port >= 1 && port < 65536)) {
        addReplyErrorFormat(c, "invalid value of port (%s)",
                (char *)c->argv[2]->ptr);
        return;
    }
    /* Parse timeout; it also serves as the temporary TTL of keys that are
     * migrated in chunks. */
    long long timeout;
    if (getLongLongFromObject(c->argv[3], &timeout) != C_OK ||
            !(timeout >= 0 && timeout <= INT_MAX)) {
        addReplyErrorFormat(c, "invalid value of timeout (%s)",
                (char *)c->argv[3]->ptr);
        return;
    }
    /* Default: 30 seconds. */
    if (timeout == 0) {
        timeout = 1000 * 30;
    }
    /* Parse maxbulks, which decides how many bulks one chunk may carry. */
    long long maxbulks;
    if (getLongLongFromObject(c->argv[4], &maxbulks) != C_OK ||
            !(maxbulks >= 0 && maxbulks <= INT_MAX)) {
        addReplyErrorFormat(c, "invalid value of maxbulks (%s)",
                (char *)c->argv[4]->ptr);
        return;
    }
    if (maxbulks == 0) {
        maxbulks = 200;
    }
    /* Cap maxbulks at 512 * 1024. */
    if (maxbulks > 512 * 1024) {
        maxbulks = 512 * 1024;
    }
    /* Parse maxbytes, the maximum number of bytes sent in one round. */
    long long maxbytes;
    if (getLongLongFromObject(c->argv[5], &maxbytes) != C_OK ||
            !(maxbytes >= 0 && maxbytes <= INT_MAX)) {
        addReplyErrorFormat(c, "invalid value of maxbytes (%s)",
                (char *)c->argv[5]->ptr);
        return;
    }
    if (maxbytes == 0) {
        maxbytes = 512 * 1024;
    }
    if (maxbytes > INT_MAX / 2) {
        maxbytes = INT_MAX / 2;
    }
    /* Clamp maxbytes to the configured client output buffer limits. */
    maxbytes = slotsmgrtAsyncMaxBufferLimit(maxbytes);

    dict *hash_slot = NULL;
    long long numkeys = 0;
    if (!usekey) {
        /* Not SLOTSMGRTONE-ASYNC or SLOTSMGRTTAGONE-ASYNC, i.e. no explicit
         * keys were given: parse the slot number instead. */
        long long slotnum;
        if (getLongLongFromObject(c->argv[6], &slotnum) != C_OK ||
                !(slotnum >= 0 && slotnum < HASH_SLOTS_SIZE)) {
            addReplyErrorFormat(c, "invalid value of slot (%s)",
                    (char *)c->argv[6]->ptr);
            return;
        }
        /* Get the dict that holds the keys of this slot. */
        hash_slot = c->db->hash_slots[slotnum];
        /*
         * Parse numkeys. */
        if (getLongLongFromObject(c->argv[7], &numkeys) != C_OK ||
                !(numkeys >= 0 && numkeys <= INT_MAX)) {
            addReplyErrorFormat(c, "invalid value of numkeys (%s)",
                    (char *)c->argv[7]->ptr);
            return;
        }
        /* A numkeys of 0 selects the default of 100 per call. */
        if (numkeys == 0) {
            numkeys = 100;
        }
    }

    /* Refuse to start if this DB is already being migrated. */
    if (getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, NULL, 0) != 0) {
        addReplyError(c, "the specified DB is being migrated");
        return;
    }
    /* A client flagged CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT is blocked and
     * still waiting for an earlier operation to finish. */
    if (c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT) {
        addReplyError(c, "previous operation has not finished");
        return;
    }

    /* Get or create the slotsmgrtAsyncClient for this DB. */
    slotsmgrtAsyncClient *ac = getOrCreateSlotsmgrtAsyncClient(c->db->id, host, port, timeout);
    if (ac == NULL) {
        addReplyErrorFormat(c, "create client to %s:%d failed", host, (int)port);
        return;
    }

    /* Create the batched iterator. */
    batchedObjectIterator *it = createBatchedObjectIterator(hash_slot,
            usetag ? c->db->tagged_keys : NULL, timeout, maxbulks, maxbytes);
    if (!usekey) {
        /* ll collects the keys scanned out of hash_slot. */
        list *ll = listCreate();
        listSetFreeMethod(ll, decrRefCountVoid);
        /* Up to three passes: the first two start from a random cursor, the
         * last one starts from 0 after shrinking the table if needed. */
        for (int i = 2; i >= 0 && it->estimate_msgs < numkeys; i --) {
            unsigned long cursor = 0;
            if (i != 0) {
                cursor = random();
            } else {
                if (htNeedsResize(hash_slot)) {
                    dictResize(hash_slot);
                }
            }
            if (dictIsRehashing(hash_slot)) {
                dictRehash(hash_slot, 50);
            }
            int loop = numkeys * 10;
            if (loop < 100) {
                loop = 100;
            }
            do {
                /* slotsScanSdsKeyCallback appends every scanned key to ll. */
                cursor = dictScan(hash_slot, cursor, slotsScanSdsKeyCallback, ll);
                while (listLength(ll) != 0 && it->estimate_msgs < numkeys) {
                    listNode *head = listFirst(ll);
                    robj *key = listNodeValue(head);
                    long msgs = estimateNumberOfRestoreCommands(c->db, key, it->maxbulks);
                    if (it->estimate_msgs == 0 || it->estimate_msgs + msgs <= numkeys * 2) {
                        batchedObjectIteratorAddKey(c->db, it, key);
                    }
                    listDelNode(ll, head);
                }
            } while (cursor != 0 && it->estimate_msgs < numkeys &&
                    dictSize(it->keys) < (unsigned long)numkeys
                    && (-- loop) >= 0);
        }
        listRelease(ll);
    } else {
        /* Otherwise the keys to migrate were given explicitly. */
        for (int i = 6; i < c->argc; i ++) {
            batchedObjectIteratorAddKey(c->db, it, c->argv[i]);
        }
    }
    /* No message may be in flight at this point. */
    serverAssert(ac->sending_msgs == 0);
    /* No batch may be active and no client may be blocked. */
    serverAssert(ac->batched_iter == NULL && listLength(ac->blocked_list) == 0);

    ac->timeout = timeout;
    /* Update the last-use timestamp. */
    ac->lastuse = mstime();
    ac->batched_iter = it;
    /* Generate at least 3 messages, with a time budget of 500 microseconds. */
    ac->sending_msgs = slotsmgrtAsyncNextMessagesMicroseconds(ac, 3, 500);

    /* The DB is now migrating, so this call queues the caller on the
     * blocked list until the batch finishes. */
    getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, NULL, 1);

    if (ac->sending_msgs != 0) {
        return;
    }
    /* Nothing was sent (empty batch): reply at once and clean up. */
    notifySlotsmgrtAsyncClient(ac, NULL);

    ac->batched_iter = NULL;
    freeBatchedObjectIterator(it);
}

/* *
 * SLOTSMGRTONE-ASYNC $host $port $timeout $maxbulks $maxbytes $key1 [$key2 ...]
 * */
void slotsmgrtOneAsyncCommand(client *c) {
    if (c->argc <= 6) {
        addReplyError(c, "wrong number of arguments for SLOTSMGRTONE-ASYNC");
        return;
    }
    slotsmgrtAsyncGenericCommand(c, 0, 1);
}

/* *
 * SLOTSMGRTTAGONE-ASYNC $host $port $timeout $maxbulks $maxbytes $key1 [$key2 ...]
 * */
void slotsmgrtTagOneAsyncCommand(client *c) {
    if (c->argc <= 6) {
        addReplyError(c, "wrong number of arguments for SLOTSMGRTTAGONE-ASYNC");
        return;
    }
    slotsmgrtAsyncGenericCommand(c, 1, 1);
}

/* *
 * SLOTSMGRTSLOT-ASYNC $host $port $timeout $maxbulks $maxbytes $slot $numkeys
 * */
void slotsmgrtSlotAsyncCommand(client *c) {
    if (c->argc != 8) {
        addReplyError(c, "wrong number of arguments for SLOTSMGRTSLOT-ASYNC");
        return;
    }
    slotsmgrtAsyncGenericCommand(c, 0, 0);
}

/* *
 * SLOTSMGRTTAGSLOT-ASYNC $host $port $timeout $maxbulks $maxbytes $slot $numkeys
 * */
void slotsmgrtTagSlotAsyncCommand(client *c) {
    if (c->argc != 8) {
        addReplyError(c, "wrong number of arguments for SLOTSMGRTTAGSLOT-ASYNC");
        return;
    }
    slotsmgrtAsyncGenericCommand(c, 1, 0);
}

/* *
 * SLOTSMGRT-ASYNC-FENCE
 * */
void slotsmgrtAsyncFenceCommand(client *c) {
    /* Query the migration status, blocking the client if a batch is running. */
    int ret = getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, NULL, 1);
    if (ret == 0) {
        /* Not blocked: there is no migration in progress. */
        addReply(c, shared.ok);
    } else if (ret != 1) {
        /* A successfully blocked client gets 1; anything else is an error. */
        addReplyError(c, "previous operation has not finished (call fence again)");
    }
    /* When ret == 1 the client receives nothing for now; it is notified
     * once the running batch has finished. */
}

/* *
 * SLOTSMGRT-ASYNC-CANCEL
 * */
void slotsmgrtAsyncCancelCommand(client *c) {
    addReplyLongLong(c, releaseSlotsmgrtAsyncClient(c->db->id, "interrupted: canceled"));
}

/* ============================ SlotsmgrtAsyncStatus ======================================= */

static void singleObjectIteratorStatus(client *c, singleObjectIterator *it) {
    if (it == NULL) {
        addReply(c, shared.nullmultibulk);
        return;
    }
    void *ptr = addDeferredMultiBulkLength(c);
    int fields = 0;

    fields ++; addReplyBulkCString(c, "key");
    addReplyBulk(c, it->key);

    fields ++; addReplyBulkCString(c, "val.type");
    addReplyBulkLongLong(c, it->val == NULL ?
            -1 : it->val->type);

    fields ++; addReplyBulkCString(c, "stage");
    addReplyBulkLongLong(c, it->stage);

    fields ++; addReplyBulkCString(c, "expire");
    addReplyBulkLongLong(c, it->expire);

    fields ++; addReplyBulkCString(c, "cursor");
    addReplyBulkLongLong(c, it->cursor);

    fields ++; addReplyBulkCString(c, "lindex");
    addReplyBulkLongLong(c, it->lindex);

    fields ++; addReplyBulkCString(c, "zindex");
    addReplyBulkLongLong(c, it->zindex);

    fields ++; addReplyBulkCString(c, "chunked_msgs");
    addReplyBulkLongLong(c, it->chunked_msgs);

    setDeferredMultiBulkLength(c, ptr, fields * 2);
}

/* Report the state of a batchedObjectIterator. */
static void batchedObjectIteratorStatus(client *c, batchedObjectIterator *it) {
    if (it == NULL) {
        addReply(c, shared.nullmultibulk);
        return;
    }
    void *ptr = addDeferredMultiBulkLength(c);
    int fields = 0;

    fields ++; addReplyBulkCString(c, "keys");
    addReplyMultiBulkLen(c, 2);
    addReplyBulkLongLong(c, dictSize(it->keys));
    addReplyMultiBulkLen(c, dictSize(it->keys));
    dictIterator *di = dictGetIterator(it->keys);
    dictEntry *de;
    while((de = dictNext(di)) != NULL) {
        addReplyBulk(c, dictGetKey(de));
    }
    dictReleaseIterator(di);

    fields ++; addReplyBulkCString(c, "timeout");
    addReplyBulkLongLong(c, it->timeout);

    fields ++; addReplyBulkCString(c, "maxbulks");
    addReplyBulkLongLong(c, it->maxbulks);

    fields ++; addReplyBulkCString(c, "maxbytes");
    addReplyBulkLongLong(c, it->maxbytes);

    fields ++; addReplyBulkCString(c, "estimate_msgs");
    addReplyBulkLongLong(c, it->estimate_msgs);

    fields ++; addReplyBulkCString(c, "removed_keys");
    addReplyBulkLongLong(c, listLength(it->removed_keys));

    fields ++; addReplyBulkCString(c, "chunked_vals");
    addReplyBulkLongLong(c, listLength(it->chunked_vals));

    fields ++; addReplyBulkCString(c, "iterators");
    addReplyMultiBulkLen(c, 2);
    addReplyBulkLongLong(c, listLength(it->list));
    singleObjectIterator *sp = NULL;
    if (listLength(it->list) != 0) {
        sp = listNodeValue(listFirst(it->list));
    }
    singleObjectIteratorStatus(c, sp);

    setDeferredMultiBulkLength(c, ptr, fields *
            2);
}

/* *
 * SLOTSMGRT-ASYNC-STATUS
 * */
void slotsmgrtAsyncStatusCommand(client *c) {
    slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id);
    if (ac->c == NULL) {
        addReply(c, shared.nullmultibulk);
        return;
    }
    /* Reserve the multi-bulk length. */
    void *ptr = addDeferredMultiBulkLength(c);
    int fields = 0;

    fields ++; addReplyBulkCString(c, "host");
    addReplyBulkCString(c, ac->host);

    fields ++; addReplyBulkCString(c, "port");
    addReplyBulkLongLong(c, ac->port);

    fields ++; addReplyBulkCString(c, "used");
    addReplyBulkLongLong(c, ac->used);

    fields ++; addReplyBulkCString(c, "timeout");
    addReplyBulkLongLong(c, ac->timeout);

    fields ++; addReplyBulkCString(c, "lastuse");
    addReplyBulkLongLong(c, ac->lastuse);

    fields ++; addReplyBulkCString(c, "since_lastuse");
    addReplyBulkLongLong(c, mstime() - ac->lastuse);

    fields ++; addReplyBulkCString(c, "sending_msgs");
    addReplyBulkLongLong(c, ac->sending_msgs);

    /* Number of blocked clients. */
    fields ++; addReplyBulkCString(c, "blocked_clients");
    addReplyBulkLongLong(c, listLength(ac->blocked_list));

    fields ++; addReplyBulkCString(c, "batched_iterator");
    batchedObjectIteratorStatus(c, ac->batched_iter);

    /* Fill in the multi-bulk length. */
    setDeferredMultiBulkLength(c, ptr, fields * 2);
}

/* ============================ SlotsmgrtExecWrapper ======================================= */

/* *
 * SLOTSMGRT-EXEC-WRAPPER $hashkey $command [$arg1 ...]
 * */
void slotsmgrtExecWrapperCommand(client *c) {
    /* The reply is always a 2-element multi-bulk: a status code followed by
     * either an error or the result of the wrapped command.
     * Status codes: -1 bad request, 0 key does not exist,
     * 1 key is being migrated, 2 command executed. */
    addReplyMultiBulkLen(c, 2);
    if (c->argc < 3) {
        addReplyLongLong(c, -1);
        addReplyError(c, "wrong number of arguments for SLOTSMGRT-EXEC-WRAPPER");
        return;
    }
    /* Look up the wrapped command. */
    struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr);
    if (cmd == NULL) {
        addReplyLongLong(c, -1);
        addReplyErrorFormat(c,"invalid command specified (%s)",
                (char *)c->argv[2]->ptr);
        return;
    }
    if ((cmd->arity > 0 && cmd->arity != c->argc - 2) || (c->argc - 2 < -cmd->arity)) {
        addReplyLongLong(c, -1);
        addReplyErrorFormat(c, "wrong number of arguments for command (%s)",
                (char *)c->argv[2]->ptr);
        return;
    }
    /* Look up the hash key with write semantics. */
    if (lookupKeyWrite(c->db, c->argv[1]) == NULL) {
        addReplyLongLong(c, 0);
        addReplyError(c, "the specified key doesn't exist");
        return;
    }
    /* A write command on a key that is being migrated is rejected; the
     * client is not blocked. */
    if (!(cmd->flags & CMD_READONLY) &&
            getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, c->argv[1], 0) != 0) {
        /* Status 1: the key is being migrated. */
        addReplyLongLong(c, 1);
        addReplyError(c, "the specified key is being migrated");
        return;
    } else {
        /* Status 2: the command will be executed. */
        addReplyLongLong(c, 2);
        /* Build a new argv without the wrapper name and the hash key. */
        robj **argv = zmalloc(sizeof(robj *) * (c->argc - 2));
        for (int i = 2; i < c->argc; i ++) {
            argv[i - 2] = c->argv[i];
            incrRefCount(c->argv[i]);
        }
        /* Release the old argv; the arguments kept in the new argv were
         * retained above. */
        for (int i = 0; i < c->argc; i ++) {
            decrRefCount(c->argv[i]);
        }
        zfree(c->argv);
        c->argc = c->argc - 2;
        c->argv = argv;
        c->cmd = cmd;
        /* Invoke the wrapped command. */
        call(c, CMD_CALL_FULL & ~CMD_CALL_PROPAGATE);
    }
}

/* ============================ SlotsrestoreAsync Commands ================================= */

/* Send the SLOTSRESTORE-ASYNC-ACK reply of a SLOTSRESTORE-ASYNC command. */
static void slotsrestoreReplyAck(client *c, int err_code, const char *fmt, ...)
{
    va_list ap;
    va_start(ap, fmt);
    sds s = sdscatvprintf(sdsempty(), fmt, ap);
    va_end(ap);

    addReplyMultiBulkLen(c, 3);
    addReplyBulkCString(c, "SLOTSRESTORE-ASYNC-ACK");
    addReplyBulkLongLong(c, err_code);
    addReplyBulkSds(c, s);

    if (err_code != 0) {
        /* On error, close the client once the reply has been written. */
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
    }
}

extern int verifyDumpPayload(unsigned char *p, size_t len);

/* The actual handling of the SLOTSRESTORE-ASYNC command. */
static int slotsrestoreAsyncHandle(client *c) {
    /* Query the async migration status of this node; the client is never
     * blocked here, even if a migration is running. */
    if (getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, NULL, 0) != 0) {
        /* The current DB of this node is itself being migrated, so it
         * cannot serve SLOTSRESTORE-ASYNC. */
        slotsrestoreReplyAck(c, -1, "the specified DB is being migrated");
        return C_ERR;
    }

    const char *cmd = "";
    /* Validate the argument count. */
    if (c->argc < 2) {
        goto bad_arguments_number;
    }
    cmd = c->argv[1]->ptr;

    /* ==================================================== */
    /* SLOTSRESTORE-ASYNC $cmd $key [$ttl $arg1, $arg2 ...] */
    /* ==================================================== */

    if (c->argc < 3) {
        goto bad_arguments_number;
    }

    robj *key = c->argv[2];

    /* SLOTSRESTORE-ASYNC delete $key */
    if (!strcasecmp(cmd, "delete")) {
        if (c->argc != 3) {
            goto bad_arguments_number;
        }
        /* Delete synchronously. */
        int deleted = dbDelete(c->db, key);
        if (deleted) {
            /* Deleted: notify every client that WATCHes this key. */
            signalModifiedKey(c->db, key);
            /* Bump the dirty counter. */
            server.dirty ++;
        }
        /* Reply "1" if the key was deleted, "0" otherwise. */
        slotsrestoreReplyAck(c, 0, deleted ? "1" : "0");
        return C_OK;
    }

    /* ==================================================== */
    /* SLOTSRESTORE-ASYNC $cmd $key $ttl [$arg1, $arg2 ...]
     */
    /* ==================================================== */

    if (c->argc < 4) {
        goto bad_arguments_number;
    }

    /* Parse ttl. */
    long long ttl;
    if (getLongLongFromObject(c->argv[3], &ttl) != C_OK || ttl < 0) {
        slotsrestoreReplyAck(c, -1, "invalid TTL value (TTL=%s)", c->argv[3]->ptr);
        return C_ERR;
    }

    /* SLOTSRESTORE-ASYNC expire $key $ttl */
    if (!strcasecmp(cmd, "expire")) {
        /* Validate the argument count. */
        if (c->argc != 4) {
            goto bad_arguments_number;
        }
        /* The key must exist. */
        if (lookupKeyWrite(c->db, key) == NULL) {
            slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr);
            return C_ERR;
        }
        /* Reply. */
        slotsrestoreReplyAck(c, 0, "1");
        /* The expire time is applied in the common success path. */
        goto success_common;
    }

    /* SLOTSRESTORE-ASYNC string $key $ttl $payload */
    if (!strcasecmp(cmd, "string")) {
        /* Validate the argument count. */
        if (c->argc != 5) {
            goto bad_arguments_number;
        }
        /* The key must not exist yet. */
        if (lookupKeyWrite(c->db, key) != NULL) {
            slotsrestoreReplyAck(c, -1, "the specified key already exists (%s)", key->ptr);
            return C_ERR;
        }
        /* Try to encode the value compactly. */
        robj *val = c->argv[4] = tryObjectEncoding(c->argv[4]);
        /* Add it to the DB. */
        dbAdd(c->db, key, val);
        /* The DB now holds a reference as well. */
        incrRefCount(val);
        /* Reply. */
        slotsrestoreReplyAck(c, 0, "1");
        /* The expire time is applied in the common success path. */
        goto success_common;
    }

    /* SLOTSRESTORE-ASYNC object $key $ttl $payload */
    if (!strcasecmp(cmd, "object")) {
        /* Validate the argument count. */
        if (c->argc != 5) {
            goto bad_arguments_number;
        }
        /* The key must not exist yet. */
        if (lookupKeyWrite(c->db, key) != NULL) {
            slotsrestoreReplyAck(c, -1, "the specified key already exists (%s)", key->ptr);
            return C_ERR;
        }
        void *bytes = c->argv[4]->ptr;
        rio payload;
        /* Verify the RDB-serialized payload (version and checksum). */
        if (verifyDumpPayload(bytes, sdslen(bytes)) != C_OK) {
            slotsrestoreReplyAck(c, -1, "invalid payload checksum");
            return C_ERR;
        }
        /* Initialize the payload reader. */
        rioInitWithBuffer(&payload, bytes);
        /* Read the object type. */
        int type = rdbLoadObjectType(&payload);
        if (type == -1) {
            slotsrestoreReplyAck(c, -1, "invalid payload type");
            return C_ERR;
        }
        /* Load the value object. */
        robj *val = rdbLoadObject(type, &payload);
        if (val == NULL) {
            slotsrestoreReplyAck(c, -1, "invalid payload body");
            return C_ERR;
        }
        /* Add it to the DB. */
        dbAdd(c->db, key, val);
        /* Reply. */
        slotsrestoreReplyAck(c, 0, "1");
        /* The expire time is applied in the common success path. */
        goto success_common;
    }

    /* ========================================================== */
    /* SLOTSRESTORE-ASYNC $cmd $key $ttl $hint [$arg1, $arg2 ...] */
    /* ========================================================== */

    /* Validate the argument count. */
    if (c->argc < 5) {
        goto bad_arguments_number;
    }

    /* Parse hint, the total number of elements of the value. */
    long long hint;
    if (getLongLongFromObject(c->argv[4], &hint) != C_OK || hint < 0) {
        slotsrestoreReplyAck(c, -1, "invalid Hint value (Hint=%s)", c->argv[4]->ptr);
        return C_ERR;
    }

    int xargc = c->argc - 5;
    robj **xargv = &c->argv[5];

    /* SLOTSRESTORE-ASYNC list $key $ttl $hint [$elem1 ...] */
    if (!strcasecmp(cmd, "list")) {
        /* Look up the key. */
        robj *val = lookupKeyWrite(c->db, key);
        if (val != NULL) {
            /* The key exists: it must be an OBJ_LIST encoded as
             * OBJ_ENCODING_QUICKLIST. */
            if (val->type != OBJ_LIST || val->encoding != OBJ_ENCODING_QUICKLIST) {
                slotsrestoreReplyAck(c, -1, "wrong type (expect=%d/%d,got=%d/%d)",
                        OBJ_LIST, OBJ_ENCODING_QUICKLIST, val->type, val->encoding);
                return C_ERR;
            }
        } else {
            /* The key does not exist. */
            if (xargc == 0) {
                slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr);
                return C_ERR;
            }
            /* Create a quicklist object. */
            val = createQuicklistObject();
            /* Apply the configured list options. */
            quicklistSetOptions(val->ptr, server.list_max_ziplist_size,
                    server.list_compress_depth);
            /* Add it to the DB. */
            dbAdd(c->db, key, val);
        }
        /* Push all elements to the tail of the list. */
        for (int i = 0; i < xargc; i ++) {
            xargv[i] = tryObjectEncoding(xargv[i]);
            listTypePush(val, xargv[i], LIST_TAIL);
        }
        /* Reply with the current length of the value. */
        slotsrestoreReplyAck(c, 0, "%d", listTypeLength(val));
        goto success_common;
    }

    /* SLOTSRESTORE-ASYNC hash $key $ttl $hint [$hkey1 $hval1 ...]
     */
    if (!strcasecmp(cmd, "hash")) {
        /* A hash needs an even number of arguments (field/value pairs). */
        if (xargc % 2 != 0) {
            goto bad_arguments_number;
        }
        /* Look up the key. */
        robj *val = lookupKeyWrite(c->db, key);
        if (val != NULL) {
            /* The key exists: it must be an OBJ_HASH encoded as
             * OBJ_ENCODING_HT. */
            if (val->type != OBJ_HASH || val->encoding != OBJ_ENCODING_HT) {
                slotsrestoreReplyAck(c, -1, "wrong type (expect=%d/%d,got=%d/%d)",
                        OBJ_HASH, OBJ_ENCODING_HT, val->type, val->encoding);
                return C_ERR;
            }
        } else {
            if (xargc == 0) {
                slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr);
                return C_ERR;
            }
            /* The key does not exist: create a hash object. */
            val = createHashObject();
            if (val->encoding != OBJ_ENCODING_HT) {
                hashTypeConvert(val, OBJ_ENCODING_HT);
            }
            /* Add it to the DB. */
            dbAdd(c->db, key, val);
        }
        /* If the total size is known, */
        if (hint != 0) {
            dict *ht = val->ptr;
            /* pre-size (or expand) the hash table to hint entries. */
            dictExpand(ht, hint);
        }
        /* Add the pairs in order. */
        for (int i = 0; i < xargc; i += 2) {
            /* Encode the field and the value. */
            hashTypeTryObjectEncoding(val, &xargv[i], &xargv[i + 1]);
            /* Store the pair. */
            hashTypeSet(val, xargv[i], xargv[i + 1]);
        }
        /* Reply with the current length of the value. */
        slotsrestoreReplyAck(c, 0, "%d", hashTypeLength(val));
        goto success_common;
    }

    /* SLOTSRESTORE-ASYNC dict $key $ttl $hint [$elem1 ...]
     */
    if (!strcasecmp(cmd, "dict")) {
        /* Look up the key. */
        robj *val = lookupKeyWrite(c->db, key);
        if (val != NULL) {
            /* The key exists: it must be an OBJ_SET encoded as
             * OBJ_ENCODING_HT. */
            if (val->type != OBJ_SET || val->encoding != OBJ_ENCODING_HT) {
                slotsrestoreReplyAck(c, -1, "wrong type (expect=%d/%d,got=%d/%d)",
                        OBJ_SET, OBJ_ENCODING_HT, val->type, val->encoding);
                return C_ERR;
            }
        } else {
            if (xargc == 0) {
                slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr);
                return C_ERR;
            }
            /* The key does not exist: create a set object. */
            val = createSetObject();
            if (val->encoding != OBJ_ENCODING_HT) {
                setTypeConvert(val, OBJ_ENCODING_HT);
            }
            /* Add it to the DB. */
            dbAdd(c->db, key, val);
        }
        /* If the total size is known, */
        if (hint != 0) {
            dict *ht = val->ptr;
            /* pre-size (or expand) the hash table to hint entries. */
            dictExpand(ht, hint);
        }
        /* Add the members in order. */
        for (int i = 0; i < xargc; i ++) {
            /* Encode the member. */
            xargv[i] = tryObjectEncoding(xargv[i]);
            /* Add it to the set. */
            setTypeAdd(val, xargv[i]);
        }
        /* Reply with the current size of the value. */
        slotsrestoreReplyAck(c, 0, "%d", setTypeSize(val));
        goto success_common;
    }

    /* SLOTSRESTORE-ASYNC zset $key $ttl $hint [$elem1 $score1 ...]
     */
    if (!strcasecmp(cmd, "zset")) {
        /* A zset also needs an even number of arguments (elem/score pairs). */
        if (xargc % 2 != 0) {
            goto bad_arguments_number;
        }
        /* Decode the scores, which are sent as raw 64-bit patterns. */
        double *scores = zmalloc(sizeof(double) * xargc / 2);
        for (int i = 1, j = 0; i < xargc; i += 2, j ++) {
            uint64_t bits;
            if (getUint64FromRawStringObject(xargv[i], &bits) != C_OK) {
                zfree(scores);
                slotsrestoreReplyAck(c, -1, "invalid zset score ([%d]), bad raw bits", j);
                return C_ERR;
            }
            scores[j] = convertRawBitsToDouble(bits);
        }
        /* Look up the key. */
        robj *val = lookupKeyWrite(c->db, key);
        if (val != NULL) {
            /* The key exists: check its type and encoding. */
            if (val->type != OBJ_ZSET || val->encoding != OBJ_ENCODING_SKIPLIST) {
                zfree(scores);
                slotsrestoreReplyAck(c, -1, "wrong type (expect=%d/%d,got=%d/%d)",
                        OBJ_ZSET, OBJ_ENCODING_SKIPLIST, val->type, val->encoding);
                return C_ERR;
            }
        } else {
            /* The key does not exist. */
            if (xargc == 0) {
                zfree(scores);
                slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr);
                return C_ERR;
            }
            /* Create a zset object. */
            val = createZsetObject();
            if (val->encoding != OBJ_ENCODING_SKIPLIST) {
                zsetConvert(val, OBJ_ENCODING_SKIPLIST);
            }
            /* Add it to the DB. */
            dbAdd(c->db, key, val);
        }
        zset *zset = val->ptr;
        /* If the total size is known, */
        if (hint != 0) {
            dict *ht = zset->dict;
            /* pre-size (or expand) the dict to hint entries. */
            dictExpand(ht, hint);
        }
        /* Add the members in order. */
        for (int i = 0, j = 0; i < xargc; i += 2, j ++) {
            robj *elem = xargv[i] = tryObjectEncoding(xargv[i]);
            dictEntry *de = dictFind(zset->dict, elem);
            if (de != NULL) {
                /* The member already exists: remove it from the skiplist
                 * (by its old score) and from the dict first. */
                double score = *(double *)dictGetVal(de);
                zslDelete(zset->zsl, score, elem);
                dictDelete(zset->dict, elem);
            }
            /* Insert the member into the skiplist. */
            zskiplistNode *znode = zslInsert(zset->zsl, scores[j], elem);
            /* The skiplist holds a reference. */
            incrRefCount(elem);
            /* Map the member to its score in the dict. */
            dictAdd(zset->dict, elem, &(znode->score));
            /* The dict holds a reference too. */
            incrRefCount(elem);
        }
        zfree(scores);
        /* Reply with the current length of the value. */
        slotsrestoreReplyAck(c, 0, "%d", zsetLength(val));
        goto success_common;
    }

    slotsrestoreReplyAck(c, -1, "unknown command (argc=%d,cmd=%s)", c->argc, cmd);
    return C_ERR;

bad_arguments_number:
    slotsrestoreReplyAck(c, -1, "wrong number of arguments (argc=%d,cmd=%s)", c->argc, cmd);
    return C_ERR;

success_common:
    /* A non-zero ttl sets the expire time; a zero ttl removes it. */
    if (ttl != 0) {
        setExpire(c->db, key, mstime() + ttl);
    } else {
        removeExpire(c->db, key);
    }
    /* Notify clients that WATCH this key. */
    signalModifiedKey(c->db, key);
    /* Bump the dirty counter. */
    server.dirty ++;
    return C_OK;
}

/* *
 * SLOTSRESTORE-ASYNC delete $key
 *                    expire $key $ttl
 *                    object $key $ttl $payload
 *                    string $key $ttl $payload
 *                    list   $key $ttl $hint [$elem1 ...]
 *                    hash   $key $ttl $hint [$hkey1 $hval1 ...]
 *                    dict   $key $ttl $hint [$elem1 ...]
 *                    zset   $key $ttl $hint [$elem1 $score1 ...]
 * */
void slotsrestoreAsyncCommand(client *c) {
    /* Handle the SLOTSRESTORE-ASYNC command. */
    if (slotsrestoreAsyncHandle(c) != C_OK) {
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
    }
}

/* Handle a SLOTSRESTORE-ASYNC-ACK sent back by the destination instance. */
static int slotsrestoreAsyncAckHandle(client *c) {
    /* Get the slotsmgrtAsyncClient of this DB. */
    slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id);
    if (ac->c != c) {
        /* Only the connection owned by the async migration client may send
         * SLOTSRESTORE-ASYNC-ACK. */
        addReplyErrorFormat(c, "invalid client, permission denied");
        return C_ERR;
    }
    /* Validate the arguments. Format: SLOTSRESTORE-ASYNC-ACK $errno $message */
    if (c->argc != 3) {
        addReplyError(c, "wrong number of arguments for SLOTSRESTORE-ASYNC-ACK");
        return C_ERR;
    }
    long long errcode;
    if (getLongLongFromObject(c->argv[1], &errcode) != C_OK) {
        addReplyErrorFormat(c, "invalid errcode (%s)", (char *)c->argv[1]->ptr);
        return C_ERR;
    }
    /* On failure this is the description of the error. */
    const char *errmsg = c->argv[2]->ptr;
    if (errcode != 0) {
        /* A non-zero error code is logged. */
        serverLog(LL_WARNING, "slotsmgrt_async: ack[%d] %s",
                (int)errcode, errmsg != NULL ?
                errmsg : "(null)");
        return C_ERR;
    }
    /* While a migration is running, batched_iter must not be NULL. */
    if (ac->batched_iter == NULL) {
        serverLog(LL_WARNING, "slotsmgrt_async: null batched iterator");
        addReplyError(c, "invalid iterator (NULL)");
        return C_ERR;
    }
    /* An ACK is only valid while messages are in flight. */
    if (ac->sending_msgs == 0) {
        serverLog(LL_WARNING, "slotsmgrt_async: invalid message counter");
        addReplyError(c, "invalid pending messages");
        return C_ERR;
    }

    /* Update the last-use timestamp of the slotsmgrtAsyncClient. */
    ac->lastuse = mstime();
    /* One in-flight message has been acknowledged. */
    ac->sending_msgs -= 1;
    /* Generate more restore commands: at least 2 messages, with a time
     * budget of 10 microseconds. */
    ac->sending_msgs += slotsmgrtAsyncNextMessagesMicroseconds(ac, 2, 10);

    /* Messages are still in flight (sent but not yet acknowledged). */
    if (ac->sending_msgs != 0) {
        return C_OK;
    }
    /* The batch is complete: notify the blocked clients. */
    notifySlotsmgrtAsyncClient(ac, NULL);

    /* Get the batched iterator. */
    batchedObjectIterator *it = ac->batched_iter;
    if (listLength(it->removed_keys) != 0) {
        /* Some keys were migrated completely and must be deleted locally. */
        list *ll = it->removed_keys;
        for (int i = 0; i < c->argc; i ++) {
            /* Drop the references held by the client's current argv. */
            decrRefCount(c->argv[i]);
        }
        /* Free the client's current argv array. */
        zfree(c->argv);
        /* Rewrite the command as: DEL key1 key2 ...
         */
        c->argc = 1 + listLength(ll);
        /* Allocate the new argv. */
        c->argv = zmalloc(sizeof(robj *) * c->argc);
        for (int i = 1; i < c->argc; i ++) {
            /* Walk removed_keys and fill argv. */
            listNode *head = listFirst(ll);
            /* The migrated key. */
            robj *key = listNodeValue(head);
            /* Delete it from the DB. */
            if (dbDelete(c->db, key)) {
                /* Notify clients that WATCH this key. */
                signalModifiedKey(c->db, key);
                /* Bump the dirty counter. */
                server.dirty ++;
            }
            /* Store the key in argv. */
            c->argv[i] = key;
            /* argv holds a reference. */
            incrRefCount(key);
            /* Remove the node from the list. */
            listDelNode(ll, head);
        }
        /* Fill in argv[0]. */
        c->argv[0] = createStringObject("DEL", 3);
        /* Note: the client sent SLOTSRESTORE-ASYNC-ACK, but the command has
         * now been rewritten as a DEL. After this function returns, the DEL
         * is propagated to the AOF file and to all slaves. */
    }

    /* Release the values that were sent in chunks. */
    if (listLength(it->chunked_vals) != 0) {
        list *ll = it->chunked_vals;
        /* Walk the chunked_vals list. */
        while (listLength(ll) != 0) {
            /* Head node. */
            listNode *head = listFirst(ll);
            /* Its value. */
            robj *o = listNodeValue(head);
            /* Hold a reference while the node is removed. */
            incrRefCount(o);
            /* Remove the node from the list. */
            listDelNode(ll, head);
            /* If someone else still references the object, just drop our
             * reference. */
            if (o->refcount != 1) {
                decrRefCount(o);
            } else {
                /* Otherwise ours is the last reference: free it lazily. */
                lazyReleaseObject(o);
            }
        }
    }

    ac->batched_iter = NULL;
    freeBatchedObjectIterator(it);
    return C_OK;
}

/* *
 * SLOTSRESTORE-ASYNC-ACK $errno $message
 * */
void slotsrestoreAsyncAckCommand(client *c) {
    /* Delegate to slotsrestoreAsyncAckHandle. */
    if (slotsrestoreAsyncAckHandle(c) != C_OK) {
        /* Close after writing entire reply.
         */
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
    }
}

extern int time_independent_strcmp(const char *a, const char *b);

/* *
 * SLOTSRESTORE-ASYNC-AUTH $passwd
 * */
void slotsrestoreAsyncAuthCommand(client *c) {
    if (!server.requirepass) {
        /* The server has no password configured: reply with an error. */
        slotsrestoreReplyAck(c, -1, "Client sent AUTH, but no password is set");
        return;
    }
    if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) {
        /* The password matches: mark the client as authenticated and
         * reply OK. */
        c->authenticated = 1;
        slotsrestoreReplyAck(c, 0, "OK");
    } else {
        /* The password does not match. */
        c->authenticated = 0;
        slotsrestoreReplyAck(c, -1, "invalid password");
    }
}

/* *
 * SLOTSRESTORE-ASYNC-SELECT $db
 * */
void slotsrestoreAsyncSelectCommand(client *c) {
    long long db;
    if (getLongLongFromObject(c->argv[1], &db) != C_OK ||
            !(db >= 0 && db <= INT_MAX) || selectDb(c, db) != C_OK) {
        slotsrestoreReplyAck(c, -1, "invalid DB index (%s)", c->argv[1]->ptr);
    } else {
        slotsrestoreReplyAck(c, 0, "OK");
    }
}
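
The ACK-driven flow control shared by slotsmgrtAsyncGenericCommand and slotsrestoreAsyncAckHandle is easier to follow in isolation. The sketch below is not Codis code: mig_state, next_messages, mig_start, mig_on_ack and mig_run are hypothetical names, and the time budget and output-buffer checks of slotsmgrtAsyncNextMessagesMicroseconds are deliberately left out, so each round produces exactly the "at least" quota. It only models how the sending_msgs counter moves: the first round puts up to 3 messages in flight, every ACK retires one message and tries to produce up to 2 more, and the batch is finished when the counter drops back to 0.

```c
#include <assert.h>

/* Hypothetical stand-in for slotsmgrtAsyncClient: only the counters
 * needed to show the flow control are kept. */
typedef struct {
    long pending;   /* restore messages that have not been generated yet */
    long in_flight; /* sent but not yet ACKed (sending_msgs in the source) */
} mig_state;

/* Simplified stand-in for slotsmgrtAsyncNextMessagesMicroseconds:
 * produce up to `atleast` messages. The real function may produce more,
 * until its microsecond budget or the output buffer limit is reached. */
static long next_messages(mig_state *st, long atleast) {
    long n = st->pending < atleast ? st->pending : atleast;
    st->pending -= n;
    return n;
}

/* Start a batch of `total` messages: the first round asks for 3. */
static void mig_start(mig_state *st, long total) {
    st->pending = total;
    st->in_flight = next_messages(st, 3);
}

/* Handle one ACK: retire one message, then ask for 2 more.
 * Returns 1 when the batch is complete, 0 otherwise. */
static int mig_on_ack(mig_state *st) {
    assert(st->in_flight > 0); /* an ACK without in-flight messages is invalid */
    st->in_flight -= 1;
    st->in_flight += next_messages(st, 2);
    return st->in_flight == 0;
}

/* Drive a whole batch and return how many ACKs it took. */
static long mig_run(long total) {
    mig_state st;
    long acks = 0;
    mig_start(&st, total);
    while (st.in_flight > 0) {
        acks++;
        if (mig_on_ack(&st)) {
            break;
        }
    }
    return acks;
}
```

Under these simplifications a batch of N messages takes exactly N ACKs, and the number of messages in flight grows by at most one per ACK while there is still data to send. That ramp is what keeps the source instance's event loop responsive: each step does a bounded amount of work instead of serializing the whole batch at once.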