ioredis源碼閱讀[1]

上次針對 redis 的源碼閱讀涉及普通的 client,此次針對 cluster 模式下的 client 源碼進行分析。
具體的源碼路徑就是在 lib/cluster 目錄下了。

Cluster 實例化

開門見山,咱們使用 Cluster 模式最開始也是要進行實例化的,這裏調用的代碼位於 lib/cluster/index.tsjavascript

const { Cluster } = require('ioredis')

const cluster = new Redis.Cluster([
  {
    port: 6380,
    host: "127.0.0.1",
  },
  {
    port: 6381,
    host: "127.0.0.1",
  },
])

cluster.get('someKey').then()

從源碼上來看,Cluster 預期接收兩個參數,第一個是啓動的節點集合 startNodes,第二個是一個可選的 options
第一個參數是比較固定的,沒有太多含義,而第二個參數能夠傳遞不少配置,這個能夠在 README 中找到(目前是 12 個參數): https://www.npmjs.com/package/ioredis#cluster
若是沒有傳入的話,則會有默認值來填充,但並非全部的參數都會有默認值。 java

Redis Client 同樣的處理邏輯,在構造函數中會 call 一下 Commander,隨後還會實例化一個 ConnectionPool 對象,並將 options.redisOptions 傳入進去。node

代碼位於 lib/cluster/ConnectionPool.ts

在實例化 ConnectionPool 的過程當中並無作什麼實質性的操做,只是把 options.redisOptions 放到了一個 private 屬性中。
隨後在 Cluster 構造函數中註冊了四個對應的事件,分別是 -node+nodedrainnodeError,分別表明了 移除節點、增長節點、節點爲空、節點出錯。
ConnectionPool 本身實現的一些事件,後續會看到在哪裏觸發。 git

接下來實例化了一個 ClusterSubscriber 對象,並將上邊實例化的 connectionPool 實例做爲參數放了進去,並把 Cluster 實例的引用也傳了進去。
實例化過程當中作的事情也比較簡單,就是監聽了上邊咱們提到的 -node+node 兩個事件,在移除節點時,會判斷是否存在 subscriber 屬性,若是不存在則跳出,若是存在的話,會判斷被移除的 key 是否等於當前 subscriber
這裏能夠提一下 subscriber 到底是什麼,這個在下邊的 selectSubscriber 函數中能夠看到,就是實例化了一個 Redis Client,而實例化 Redis Client 所使用的參數,都是經過調用 connectionPoolgetNodes 方法所拿到的,並隨機挑選其中一個節點配置來進行實例化。
以後就會經過該 Redis Client 來調用兩個命令,subscriberpsubscriber,這兩個是用來實現 Pub/Sub 的,具體區別其實就是後者能夠監聽一個帶有通配符的服務。github

subscriber 與 psubscriber 的區別: https://redis.io/topics/pubsub
並在接收到數據之後,經過 emit 的方式轉發給 Cluster,後續 Cluster 模式下的 Pub/Sub 將會經過這裏來進行數據的傳遞。

Connect

最後咱們調用 connect 函數就完成了整個 Cluster 實例化的過程。redis

若是是開啓了 lazyConnect 那麼這裏就直接修改實例狀態爲 wait,而後結束整個流程。

connect 時咱們首先會解析 startNodes,拿到對應的 IP 和 端口等信息,而後會調用 reset 重置 connectionPool 中的實例,connectionPool 中會存儲多個 IP+端口 的 Redis 實例引用,調用 reset 會把一些應該不存在的實例給關掉,而後把一些新增長的進行建立,複用已經存在的實例,同時在新增節點的時候會觸發 ClusterSubscriber+node 事件,若是此時是第一次觸發,那麼這時 ClusterSubscriber 纔會真正的去建立用於 Pub/Sub 的 Redis 實例。 數據庫

以後會註冊一個 refresh 事件,在事件內部會調用 readyCheck,在這以前,則須要先去獲取 Redis 節點的一些信息,這裏是經過 getInfoFromNode 方法來實現的,內部會拿到一個 Redis 實例,並調用 duplicate 建立一個額外的實例,而後調用 cluster slots 命令來獲取當前 Redis 集羣服務狀態,這裏返回的數據會包含全部的節點 IP + 端口,同時包含某個節點的起始結束返回,具體的返回值以下:npm

redis 127.0.0.1:6379> cluster slots
1) 1) (integer) 0
   1) (integer) 4095
   2) 1) "127.0.0.1"
      1) (integer) 7000
   3) 1) "127.0.0.1"
      1) (integer) 7004
2) 1) (integer) 12288
   1) (integer) 16383
   2) 1) "127.0.0.1"
      1) (integer) 7003
   3) 1) "127.0.0.1"
      1) (integer) 7007
3) 1) (integer) 4096
   1) (integer) 8191
   2) 1) "127.0.0.1"
      1) (integer) 7001
   3) 1) "127.0.0.1"
      1) (integer) 7005
4) 1) (integer) 8192
   1) (integer) 12287
   2) 1) "127.0.0.1"
      1) (integer) 7002
   3) 1) "127.0.0.1"
      1) (integer) 7006

轉換成 JS 的數據大體是一個這樣的結構:bash

[
  [ // slot info
    0,    // slot range start 
    4095, // slot range end
    [
      '127.0.0.1', // IP
      7000         // port
    ]
  ],
  [  // other slot info
    12288,
    16383,
    [
      '127.0.0.1',
      7003
    ]
  ],
]
cluster slot 的描述: https://redis.io/commands/cluster-slots

在獲取到這些真實的節點信息之後,會依據拿到的節點集合,再次調用 connectionPoolreset 方法,由於上次調用實際上是使用的 startNode 傳入的初始值,這裏則會使用當前服務正在運行的數據進行一次替換。
在完成這一動做後,則會觸發 refresh 事件,也就會進入下邊的 readyCheck 環節,確保服務是可用的。函數

readyCheck

查看 readyCheck 的實現,主要也是經過調用 cluster info 命令來獲取當前服務的狀態。
在前文所講處理 twemproxy 模式下的問題時,將 Redis ClientreadyCheckinfo 修改成了 ping 命令來實現,而這裏,則沒有進行修改,由於要注意的是,這裏並非 info 命令,而是 cluster 命令,只不過參數是 info

Cluster 模塊會使用 cluster info 命令中的 cluster_state 字段來做爲檢測的依據,數據會按照 k:v\nk:v\n 這種格式組合,因此咱們會在代碼中看到經過匹配換行來取得對應的字段,並經過截取的方式拿到具體的值。

不過針對這裏的邏輯,我我的卻是以爲直接用正則匹配反而更簡單一些,由於拿到參數的值並無作一些額外的操做,僅僅是用來驗證。
private readyCheck(callback: CallbackFunction<void | "fail">): void {
  (this as any).cluster("info", function (err, res) {
    if (err) {
      return callback(err);
    }
    if (typeof res !== "string") {
      return callback();
    }

    let state;
    const lines = res.split("\r\n");
    for (let i = 0; i < lines.length; ++i) {
      const parts = lines[i].split(":");
      if (parts[0] === "cluster_state") {
        state = parts[1];
        break;
      }
    }

    if (state === "fail") {
      debug("cluster state not ok (%s)", state);
      callback(null, state);
    } else {
      callback();
    }
  });
}

當咱們發現 cluster info 中返回的數據爲 fail 時,那麼說明集羣中的這個節點是一個不可用的狀態,那麼就會調用 disconnect 斷開並進行重連。
在觸發 disconnect 的時候,同時會關閉 ClusterSubscriber 中的實例,由於咱們的鏈接已經要關閉了,那麼也沒有必要留着一個註冊 Pub/Sub 的實例在這裏了。
在這些操做完成以後,會進入 retry 的流程,這裏其實就是按照某種邏輯從新調用了 connect 方法,再次執行前邊所描述的邏輯。

針對整個流程畫圖表示大概是這樣的:

Redis-Cluster-Create-Flow.jpg

sendCommand

在實例建立完畢後,那麼下一步就會涉及到調用命令了。
在前邊實例化過程當中不可避免的也提到了一些 sendCommand 的事情,Redis 在實例化的過程當中,會有一個狀態的變動,而每次觸發 sendCommand 實際上都會去檢查那個狀態,若是當前尚未創建好鏈接,那麼這時的命令會被放入到 offlineQueue 中暫存的。
readyCheck 經過以後會按照順序來調用這些命令。

固然,在 sendCommand 方法中也存在了對當前實例狀態的判斷,若是是 wait,那麼能夠認爲實例開啓了 lazyConnect 模式,這時會嘗試與服務端創建鏈接。

同時在 sendCommand 中也會對命令進行判斷,一些 Pub/Sub 對應的命令,好比 publish,會被轉發到 ClusterSubscriber 對應的實例上去執行,而其餘普通的命令則會放到 connectionPool 中去執行。
經過這樣的方式將發佈訂閱與普通的命令進行了拆分。

一樣,由於是 Cluster 模式,因此還會有主從之間的拆分邏輯,這個能夠經過實例化 Redis Cluster 時傳入的 scaleReads 參數來決定,默認的話是 master,可選的還有 allslave 以及一個接收命令以及實例列表的自定義函數。

知識點來了

在 ioredis 中,默認狀況下的配置是 master,這也就意味着全部的請求都會發送到 master 節點,這就意味着若是你爲了提升讀取的性能所建立的一些從庫,__根本不會被訪問到__。

詳情見文檔: https://www.npmjs.com/package/ioredis#user-content-read-write-splitting

若是想要使用從庫,那麼能夠把 scaleReads 修改成 slave,可是不須要擔憂說一些會對數據庫形成修改的命令發送到從庫,在 sendCommand 中會針對所發送的命令進行檢測,若是不是隻讀的命令,且 scaleReads 設置的不是 master 會強行覆蓋爲 master

針對命令是否爲只讀的判斷: https://github.com/luin/ioredis/blob/master/lib/cluster/index.ts#L599

而後關於那個自定義函數,其實就是須要本身根據 command 去評估究竟使用哪一個(些)實例,而後把對應的實例返回出去。
最終,咱們拿到了一個 Redis 實例,這時使用該 Redis 實例進行調用 sendCommand 便可。
而後後邊的邏輯就和普通的 Redis 觸發命令沒有什麼區別了。

總結

總的來看, 在 ioredis 的實現中 Redis Cluster 是做爲一個 Redis 的擴展來作的,在不少地方都會看到 Redis 的存在,而且一樣都會繼承自 Command 實例,這就讓用戶在使用的過程當中並無太多的差別,只有在實例化時傳入的參數不太同樣,在調用各類 Redis 命令時則沒有區別,而在 Cluster 中則內部調用了 RedissendCommand 完成了邏輯上的複用。

相關文章
相關標籤/搜索