本文是 《用 Golang 實現一個 Redis》系列文章第二篇,本文將分別介紹Redis 通訊協議 以及 協議解析器 的實現,若您對協議有所瞭解能夠直接閱讀協議解析器部分。html
Redis 自 2.0 版本起使用了統一的協議 RESP (REdis Serialization Protocol),該協議易於實現,計算機能夠高效的進行解析且易於被人類讀懂。git
RESP 是一個二進制安全的文本協議,工做於 TCP 協議上。客戶端和服務器發送的命令或數據一概以 \r\n
(CRLF)結尾。github
RESP 定義了5種格式:golang
llen
、scard
等命令的返回值, 64位有符號整數get
等命令的返回值lrange
等命令響應的格式RESP 經過第一個字符來表示格式:redis
$
開始*
開始Bulk String有兩行,第一行爲 $
+正文長度,第二行爲實際內容。如:數據庫
$3\r\nSET\r\n
Bulk String 是二進制安全的能夠包含任意字節,就是說能夠在 Bulk String 內部包含 "\r\n" 字符(行尾的CRLF被隱藏):數組
$4 a\r\nb
$-1
表示 nil, 好比使用 get 命令查詢一個不存在的key時,響應即爲$-1
。緩存
Array 格式第一行爲 "*"+數組長度,其後是相應數量的 Bulk String。如, ["foo", "bar"]
的報文:安全
*2 $3 foo $3 bar
客戶端也使用 Array 格式向服務端發送指令。命令自己將做爲第一個參數,如 SET key value
指令的RESP報文:服務器
*3 $3 SET $3 key $5 value
將換行符打印出來:
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
咱們在 實現TCP服務器 一文中已經介紹過TCP服務器的實現,協議解析器將實現其 Handler 接口充當應用層服務器。
協議解析器將接收 Socket 傳來的數據,並將其數據還原爲 [][]byte
格式,如 "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\value\r\n"
將被還原爲 ['SET', 'key', 'value']
。
本文完整代碼: Github: HDT3213/godis
來自客戶端的請求均爲數組格式,它在第一行中標記報文的總行數並使用CRLF
做爲分行符。
bufio
標準庫能夠將從 reader 讀到的數據緩存到 buffer 中,直至遇到分隔符或讀取完畢後返回,因此咱們使用 reader.ReadBytes('\n')
來保證每次讀取到完整的一行。
須要注意的是RESP是二進制安全
的協議,它容許在正文中使用CRLF
字符。舉例來講 Redis 能夠正確接收並執行SET "a\r\nb" 1
指令, 這條指令的正確報文是這樣的:
*3 $3 SET $4 a\r\nb $7 myvalue
當 ReadBytes
讀取到第五行 "a\r\nb\r\n"時會將其誤認爲兩行:
*3 $3 SET $4 a // 錯誤的分行 b // 錯誤的分行 $7 myvalue
所以當讀取到第四行$4
後, 不該該繼續使用 ReadBytes('\n')
讀取下一行, 應使用 io.ReadFull(reader, msg)
方法來讀取指定長度的內容。
msg = make([]byte, 4 + 2) // 正文長度4 + 換行符長度2 _, err = io.ReadFull(reader, msg)
定義 Client
結構體做爲客戶端抽象:
type Client struct { /* 與客戶端的 Tcp 鏈接 */ conn net.Conn /* * 帶有 timeout 功能的 WaitGroup, 用於優雅關閉 * 當響應被完整發送前保持 waiting 狀態, 阻止連接被關閉 */ waitingReply wait.Wait /* 標記客戶端是否正在發送指令 */ sending atomic.AtomicBool /* 客戶端正在發送的參數數量, 即 Array 第一行指定的數組長度 */ expectedArgsCount uint32 /* 已經接收的參數數量, 即 len(args)*/ receivedCount uint32 /* * 已經接收到的命令參數,每一個參數由一個 []byte 表示 */ args [][]byte }
定義解析器:
type Handler struct { /* * 記錄活躍的客戶端連接 * 類型爲 *Client -> placeholder */ activeConn sync.Map /* 數據庫引擎,執行指令並返回結果 */ db db.DB /* 關閉狀態標誌位,關閉過程當中時拒絕新建鏈接和新請求 */ closing atomic.AtomicBool }
接下來能夠編寫主要部分了:
func (h *Handler)Handle(ctx context.Context, conn net.Conn) { if h.closing.Get() { // 關閉過程當中不接受新鏈接 _ = conn.Close() } /* 初始化客戶端狀態 */ client := &Client { conn: conn, } h.activeConn.Store(client, 1) reader := bufio.NewReader(conn) var fixedLen int64 = 0 // 將要讀取的 BulkString 的正文長度 var err error var msg []byte for { /* 讀取下一行數據 */ if fixedLen == 0 { // 正常模式下使用 CRLF 區分數據行 msg, err = reader.ReadBytes('\n') // 判斷是否以 \r\n 結尾 if len(msg) == 0 || msg[len(msg) - 2] != '\r' { errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"} _, _ = client.conn.Write(errReply.ToBytes()) } } else { // 當讀取到 BulkString 第二行時,根據給出的長度進行讀取 msg = make([]byte, fixedLen + 2) _, err = io.ReadFull(reader, msg) // 判斷是否以 \r\n 結尾 if len(msg) == 0 || msg[len(msg) - 2] != '\r' || msg[len(msg) - 1] != '\n'{ errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"} _, _ = client.conn.Write(errReply.ToBytes()) } // Bulk String 讀取完畢,從新使用正常模式 fixedLen = 0 } // 處理 IO 異常 if err != nil { if err == io.EOF || err == io.ErrUnexpectedEOF { logger.Info("connection close") } else { logger.Warn(err) } _ = client.Close() h.activeConn.Delete(client) return // io error, disconnect with client } /* 解析收到的數據 */ if !client.sending.Get() { // sending == false 代表收到了一條新指令 if msg[0] == '*' { // 讀取第一行獲取參數個數 expectedLine, err := strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32) if err != nil { _, _ = client.conn.Write(UnknownErrReplyBytes) continue } // 初始化客戶端狀態 client.waitingReply.Add(1) // 有指令未處理完成,阻止服務器關閉 client.sending.Set(true) // 正在接收指令中 // 初始化計數器和緩衝區 client.expectedArgsCount = uint32(expectedLine) client.receivedCount = 0 client.args = make([][]byte, expectedLine) } else { // TODO: text protocol } } else { // 收到了指令的剩餘部分(非首行) line := msg[0:len(msg)-2] // 移除換行符 if line[0] == '$' { // BulkString 的首行,讀取String長度 fixedLen, err = strconv.ParseInt(string(line[1:]), 10, 64) if err != nil { errReply := &reply.ProtocolErrReply{Msg:err.Error()} _, _ = client.conn.Write(errReply.ToBytes()) } if fixedLen <= 0 { errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"} _, _ = client.conn.Write(errReply.ToBytes()) } } else { // 收到參數 client.args[client.receivedCount] = line client.receivedCount++ } // 一條命令發送完畢 if client.receivedCount == client.expectedArgsCount { client.sending.Set(false) // 執行命令並響應 result := h.db.Exec(client.args) if result != nil { _, _ = conn.Write(result.ToBytes()) } else { _, _ = conn.Write(UnknownErrReplyBytes) } // 重置客戶端狀態,等待下一條指令 client.expectedArgsCount = 0 client.receivedCount = 0 client.args = nil client.waitingReply.Done() } } } }