本文只是拋磚引玉; Pnode
在下在 Shopee 工做,不喜歡加班想換個環境的同窗能夠考慮一下git
拒絕 996,那就來 shopee,待遇 work life balance 兩不: www.v2ex.com/t/672561#re…github
###前言redis
最近跟同事請教了一下 redis 相關的事情,就找來了一下 redis 的驅動,看看這些庫是怎麼作 redis cluster
的 pipeline
以及 transaction
的,如下就把相關流程的代碼剖析一下,仍是有一些有意思的點的。算法
由於 C 語言比較底層,其餘語言感受描述性都差了一點,我找的是 elixir 的庫來看的,質量很高。數組
過後才發現原來這個elixir 的 redis 庫的做者是 elixir 這門語言的核心開發者;P緩存
正文開始。bash
首先呢, Elixir 的這個庫不支持 redis 集羣,後來有人基於它擴展成支持簡單的集羣,因此先講普通的怎麼作,再擴展。服務器
這個庫是單進程異步,當你發命令過來時,此庫處理完後會立刻發給 Redis 服務器,而後就能夠接收新的命令,當 Redis Server 答覆時,會返回此Reply
給你。數據結構
通常鏈接池有通用的庫,因此交給調用方來作,庫只處理每一個鏈接的請求。
ps,上面這個標題就是來自 redis 官網的,明顯 RE
是 typo。
Redis 用的協議RESP
是本身定的文本協議,客戶端與服務端直接經過 TCP 鏈接通信。
這個文本協議,其實就是對數據的序列化,如下就是規則:
*
"對於客戶端而言,發過去給服務器的命令其實數據結構就是數組,因此只須要*數組長度\r\n$數組[0]裏命令的長度\r\n 數組[0]裏命令
。
提及來有點抽象,看看實際例子:
LLEN mylist
按照協議 encode就變成 *2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n
的文本,
LLEN
以及 6 個字符的mylist
SET mykey 1
按協議 encode 就變成*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$1\r\n1\r\n"
SET
以及 5 個字符的mykey
,還有 1 個字符的1
能夠看看這個庫是怎麼作的,就是遞歸拼接,記錄數組的長度,最後在最開頭拼上*數組長度
。
@doc ~S""" Packs a list of Elixir terms to a Redis (RESP) array. This function returns an iodata (instead of a binary) because the packed result is usually sent to Redis through `:gen_tcp.send/2` or similar. It can be converted to a binary with `IO.iodata_to_binary/1`. All elements of `elems` are converted to strings with `to_string/1`, hence this function supports encoding everything that implements `String.Chars`. ## Examples iex> iodata = Redix.Protocol.pack(["SET", "mykey", 1]) iex> IO.iodata_to_binary(iodata) "*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$1\r\n1\r\n" """
@crlf_iodata [?\r, ?\n]
@spec pack([binary]) :: iodata
def pack(items) when is_list(items) do
pack(items, [], 0)
end
defp pack([item | rest], acc, count) do
item = to_string(item)
new_acc = [acc, [?$, Integer.to_string(byte_size(item)), @crlf_iodata, item, @crlf_iodata]]
pack(rest, new_acc, count + 1)
end
defp pack([], acc, count) do
[?*, Integer.to_string(count), @crlf_iodata, acc]
end
複製代碼
做爲 client 的庫,維護長鏈接,避免頻繁建立鏈接,這個是常規操做。
而有趣的是,做者使用了 erlang OTP
自帶的狀態機框架 gen_statem
來維持 TCP 長鏈接,這個功能是OTP 19
也就是 16 年才推出的,在不知道此做者是 elixir 語言的貢獻者前,我還小小的膜拜了一下。
狀態機以下圖,初始狀態不是同步鏈接,就是connecting 狀態;同步的話,成功就是處於 connected 狀態。
狀態的動做依靠 TCP
的事件消息來驅動,狀態轉移本身控制。
例子:
def disconnected({:timeout, :reconnect}, _timer_info, %__MODULE__{} = data) do
{:ok, socket_owner} = SocketOwner.start_link(self(), data.opts, data.table)
new_data = %{data | socket_owner: socket_owner}
{:next_state, :connecting, new_data}
end
複製代碼
以上代碼就是在 discconected
狀態收到 TCP
的{:timeout, :reconnect}
消息,建立一個新的TCP socket
進程,將狀態轉移到:connecting
。
而 socket
進程在初始化時,會發送connect
消息給本身:
def handle_info(:connect, state) do
with {:ok, socket, address} <- Connector.connect(state.opts),
:ok <- setopts(state, socket, active: :once) do
send(state.conn, {:connected, self(), socket, address})
{:noreply, %{state | socket: socket}}
else
{:error, reason} -> stop(reason, state)
{:stop, reason} -> stop(reason, state)
end
end
複製代碼
成功了,就發送connected
消息給原來的狀態機進程(也就是 connection 進程)
,connection
進程處於connecting
狀態時,接受此消息,更新 socket 信息,狀態轉移到 connected
。
def connecting(
:info,
{:connected, owner, socket, address},
%__MODULE__{socket_owner: owner} = data
) do
if data.backoff_current do
:telemetry.execute([:redix, :reconnection], %{}, %{
connection: data.opts[:name] || self(),
address: address
})
end
data = %{data | socket: socket, backoff_current: nil, connected_address: address}
{:next_state, :connected, %{data | socket: socket}}
end
複製代碼
Redis 執行命令主要有 Comand
、Pipeline
以及Trasaction
三種概念:
command
:一問一答式的,客戶端等待 server 返回消息;Pipeline
:發送一連串命令,這些命令發往 server,不用一問一答,收到命令立刻返回。sever 以隊列執行,執行完後所有結果返回回來;Trasaction
:依靠MULTI
/EXEC
命令,MULTI
命令開始Trasaction
,此後發送的命令都存到 server 的隊列裏,EXEC
命令發送後立刻這隊列裏全部命令;期間不會有其餘命令影響這些命令的執行。庫裏把 Command
命令用 Pipeline
來作,其實本質是同樣的。
如下的pipeline
就是負責用戶調用的函數,:gen_statem.cast
就是把消息數據傳給狀態機,接着就是起了一個進程來監控這個鏈接,掛了就退出;同時阻塞等待狀態機完成處理得到數據後發消息過來。
def pipeline(conn, commands, timeout) do
conn = GenServer.whereis(conn)
request_id = Process.monitor(conn)
# We cast to the connection process knowing that it will reply at some point,
# either after roughly timeout or when a response is ready.
cast = {:pipeline, commands, _from = {self(), request_id}, timeout}
:ok = :gen_statem.cast(conn, cast)
receive do
{^request_id, resp} ->
_ = Process.demonitor(request_id, [:flush])
resp
{:DOWN, ^request_id, _, _, reason} ->
exit(reason)
end
end
複製代碼
狀態機這塊的代碼就是:
def connected(:cast, {:pipeline, commands, from, timeout}, data) do
{ncommands, data} = get_client_reply(data, commands)
if ncommands > 0 do
{counter, data} = get_and_update_in(data.counter, &{&1, &1 + 1})
row = {counter, from, ncommands, _timed_out? = false}
:ets.insert(data.table, row)
case data.transport.send(data.socket, Enum.map(commands, &Protocol.pack/1)) do
:ok ->
actions =
case timeout do
:infinity -> []
_other -> [{{:timeout, {:client_timed_out, counter}}, timeout, from}]
end
{:keep_state, data, actions}
{:error, _reason} ->
# The socket owner will get a closed message at some point, so we just move to the
# disconnected state.
:ok = data.transport.close(data.socket)
{:next_state, :disconnected, data}
end
else
reply(from, {:ok, []})
{:keep_state, data}
end
end
複製代碼
沒什麼特別的,get_client_reply
就是處理客戶端是否想獲得服務器回覆的命令的 CLIENT REPLY
的各類指令,
defp get_client_reply([command | rest], ncommands, client_reply) do
case parse_client_reply(command) do
:off -> get_client_reply(rest, ncommands, :off)
:skip when client_reply == :off -> get_client_reply(rest, ncommands, :off)
:skip -> get_client_reply(rest, ncommands, :skip)
:on -> get_client_reply(rest, ncommands + 1, :on)
nil when client_reply == :on -> get_client_reply(rest, ncommands + 1, client_reply)
nil when client_reply == :off -> get_client_reply(rest, ncommands, client_reply)
nil when client_reply == :skip -> get_client_reply(rest, ncommands, :on)
end
end
複製代碼
接着就是把命令序列號成 RESP,使用data.transport.send
發送給服務器,其實 Redis 除了 TCP 外還可使用SSL/TLS 協議,因此就有了這一層抽象。
若是是 TCP,那麼socket 服務就會在 redis 服務器返回消息後,此函數接收自動處理:
def handle_info({transport, socket, data}, %__MODULE__{socket: socket} = state)
when transport in [:tcp, :ssl] do
:ok = setopts(state, socket, active: :once)
state = new_data(state, data)
{:noreply, state}
end
複製代碼
官網寫的很好了,我簡單說一下好了。
Redis Cluster does not use consistent hashing, but a different form of sharding where every key is conceptually part of what we call an hash slot.
Redis Cluster
沒有用一致性哈希算法,而是用了hash slot
(哈希桶)
There are 16384 hash slots in Redis Cluster, and to compute what is the hash slot of a given key, we simply take the CRC16 of the key modulo 16384.
redis 會固定分配 16384 個 slots 到不一樣的節點,用的算法就是對 key 作 CRC16 而後對 16384取模: HASH_SLOT = CRC16(key) mod 16384
例子以下:
Every node in a Redis Cluster is responsible for a subset of the hash slots, so for example you may have a cluster with 3 nodes, where:
- Node A contains hash slots from 0 to 5500.
- Node B contains hash slots from 5501 to 11000.
- Node C contains hash slots from 11001 to 16383.
複製代碼
This allows to add and remove nodes in the cluster easily. For example if I want to add a new node D, I need to move some hash slot from nodes A, B, C to D. Similarly if I want to remove node A from the cluster I can just move the hash slots served by A to B and C. When the node A will be empty I can remove it from the cluster completely.
用這樣的算法,比一致性哈希方便,更有操做性:
Redis Cluster implements a concept called hash tags that can be used in order to force certain keys to be stored in the same hash slot.
Because moving hash slots from a node to another does not require to stop operations, adding and removing nodes, or changing the percentage of hash slots hold by nodes, does not require any downtime.
對於 redis 或者對用戶來講,能夠輕鬆地分配移動 slots;
而一致性哈希就只能本身算虛擬節點,而且『祈求』以後請求量多了最終達到想要的平衡了。
#####redix-cluster
原版沒有支持集羣,zhongwencool/redix-cluster 寫了一個簡單的包裝版本。
只須要看這段,就很清楚爲了集羣作了些啥:
@spec pipeline([command], Keyword.t) :: {:ok, term} |{:error, term}
def pipeline(pipeline, opts) do
case RedixCluster.Monitor.get_slot_cache do
{:cluster, slots_maps, slots, version} ->
pipeline
|> parse_keys_from_pipeline
|> keys_to_slot_hashs
|> is_same_slot_hashs
|> get_pool_by_slot(slots_maps, slots, version)
|> query_redis_pool(pipeline, :pipeline, opts)
{:not_cluster, version, pool_name} ->
query_redis_pool({version, pool_name}, pipeline, :pipeline, opts)
end
end
複製代碼
|>
就是相似 unix 的 管道 |
,把函數返回值當作下個函數的第一個參數傳給他。
get_slot_cache
就是獲取redis的cluster slots
這個記錄,而且緩存起來。
CLUSTER SLOTS returns details about which cluster slots map to which Redis instances.
parse_keys_from_pipeline
將所有 keys 從Pineline
命令裏提取出來keys_to_slot_hashs
找出 各個key 在哪一個 hash slotis_same_slot_hashs
判斷全部 key 是否是在同一個 hash slot,是的,這個還不支持跨 slot,我在準備幫他寫一個get_pool_by_slot
項目用了鏈接池來管理,因此要根據名字找對應的鏈接query_redis_pool
就是調用 原來的 Redix 作處理了簡單來講,這個庫就是殘廢的,哈哈哈。。。
不支持分佈不一樣 slot,就是玩具。