上次針對 redis 的源碼閱讀涉及普通的 client,此次針對 cluster 模式下的 client 源碼進行分析。
具體的源碼路徑就是在 lib/cluster 目錄下了。
開門見山,咱們使用 Cluster
模式最開始也是要進行實例化的,這裏調用的代碼位於 lib/cluster/index.ts
:javascript
const { Cluster } = require('ioredis') const cluster = new Redis.Cluster([ { port: 6380, host: "127.0.0.1", }, { port: 6381, host: "127.0.0.1", }, ]) cluster.get('someKey').then()
從源碼上來看,Cluster
預期接收兩個參數,第一個是啓動的節點集合 startNodes
,第二個是一個可選的 options
,
第一個參數是比較固定的,沒有太多含義,而第二個參數能夠傳遞不少配置,這個能夠在 README 中找到(目前是 12 個參數): https://www.npmjs.com/package/ioredis#cluster
若是沒有傳入的話,則會有默認值來填充,但並非全部的參數都會有默認值。 java
與 Redis Client
同樣的處理邏輯,在構造函數中會 call
一下 Commander
,隨後還會實例化一個 ConnectionPool
對象,並將 options.redisOptions
傳入進去。node
代碼位於 lib/cluster/ConnectionPool.ts
在實例化 ConnectionPool
的過程當中並無作什麼實質性的操做,只是把 options.redisOptions
放到了一個 private 屬性中。
隨後在 Cluster
構造函數中註冊了四個對應的事件,分別是 -node
、+node
、drain
和 nodeError
,分別表明了 移除節點、增長節點、節點爲空、節點出錯。
是 ConnectionPool
本身實現的一些事件,後續會看到在哪裏觸發。 git
接下來實例化了一個 ClusterSubscriber
對象,並將上邊實例化的 connectionPool
實例做爲參數放了進去,並把 Cluster
實例的引用也傳了進去。
實例化過程當中作的事情也比較簡單,就是監聽了上邊咱們提到的 -node
與 +node
兩個事件,在移除節點時,會判斷是否存在 subscriber
屬性,若是不存在則跳出,若是存在的話,會判斷被移除的 key 是否等於當前 subscriber
。
這裏能夠提一下 subscriber
到底是什麼,這個在下邊的 selectSubscriber
函數中能夠看到,就是實例化了一個 Redis Client
,而實例化 Redis Client
所使用的參數,都是經過調用 connectionPool
的 getNodes
方法所拿到的,並隨機挑選其中一個節點配置來進行實例化。
以後就會經過該 Redis Client
來調用兩個命令,subscriber
與 psubscriber
,這兩個是用來實現 Pub/Sub 的,具體區別其實就是後者能夠監聽一個帶有通配符的服務。github
subscriber 與 psubscriber 的區別: https://redis.io/topics/pubsub
並在接收到數據之後,經過emit
的方式轉發給Cluster
,後續 Cluster 模式下的 Pub/Sub 將會經過這裏來進行數據的傳遞。
最後咱們調用 connect
函數就完成了整個 Cluster
實例化的過程。redis
若是是開啓了 lazyConnect 那麼這裏就直接修改實例狀態爲 wait,而後結束整個流程。
在 connect
時咱們首先會解析 startNodes,拿到對應的 IP 和 端口等信息,而後會調用 reset 重置 connectionPool
中的實例,connectionPool
中會存儲多個 IP+端口 的 Redis 實例引用,調用 reset 會把一些應該不存在的實例給關掉,而後把一些新增長的進行建立,複用已經存在的實例,同時在新增節點的時候會觸發 ClusterSubscriber
的 +node
事件,若是此時是第一次觸發,那麼這時 ClusterSubscriber
纔會真正的去建立用於 Pub/Sub 的 Redis 實例。 數據庫
以後會註冊一個 refresh
事件,在事件內部會調用 readyCheck
,在這以前,則須要先去獲取 Redis
節點的一些信息,這裏是經過 getInfoFromNode
方法來實現的,內部會拿到一個 Redis
實例,並調用 duplicate
建立一個額外的實例,而後調用 cluster slots
命令來獲取當前 Redis 集羣服務狀態,這裏返回的數據會包含全部的節點 IP + 端口,同時包含某個節點的起始結束返回,具體的返回值以下:npm
redis 127.0.0.1:6379> cluster slots 1) 1) (integer) 0 1) (integer) 4095 2) 1) "127.0.0.1" 1) (integer) 7000 3) 1) "127.0.0.1" 1) (integer) 7004 2) 1) (integer) 12288 1) (integer) 16383 2) 1) "127.0.0.1" 1) (integer) 7003 3) 1) "127.0.0.1" 1) (integer) 7007 3) 1) (integer) 4096 1) (integer) 8191 2) 1) "127.0.0.1" 1) (integer) 7001 3) 1) "127.0.0.1" 1) (integer) 7005 4) 1) (integer) 8192 1) (integer) 12287 2) 1) "127.0.0.1" 1) (integer) 7002 3) 1) "127.0.0.1" 1) (integer) 7006
轉換成 JS 的數據大體是一個這樣的結構:bash
[ [ // slot info 0, // slot range start 4095, // slot range end [ '127.0.0.1', // IP 7000 // port ] ], [ // other slot info 12288, 16383, [ '127.0.0.1', 7003 ] ], ]
cluster slot 的描述: https://redis.io/commands/cluster-slots
在獲取到這些真實的節點信息之後,會依據拿到的節點集合,再次調用 connectionPool
的 reset
方法,由於上次調用實際上是使用的 startNode
傳入的初始值,這裏則會使用當前服務正在運行的數據進行一次替換。
在完成這一動做後,則會觸發 refresh
事件,也就會進入下邊的 readyCheck
環節,確保服務是可用的。函數
查看 readyCheck
的實現,主要也是經過調用 cluster info
命令來獲取當前服務的狀態。
在前文所講處理 twemproxy 模式下的問題時,將 Redis Client
的 readyCheck
從 info
修改成了 ping
命令來實現,而這裏,則沒有進行修改,由於要注意的是,這裏並非 info
命令,而是 cluster
命令,只不過參數是 info
。
Cluster
模塊會使用 cluster info
命令中的 cluster_state
字段來做爲檢測的依據,數據會按照 k:v\nk:v\n 這種格式組合,因此咱們會在代碼中看到經過匹配換行來取得對應的字段,並經過截取的方式拿到具體的值。
不過針對這裏的邏輯,我我的卻是以爲直接用正則匹配反而更簡單一些,由於拿到參數的值並無作一些額外的操做,僅僅是用來驗證。
private readyCheck(callback: CallbackFunction<void | "fail">): void { (this as any).cluster("info", function (err, res) { if (err) { return callback(err); } if (typeof res !== "string") { return callback(); } let state; const lines = res.split("\r\n"); for (let i = 0; i < lines.length; ++i) { const parts = lines[i].split(":"); if (parts[0] === "cluster_state") { state = parts[1]; break; } } if (state === "fail") { debug("cluster state not ok (%s)", state); callback(null, state); } else { callback(); } }); }
當咱們發現 cluster info
中返回的數據爲 fail
時,那麼說明集羣中的這個節點是一個不可用的狀態,那麼就會調用 disconnect
斷開並進行重連。
在觸發 disconnect
的時候,同時會關閉 ClusterSubscriber
中的實例,由於咱們的鏈接已經要關閉了,那麼也沒有必要留着一個註冊 Pub/Sub 的實例在這裏了。
在這些操做完成以後,會進入 retry
的流程,這裏其實就是按照某種邏輯從新調用了 connect
方法,再次執行前邊所描述的邏輯。
針對整個流程畫圖表示大概是這樣的:
在實例建立完畢後,那麼下一步就會涉及到調用命令了。
在前邊實例化過程當中不可避免的也提到了一些 sendCommand
的事情,Redis
在實例化的過程當中,會有一個狀態的變動,而每次觸發 sendCommand
實際上都會去檢查那個狀態,若是當前尚未創建好鏈接,那麼這時的命令會被放入到 offlineQueue
中暫存的。
在 readyCheck
經過以後會按照順序來調用這些命令。
固然,在 sendCommand
方法中也存在了對當前實例狀態的判斷,若是是 wait
,那麼能夠認爲實例開啓了 lazyConnect
模式,這時會嘗試與服務端創建鏈接。
同時在 sendCommand
中也會對命令進行判斷,一些 Pub/Sub 對應的命令,好比 publish
,會被轉發到 ClusterSubscriber
對應的實例上去執行,而其餘普通的命令則會放到 connectionPool
中去執行。
經過這樣的方式將發佈訂閱與普通的命令進行了拆分。
一樣,由於是 Cluster 模式,因此還會有主從之間的拆分邏輯,這個能夠經過實例化 Redis Cluster
時傳入的 scaleReads
參數來決定,默認的話是 master
,可選的還有 all
、slave
以及一個接收命令以及實例列表的自定義函數。
知識點來了
在 ioredis 中,默認狀況下的配置是 master
,這也就意味着全部的請求都會發送到 master
節點,這就意味着若是你爲了提升讀取的性能所建立的一些從庫,__根本不會被訪問到__。
詳情見文檔: https://www.npmjs.com/package/ioredis#user-content-read-write-splitting
若是想要使用從庫,那麼能夠把 scaleReads
修改成 slave
,可是不須要擔憂說一些會對數據庫形成修改的命令發送到從庫,在 sendCommand
中會針對所發送的命令進行檢測,若是不是隻讀的命令,且 scaleReads
設置的不是 master
會強行覆蓋爲 master
。
針對命令是否爲只讀的判斷: https://github.com/luin/ioredis/blob/master/lib/cluster/index.ts#L599
而後關於那個自定義函數,其實就是須要本身根據 command
去評估究竟使用哪一個(些)實例,而後把對應的實例返回出去。
最終,咱們拿到了一個 Redis
實例,這時使用該 Redis
實例進行調用 sendCommand
便可。
而後後邊的邏輯就和普通的 Redis
觸發命令沒有什麼區別了。
總的來看, 在 ioredis 的實現中 Redis Cluster
是做爲一個 Redis
的擴展來作的,在不少地方都會看到 Redis
的存在,而且一樣都會繼承自 Command
實例,這就讓用戶在使用的過程當中並無太多的差別,只有在實例化時傳入的參數不太同樣,在調用各類 Redis
命令時則沒有區別,而在 Cluster
中則內部調用了 Redis
的 sendCommand
完成了邏輯上的複用。