Redis5.0版是Redis產品的重大版本發佈,它的最新特色:
1 新的流數據類型(Stream data type) https://redis.io/topics/streams-intro 2 新的 Redis 模塊 API:定時器、集羣和字典 API(Timers, Cluster and Dictionary APIs) 3 RDB 增長 LFU 和 LRU 信息 4 集羣管理器從 Ruby (redis-trib.rb) 移植到了redis-cli 中的 C 語言代碼 5 新的有序集合(sorted set)命令:ZPOPMIN/MAX 和阻塞變體(blocking variants) 6 升級 Active defragmentation 至 v2 版本 7 加強 HyperLogLog 的實現 8 更好的內存統計報告 9 許多包含子命令的命令如今都有一個 HELP 子命令 10 客戶端頻繁鏈接和斷開鏈接時,性能表現更好 11 許多錯誤修復和其餘方面的改進 12 升級 Jemalloc 至 5.1 版本 13 引入 CLIENT UNBLOCK 和 CLIENT ID 14 新增 LOLWUT 命令 http://antirez.com/news/123 15 在不存在須要保持向後兼容性的地方,棄用 "slave" 術語 16 網絡層中的差別優化 17 Lua 相關的改進 18 引入動態的 HZ(Dynamic HZ) 以平衡空閒 CPU 使用率和響應性 19 對 Redis 核心代碼進行了重構並在許多方面進行了改進
Redis Cluster總覽
1、簡介
在官方文檔Cluster Spec中,做者詳細介紹了Redis集羣爲何要設計成如今的樣子。最核心的目標有三個:
1 性能:增長集羣功能後不能對性能產生太大影響,因此Redis採起了P2P而非Proxy方式、異步複製、客戶端重定向等設計。 2 水平擴展:文檔中稱能夠線性擴展到1000結點。 3 可用性:在Cluster推出以前,可用性要靠Sentinel保證。有了集羣以後也自動具備了Sentinel的監控和自動Failover能力。
若是須要全面的瞭解,那必定要看官方文檔Cluster Tutorial。
Redis Cluster是一個高性能高可用的分佈式系統。由多個Redis實例組成的總體,數據按照Slot存儲分佈在多個Redis實例上,經過Gossip協議來進行節點之間通訊。功能特色以下:
1 全部的節點相互鏈接 2 集羣消息通訊經過集羣總線通訊,集羣總線端口大小爲客戶端服務端口+10000(固定值) 3 節點與節點之間經過二進制協議進行通訊 4 客戶端和集羣節點之間通訊和一般同樣,經過文本協議進行 5 集羣節點不會代理查詢 6 數據按照Slot存儲分佈在多個Redis實例上 7 集羣節點掛掉會自動故障轉移 8 能夠相對平滑擴/縮容節點
關於Cluster相關的源碼能夠見:src/cluster.c 和 src/cluster.h
2、通訊
2.1 CLUSTER MEET
須要組建一個真正的可工做的集羣,咱們必須將各個獨立的節點鏈接起來,構成一個包含多個節點的集羣。鏈接各個節點的工做使用CLUSTER MEET命令來完成。
CLUSTER MEET <ip> <port>
CLUSTER MEET命令實現:
1 節點A會爲節點B建立一個 clusterNode 結構,並將該結構添加到本身的 clusterState.nodes 字典裏面。 2 節點A根據CLUSTER MEET命令給定的IP地址和端口號,向節點B發送一條MEET消息。 3 節點B接收到節點A發送的MEET消息,節點B會爲節點A建立一個clusterNode結構,並將該結構添加到本身的clusterState.nodes字典裏面。 4 節點B向節點A返回一條PONG消息。 5 節點A將受到節點B返回的PONG消息,經過這條PONG消息節點A能夠知道節點B已經成功的接收了本身發送的MEET消息。 6 節點A將向節點B返回一條PING消息。 7 節點B將接收到的節點A返回的PING消息,經過這條PING消息節點B能夠知道節點A已經成功的接收到了本身返回的PONG消息,握手完成。 8 節點A會將節點B的信息經過Gossip協議傳播給集羣中的其餘節點,讓其餘節點也與節點B進行握手,最終,通過一段時間後,節點B會被集羣中的全部節點認識。
2.2 消息處理 clusterProcessPacket
1 更新接收消息計數器 2 查找發送者節點而且不是handshake節點 3 更新本身的epoch和slave的offset信息 4 處理MEET消息,使加入集羣 5 從goosip中發現未知節點,發起handshake 6 對PING,MEET回覆PONG 7 根據收到的心跳信息更新本身clusterState中的master-slave,slots信息 8 對FAILOVER_AUTH_REQUEST消息,檢查並投票 9 處理FAIL,FAILOVER_AUTH_ACK,UPDATE信息
2.3 定時任務clusterCron
1 對handshake節點創建Link,發送Ping或Meet 2 向隨機節點發送Ping 3 若是是從查看是否須要作Failover 4 統計並決定是否進行slave的遷移,來平衡不一樣master的slave數 5 判斷全部pfail報告數是否過半數
2.4 心跳數據
集羣中的節點會不停(每幾秒)的互相交換ping、pong包,ping和pong包具備相同的結構,只是類型不一樣,ping、pong包合在一塊兒叫作心跳包。一般節點會發送ping包並接收接收者返回的pong包,不過這也不是絕對,節點也有可能只發送pong包,而不須要讓接收者發送返回包。
節點間經過ping保持心跳以及進行gossip集羣狀態同步,每次心跳時,節點會帶上多個clusterMsgDataGossip消息體,通過屢次心跳,該節點包含的其餘節點信息將同步到其餘節點。
ping和pong包的內容能夠分爲header和gossip消息兩部分:
- 發送消息頭信息Header
- 所負責slots的信息
- 主從信息
- ip, port信息
- 狀態信息
包含的信息:
1 NODE ID是一個160bit的僞隨機字符串,它是節點在集羣中的惟一標識 2 currentEpoch和configEpoch字段 3 node flag,標識節點是master仍是slave,另外還有一些其餘的標識位,如PFAIL和FAIL。 4 節點提供服務的hash slot的bitmap 5 發送者的TCP端口 6 發送者認爲的集羣狀態(down or ok) 7 若是是slave,則包含master的NODE ID
- 發送其餘節點Gossip信息。包含了該節點認爲的其餘節點的狀態,不過不是集羣的所有節點(隨機)
- ping_sent, pong_received
- ip, port信息
- 狀態信息,好比發送者認爲該節點已經不可達,會在狀態信息中標記其爲PFAIL或FAIL
包含的信息:
1 NODE ID 2 節點的IP和端口 3 NODE flags
clusterMsg結構的currentEpoch、sender、myslots等屬性記錄了發送者自身的節點信息,接收者會根據這些信息,在本身的clusterState.nodes字典裏找到發送者對應的結構,並對結構進行更新。
Redis集羣中的各個節點經過ping來心跳,經過Gossip協議來交換各自關於不一樣節點的狀態信息,其中Gossip協議由MEET、PING、PONG三種消息實現,這三種消息的正文都由兩個clusterMsgDataGossip結構組成。
每次發送MEET、PING、PONG消息時,發送者都從本身的已知節點列表中隨機選出兩個節點(能夠是主節點或者從節點),並將這兩個被選中節點的信息分別保存到兩個結構中。當接收者收到消息時,接收者會訪問消息正文中的兩個結構,並根據本身是否定識clusterMsgDataGossip結構中記錄的被選中節點進行操做:
1 若是被選中節點不存在於接收者的已知節點列表,那麼說明接收者是第一次接觸到被選中節點,接收者將根據結構中記錄的IP地址和端口號等信息,與被選擇節點進行握手。 2 若是被選中節點已經存在於接收者的已知節點列表,那麼說明接收者以前已經與被選中節點進行過接觸,接收者將根據clusterMsgDataGossip結構記錄的信息,對被選中節點對應的clusterNode結構進行更新。
2.5 數據結構
clusterNode 結構保存了一個節點的當前信息, 如記錄了節點負責處理那些槽、建立時間、節點的名字、節點當前的配置紀元、節點的 IP 和端口等:
1 slots:位圖,由當前clusterNode負責的slot爲1 2 salve, slaveof:主從關係信息 3 ping_sent, pong_received:心跳包收發時間 4 clusterLink *link:節點間的鏈接 5 list *fail_reports:收到的節點不可達投票
clusterState 結構記錄了在當前節點的集羣目前所處的狀態還有全部槽的指派信息:
1 myself:指針指向本身的clusterNode 2 currentEpoch:當前節點的最大epoch,可能在心跳包的處理中更新 3 nodes:當前節點記錄的全部節點的字典,爲clusterNode指針數組 4 slots:slot與clusterNode指針映射關係 5 migrating_slots_to,importing_slots_from:記錄slots的遷移信息 6 failover_auth_time,failover_auth_count,failover_auth_sent,failover_auth_rank,failover_auth_epoch:Failover相關信息
clusterLink 結構保存了鏈接節點的有關信息, 好比套接字描述符, 輸入緩衝區和輸出緩衝區。
3、數據分佈及槽信息
3.1 槽(slot)概念
Redis Cluster中有一個16384長度的槽的概念,他們的編號爲0、一、二、3……1638二、16383。這個槽是一個虛擬的槽,並非真正存在的。正常工做的時候,Redis Cluster中的每一個Master節點都會負責一部分的槽,當有某個key被映射到某個Master負責的槽,那麼這個Master負責爲這個key提供服務,至於哪一個Master節點負責哪一個槽,這是能夠由用戶指定的,也能夠在初始化的時候自動生成。在Redis Cluster中,只有Master才擁有槽的全部權,若是是某個Master的slave,這個slave只負責槽的使用,可是沒有全部權。
3.2 數據分片
在Redis Cluster中,擁有16384個slot,這個數是固定的,存儲在Redis Cluster中的全部的鍵都會被映射到這些slot中。數據庫中的每一個鍵都屬於這 16384 個哈希槽的其中一個,集羣使用公式 CRC16(key) % 16384 來計算鍵 key 屬於哪一個槽,其中 CRC16(key) 用於計算鍵 key 的 CRC16 校驗和,集羣中的每一個節點負責處理一部分哈希槽。
3.3 節點的槽指派信息
clusterNode結構的slots屬性和numslot屬性記錄了節點負責處理那些槽:
struct clusterNode { //… unsignedchar slots[16384/8]; };
Slots屬性是一個二進制位數組(bitarray),這個數組的長度爲16384/8=2048個字節,共包含16384個二進制位。Master節點用bit來標識對於某個槽本身是否擁有。好比對於編號爲1的槽,Master只要判斷序列的第二位(索引從0開始)是否是爲1便可。時間複雜度爲O(1)。
3.4 集羣全部槽的指派信息
經過將全部槽的指派信息保存在clusterState.slots數組裏面,程序要檢查槽i是否已經被指派,又或者取得負責處理槽i的節點,只須要訪問clusterState.slots[i]的值便可,複雜度僅爲O(1)。
3.5 請求重定向
因爲每一個節點只負責部分slot,以及slot可能從一個節點遷移到另外一節點,形成客戶端有可能會向錯誤的節點發起請求。所以須要有一種機制來對其進行發現和修正,這就是請求重定向。有兩種不一樣的重定向場景:
a) MOVED錯誤
-
請求的key對應的槽不在該節點上,節點將查看自身內部所保存的哈希槽到節點 ID 的映射記錄,節點回復一個 MOVED 錯誤。
-
須要客戶端進行再次重試。
b) ASK錯誤
-
請求的key對應的槽目前的狀態屬於MIGRATING狀態,而且當前節點找不到這個key了,節點回復ASK錯誤。ASK會把對應槽的IMPORTING節點返回給你,告訴你去IMPORTING的節點查找。
-
客戶端進行重試 首先發送ASKING命令,節點將爲客戶端設置一個一次性的標誌(flag),使得客戶端能夠執行一次針對 IMPORTING 狀態的槽的命令請求,而後再發送真正的命令請求。
-
沒必要更新客戶端所記錄的槽至節點的映射。
4、數據遷移
當槽x從Node A向Node B遷移時,Node A和Node B都會有這個槽x,Node A上槽x的狀態設置爲MIGRATING,Node B上槽x的狀態被設置爲IMPORTING。
MIGRATING狀態
-
若是key存在則成功處理
-
若是key不存在,則返回客戶端ASK,客戶端根據ASK首先發送ASKING命令到目標節點,而後發送請求的命令到目標節點
-
當key包含多個:
-
若是都存在則成功處理
-
若是都不存在,則返回客戶端ASK
-
若是一部分存在,則返回客戶端TRYAGAIN,通知客戶端稍後重試,這樣當全部的key都遷移完畢的時候客戶端重試請求的時候回獲得ASK,而後通過一次重定向就能夠獲取這批鍵
-
此時不刷新客戶端中node的映射關係
IMPORTING狀態
-
若是key不在該節點上,會被MOVED重定向,刷新客戶端中node的映射關係
-
若是是ASKING則命令會被執行,key不在遷移的節點已經被遷移到目標的節點
-
Key不存在則新建
Key遷移的命令:
1 DUMP:在源(migrate)上執行 2 RESTORE:在目標(importing)上執行 3 DEL:在源(migrate)上執行
通過上面三步能夠將鍵遷移,而後再將處於MIGRATING和IMPORTING狀態的槽變爲常態,完成整個從新分片的過程,具體的信息可見Redis Cluster部署、管理和測試 。
4.1 讀寫請求
槽裏面的key還未遷移,而且槽屬於遷移中。
假如槽x在Node A,須要遷移到Node B上,槽x的狀態爲migrating,其中的key1還沒輪到遷移。此時訪問key1則先計算key1所在的Slot,存在key1則直接返回。
4.2 MOVED請求
槽裏面的key已經遷移過去,而且槽屬於遷移完。
假如槽x在Node A,須要遷移到Node B上,遷移完成。此時訪問key1則先計算key1所在的Slot,由於已經遷移至Node B上,Node A上不存在,則返回 moved slotid IP:PORT,再根據返回的信息去Node B訪問key1。
4.3 ASK請求
槽裏面的key已經遷移完,而且槽屬於遷移中的狀態。
假如槽x在Node A,須要遷移到Node B上,遷移完成,但槽x的狀態爲migrating。此時訪問key1則先計算key1所在的Slot,不存在key1則返回ask slotid IP:PORT,再根據ask返回的信息發送asking請求到Node B,沒問題後則最後再去Node B上訪問key1。
5、通訊故障
5.1 故障檢測
集羣中的每一個節點都會按期地向集羣中的其餘節點發送PING消息,以此交換各個節點狀態信息,檢測各個節點狀態:在線狀態、疑似下線狀態PFAIL、已下線狀態FAIL。
當主節點A經過消息得知主節點B認爲主節點D進入了疑似下線(PFAIL)狀態時,主節點A會在本身的clusterState.nodes字典中找到主節點D所對應的clusterNode結構,並將主節點B的下線報告(failure report)添加到clusterNode結構的fail_reports鏈表中。
struct clusterNode { //... //記錄全部其餘節點對該節點的下線報告 list*fail_reports; //... };
若是集羣裏面,半數以上的主節點都將主節點D報告爲疑似下線,那麼主節點D將被標記爲已下線(FAIL)狀態,將主節點D標記爲已下線的節點會向集羣廣播主節點D的FAIL消息,全部收到FAIL消息的節點都會當即更新nodes裏面主節點D狀態標記爲已下線。
將 node 標記爲 FAIL 須要知足如下兩個條件:
1 有半數以上的主節點將 node 標記爲 PFAIL 狀態。 2 當前節點也將 node 標記爲 PFAIL 狀態。
5.2 多個從節點選主
選新主的過程基於Raft協議選舉方式來實現的:
1 當從節點發現本身的主節點進行已下線狀態時,從節點會廣播一條CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST消息,要求全部收到這條消息,而且具備投票權的主節點向這個從節點投票 2 若是一個主節點具備投票權,而且這個主節點還沒有投票給其餘從節點,那麼主節點將向要求投票的從節點返回一條,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK消息,表示這個主節點支持從節點成爲新的主節點 3 每一個參與選舉的從節點都會接收CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK消息,並根據本身收到了多少條這種消息來統計本身得到了多少主節點的支持 4 若是集羣裏有N個具備投票權的主節點,那麼當一個從節點收集到大於等於集羣N/2+1張支持票時,這個從節點就成爲新的主節點 5 若是在一個配置紀元沒有從可以收集到足夠的支持票數,那麼集羣進入一個新的配置紀元,並再次進行選主,直到選出新的主節點爲止
5.3 故障轉移
錯誤檢測用於識別集羣中的不可達節點是否已下線,若是一個master下線,會將它的slave提高爲master,在gossip消息中,NODE flags的值包括兩種PFAIL和FAIL。
PFAIL flag:
若是一個節點發現另一個節點不可達的時間超過NODE_TIMEOUT ,則會將這個節點標記爲PFAIL,即Possible failure(可能下線)。節點不可達是說一個節點發送了ping包,可是等待了超過NODE_TIMEOUT時間仍然沒有收到迴應(NODE_TIMEOUT必須大於一個網絡包來回的時間)。
FAIL flag:
PFAIL標誌只是一個節點本地的信息,爲了使slave提高爲master,須要將PFAIL升級爲FAIL。PFAIL升級爲FAIL須要知足一些條件:
1 A節點將B節點標記爲PFAIL 2 A節點經過gossip消息收集其餘大部分master節點標識的B節點的狀態 3 大部分master節點在NODE_TIMEOUT * FAIL_REPORT_VALIDITY_MULT(2s)時間段內,標識B節點爲PFAIL或FAIL
若是知足以上條件,A節點會將B節點標識爲FAIL而且向全部節點發送B節點FAIL的消息。收到消息的節點也都會將B標爲FAIL。
注意:FAIL狀態是單向的,只能從PFAIL升級爲FAIL,而不能從FAIL降爲PFAIL。
清除FAIL狀態:
- 節點從新可達,而且是slave節點
- 節點從新可達,而且是master節點,可是不提供任何slot服務
- 節點從新可達,而且是master節點,可是長時間沒有slave被提高爲master來頂替它
PFAIL提高到FAIL使用的是一種弱協議:
- 節點收集的狀態不在同一時間點,會丟棄時間較早的報告信息,可是也只能保證節點的狀態在一段時間內大部分master達成了一致
- 檢測到一個FAIL後,須要通知全部節點,可是沒有辦法保證每一個節點都能成功收到消息
當從節點發現本身的主節點變爲已下線(FAIL)狀態時,便嘗試進Failover,成爲新的主。如下是故障轉移的執行步驟:
1 在下線主節點的全部從節點中選中一個從節點 2 被選中的從節點執行SLAVEOF NO NOE命令,成爲新的主節點 3 新的主節點會撤銷全部對已下線主節點的槽指派,並將這些槽所有指派給本身 4 新的主節點對集羣進行廣播PONG消息,告知其餘節點已經成爲新的主節點 5 新的主節點開始接收和處理槽相關的請求
相關結構
clusterNode:

typedef struct clusterNode { mstime_t ctime; // 該node建立時間 char name[CLUSTER_NAMELEN]; // 40位的node名字 // node 狀態標識經過投CLUSTER_NODE 定義。 // 包括 maser slave self pfail fail handshake noaddr meet migrate_to null_name 這些狀態 int flags; // 本節點最新epoch uint64_t configEpoch; // 當前node負責的slot 經過bit表示 unsigned char slots[CLUSTER_SLOTS/8]; int numslots; int numslaves; struct clusterNode **slaves; // 若是該node爲從 則指向master節點 struct clusterNode *slaveof; mstime_t ping_sent; /* Unix time we sent latest ping */ mstime_t pong_received; /* Unix time we received the pong */ mstime_t fail_time; /* Unix time when FAIL flag was set */ mstime_t voted_time; /* Last time we voted for a slave of this master */ mstime_t repl_offset_time; /* Unix time we received offset for this node */ mstime_t orphaned_time; /* Starting time of orphaned master condition */ long long repl_offset; /* Last known repl offset for this node. */ char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */ int port; /* Latest known port of this node */ clusterLink *link; // 將該節點標記爲失敗的node list // 節點收到gossip消息後,若是gossip裏標記該節點爲pfail則加入改list // 好比:節點a向b發送gossip,消息包含了 c 節點且出於pfail,則a將被加入c的link。 list *fail_reports; } clusterNode;
clusterMsgData:節點間通信的數據結構 包含了 ping、fail、publish、update四種類型

struct { /* Array of N clusterMsgDataGossip structures */ clusterMsgDataGossip gossip[1]; } ping;
節點間經過ping保持心跳以及進行gossip集羣狀態同步,每次心跳時,節點會帶上多個clusterMsgDataGossip(其餘節點)消息體,通過屢次心跳,該節點包含的其餘節點信息將同步到其餘節點。
clusterState:定義了完整的集羣信息

struct clusterState{ // 集羣最新的epoch,爲64位的自增序列 uint64_t currentEpoch; // 包含的全部節點信息 dict *nodes; // 每一個slot所屬於的節點,包括處於migrating和importinng狀態的slot clusterNode *migrating_slots_to[CLUSTER_SLOTS]; clusterNode *importing_slots_from[CLUSTER_SLOTS]; clusterNode *slots[CLUSTER_SLOTS]; // 當前節點所包含的key 用於在getkeysinslot的時候返回key信息 zskiplist *slots_to_keys; ... }
redis啓動,判斷是否容許cluster模式,若是容許,則調用clusterInit進行cluster信息的初始化。clusterState被初始化爲初始值。在後續節點meet及ping過程逐步更新clusterState信息。
Send Ping:
節點建立成功後,節點會向已知的其餘節點發送ping消息保持心跳,ping消息體同時會攜帶已知節點的信息,並經過gossip同步到集羣的其餘節點。
node的ping由clusterCron負責調用,服務啓動時,在serverCron內部會註冊clusterCron,該函數每秒執行10次,在clusterCron內部,維護着static變量iteration記錄該函數被執行的次數:經過if (!(iteration % 10)){}的判斷,使得各節點每秒發送一次心跳。ping節點選擇的代碼邏輯以下:
clusterCron:

void clusterCron(void) { // ... // 若是沒有設置handshake超時,則默認超時未1s handshake_timeout = server.cluster_node_timeout; if (handshake_timeout < 1000) handshake_timeout = 1000; // 遍歷nodes列表 while ((de = dictNext(di)) != NULL) { // 刪除handshake超時的節點 if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) { clusterDelNode(node); continue; } // 若是該節點的link爲空,則爲該節點新建鏈接,而且初始化ping初始時間 if (node->link == NULL) { // 若是該節點處於meet狀態,則直接發送meet讓節點加入集羣 // 不然發送向該節點發送ping clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING); } // 函數每被調動10次,則發送一次ping,所以ping間隔爲1s if (!(iteration % 10)) { int j; for (j = 0; j < 5; j++) { // 隨機選取節點並過濾link爲空的以及self de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de); if (this->link == NULL || this->ping_sent != 0) continue; if (this->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; // 挑選距離上次pong間隔最久的節點 // redis會盡可能選擇距離上次ping間隔最久的節點, // 以此防止隨機不均勻致使某些節點一直收不到ping if (min_pong_node == NULL || min_pong > this->pong_received) { min_pong_node = this; min_pong = this->pong_received; } } } }
發送ping的時候,會優先給新加入的節點發送ping,其實再選擇最久沒被更新的節點,經過對舊節點選擇的加權,儘量地保證了集羣最新狀態的一致。
每次ping請求,node會從已知的nodes列表裏隨機選取n個節點(n=1/10*len(nodes)&& n>=3),一段時間後,該節點已知的nodes將被同步到集羣的其餘節點,集羣狀態信息達成最終一致。具體實現代碼以下(只列出部分代碼,完整代碼見cluster.c/clusterSendPing)
clusterSendPing:

void clusterSendPing(clusterLink *link, int type) { // 選取1/10 的節點數而且要求大於3. // 1/10是個魔數,爲啥是1/10在源碼裏有解釋 int freshnodes = dictSize(server.cluster->nodes) - 2; wanted = floor(dictSize(server.cluster->nodes) / 10); if (wanted < 3) wanted = 3; if (wanted > freshnodes) wanted = freshnodes; while (freshnodes > 0 && gossipcount < wanted && maxiterations--) { // 經過隨機函數隨機選擇一個節點,保證全部節點儘量被同步到整個集羣 dictEntry *de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de); // 爲了保證失敗的節點儘量快地同步到集羣其餘節點, // 優先選取處於pfail以及fail狀態的節點 if (maxiterations > wanted * 2 && !(this->flags & (CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL))) continue; } // 若是被選中的節點處於 // 1.handshake 而且noaddr狀態 // 2.其餘節點沒有包含該節點的信息,而且該節點沒有擁有slot // 則跳過該節點而且將可用的節點數減1,以較少gossip數據同步的開銷 if (this->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_NOADDR) || (this->link == NULL && this->numslots == 0)) { freshnodes--; /* Tecnically not correct, but saves CPU. */ continue; } }
經過隨機選取合適數量的節點,以及對節點狀態的過濾,保證了儘量快的達成最終一致性的同時,減小gossip的網絡開銷。
cluster監聽cluster端口,並經過clusterAcceptHandler接受集羣節點發起的鏈接請求,經過aeCreateFileEvent將clusterReadHandler註冊進事件回調,讀取node發送的數據包。clusterReadHandler讀取到完整的數據包後,調用clusterProcessPacket處理包請求。clusterProcessPacket包含收到數據包後完整的處理邏輯。
clusterProcessPacket:

int clusterProcessPacket(clusterLink *link) { // 判斷是否爲ping請求並校驗數據包長度 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { uint16_t count = ntohs(hdr->count); uint32_t explen; /* expected length of this packet */ explen = sizeof(clusterMsg) - sizeof(union clusterMsgData); explen += (sizeof(clusterMsgDataGossip) * count); if (totlen != explen) return 1; } // ... // 是否爲已知節點 sender = clusterLookupNode(hdr->sender); if (sender && !nodeInHandshake(sender)) { // 比較epoch並更新爲最大的epoch if (senderCurrentEpoch > server.cluster->currentEpoch) server.cluster->currentEpoch = senderCurrentEpoch; /* Update the sender configEpoch if it is publishing a newer one. */ if (senderConfigEpoch > sender->configEpoch) { sender->configEpoch = senderConfigEpoch; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG); } } // 回覆pong數據包 clusterSendPing(link, CLUSTERMSG_TYPE_PONG); // 獲取gossip消息並處理gossip請求 if (sender) clusterProcessGossipSection(hdr, link); }
clusterProcessGossipSection 讀取攜帶的gossip node內容,並判斷這些node是否failover:

void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { // ... if (flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) { if (clusterNodeAddFailureReport(node, sender)) { serverLog(LL_VERBOSE, "Node %.40s reported node %.40s as not reachable.", sender->name, node->name); } markNodeAsFailingIfNeeded(node); } else { // 若是該node並不是出於fail狀態,則從fail link裏刪除該node if (clusterNodeDelFailureReport(node, sender)) { serverLog(LL_VERBOSE, "Node %.40s reported node %.40s is back online.", sender->name, node->name); } } }

int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) { while ((ln = listNext(&li)) != NULL) { fr = ln->value; if (fr->node == sender) break; } if (!ln) return 0; // 若是以前被標記爲失敗,則從失敗list裏刪除 listDelNode(l, ln); }
cluster會根據收到的gossip包裏的msgdata來更新集羣的狀態信息,包括epoch,以及其他節點的狀態。若是node被標記爲pfail或fail,則被加入fail_reports,當fail_reports長度超過半數節點數量時,該節點及被標記爲failover。
總結:
Redis Cluster是去中心化的結構,集羣元數據信息分佈在每一個節點上,主備切換依賴於多個節點協商選主。採用無中心節點方式實現,無需proxy代理,客戶端直接與redis集羣的每一個節點鏈接,根據一樣的hash算法計算出key對應的slot,而後直接在slot對應的Redis上執行命令。從CAP定理來看,Cluster支持了AP(Availability&Partition-Tolerancy),這樣讓Redis從一個單純的NoSQL內存數據庫變成了分佈式NoSQL數據庫。