實現 Redis 協議解析器

本文是 《用 Golang 實現一個 Redis》系列文章第二篇,本文將分別介紹Redis 通訊協議 以及 協議解析器 的實現,若您對協議有所瞭解能夠直接閱讀協議解析器部分。html

Redis 通訊協議

Redis 自 2.0 版本起使用了統一的協議 RESP (REdis Serialization Protocol),該協議易於實現,計算機能夠高效的進行解析且易於被人類讀懂。git

RESP 是一個二進制安全的文本協議,工做於 TCP 協議上。客戶端和服務器發送的命令或數據一概以 \r\n (CRLF)結尾。github

RESP 定義了5種格式:golang

  • 簡單字符串(Simple String): 服務器用來返回簡單的結果,好比"OK"。非二進制安全,且不容許換行。
  • 錯誤信息(Error): 服務器用來返回簡單的結果,好比"ERR Invalid Synatx"。非二進制安全,且不容許換行。
  • 整數(Integer): llenscard等命令的返回值, 64位有符號整數
  • 字符串(Bulk String): 二進制安全字符串, get 等命令的返回值
  • 數組(Array, 舊版文檔中稱 Multi Bulk Strings): Bulk String 數組,客戶端發送指令以及lrange等命令響應的格式

RESP 經過第一個字符來表示格式:redis

  • 簡單字符串:以"+" 開始, 如:"+OK\r\n"
  • 錯誤:以"-" 開始,如:"-ERR Invalid Synatx\r\n"
  • 整數:以":"開始,如:":1\r\n"
  • 字符串:以 $ 開始
  • 數組:以 * 開始

Bulk String有兩行,第一行爲 $+正文長度,第二行爲實際內容。如:數據庫

$3\r\nSET\r\n

Bulk String 是二進制安全的能夠包含任意字節,就是說能夠在 Bulk String 內部包含 "\r\n" 字符(行尾的CRLF被隱藏):數組

$4
a\r\nb

$-1 表示 nil, 好比使用 get 命令查詢一個不存在的key時,響應即爲$-1緩存

Array 格式第一行爲 "*"+數組長度,其後是相應數量的 Bulk String。如, ["foo", "bar"]的報文:安全

*2
$3
foo
$3
bar

客戶端也使用 Array 格式向服務端發送指令。命令自己將做爲第一個參數,如 SET key value指令的RESP報文:服務器

*3
$3
SET
$3
key
$5
value

將換行符打印出來:

*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

協議解析器

咱們在 實現TCP服務器 一文中已經介紹過TCP服務器的實現,協議解析器將實現其 Handler 接口充當應用層服務器。

協議解析器將接收 Socket 傳來的數據,並將其數據還原爲 [][]byte 格式,如 "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\value\r\n" 將被還原爲 ['SET', 'key', 'value']

本文完整代碼: Github: HDT3213/godis

來自客戶端的請求均爲數組格式,它在第一行中標記報文的總行數並使用CRLF做爲分行符。

bufio 標準庫能夠將從 reader 讀到的數據緩存到 buffer 中,直至遇到分隔符或讀取完畢後返回,因此咱們使用 reader.ReadBytes('\n') 來保證每次讀取到完整的一行。

須要注意的是RESP是二進制安全的協議,它容許在正文中使用CRLF字符。舉例來講 Redis 能夠正確接收並執行SET "a\r\nb" 1指令, 這條指令的正確報文是這樣的:

*3  
$3
SET
$4
a\r\nb 
$7
myvalue

ReadBytes 讀取到第五行 "a\r\nb\r\n"時會將其誤認爲兩行:

*3  
$3
SET
$4
a  // 錯誤的分行
b // 錯誤的分行
$7
myvalue

所以當讀取到第四行$4後, 不該該繼續使用 ReadBytes('\n') 讀取下一行, 應使用 io.ReadFull(reader, msg) 方法來讀取指定長度的內容。

msg = make([]byte, 4 + 2) // 正文長度4 + 換行符長度2
_, err = io.ReadFull(reader, msg)

定義 Client 結構體做爲客戶端抽象:

type Client struct {
    /* 與客戶端的 Tcp 鏈接 */
    conn   net.Conn

    /* 
     * 帶有 timeout 功能的 WaitGroup, 用於優雅關閉
     * 當響應被完整發送前保持 waiting 狀態, 阻止連接被關閉
     */
    waitingReply wait.Wait

    /* 標記客戶端是否正在發送指令 */ 
    sending atomic.AtomicBool
    
    /* 客戶端正在發送的參數數量, 即 Array 第一行指定的數組長度 */
    expectedArgsCount uint32
    
    /* 已經接收的參數數量, 即 len(args)*/ 
    receivedCount uint32
    
    /*
     * 已經接收到的命令參數,每一個參數由一個 []byte 表示
     */
    args [][]byte
}

定義解析器:

type Handler struct {

    /* 
     * 記錄活躍的客戶端連接 
     * 類型爲 *Client -> placeholder 
     */
    activeConn sync.Map 

    /* 數據庫引擎,執行指令並返回結果 */
    db db.DB

    /* 關閉狀態標誌位,關閉過程當中時拒絕新建鏈接和新請求 */
    closing atomic.AtomicBool 
}

接下來能夠編寫主要部分了:

func (h *Handler)Handle(ctx context.Context, conn net.Conn) {
    if h.closing.Get() {
        // 關閉過程當中不接受新鏈接
        _ = conn.Close()
    }

    /* 初始化客戶端狀態 */
    client := &Client {
        conn:   conn,
    }
    h.activeConn.Store(client, 1)

    reader := bufio.NewReader(conn)
    var fixedLen int64 = 0 // 將要讀取的 BulkString 的正文長度
    var err error
    var msg []byte
    for {
        /* 讀取下一行數據 */ 
        if fixedLen == 0 { // 正常模式下使用 CRLF 區分數據行
            msg, err = reader.ReadBytes('\n')
            // 判斷是否以 \r\n 結尾
            if len(msg) == 0 || msg[len(msg) - 2] != '\r' {
                errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
                _, _ =  client.conn.Write(errReply.ToBytes())
            }
        } else { // 當讀取到 BulkString 第二行時,根據給出的長度進行讀取
            msg = make([]byte, fixedLen + 2)
            _, err = io.ReadFull(reader, msg)
            // 判斷是否以 \r\n 結尾
            if len(msg) == 0 || 
              msg[len(msg) - 2] != '\r' ||  
              msg[len(msg) - 1] != '\n'{
                errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
                _, _ =  client.conn.Write(errReply.ToBytes())
            }
            // Bulk String 讀取完畢,從新使用正常模式
            fixedLen = 0 
        }
        // 處理 IO 異常
        if err != nil {
            if err == io.EOF || err == io.ErrUnexpectedEOF {
                logger.Info("connection close")
            } else {
                logger.Warn(err)
            }
            _ = client.Close()
            h.activeConn.Delete(client)
            return // io error, disconnect with client
        }

        /* 解析收到的數據 */
        if !client.sending.Get() { 
            // sending == false 代表收到了一條新指令
            if msg[0] == '*' {
                // 讀取第一行獲取參數個數
                expectedLine, err := strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
                if err != nil {
                    _, _ = client.conn.Write(UnknownErrReplyBytes)
                    continue
                }
                // 初始化客戶端狀態
                client.waitingReply.Add(1) // 有指令未處理完成,阻止服務器關閉
                client.sending.Set(true) // 正在接收指令中
                // 初始化計數器和緩衝區 
                client.expectedArgsCount = uint32(expectedLine) 
                client.receivedCount = 0
                client.args = make([][]byte, expectedLine)
            } else {
                // TODO: text protocol
            }
        } else {
            // 收到了指令的剩餘部分(非首行)
            line := msg[0:len(msg)-2] // 移除換行符
            if line[0] == '$' { 
                // BulkString 的首行,讀取String長度
                fixedLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
                if err != nil {
                    errReply := &reply.ProtocolErrReply{Msg:err.Error()}
                    _, _ = client.conn.Write(errReply.ToBytes())
                }
                if fixedLen <= 0 {
                    errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
                    _, _ = client.conn.Write(errReply.ToBytes())
                }
            } else { 
                // 收到參數
                client.args[client.receivedCount] = line
                client.receivedCount++
            }


            // 一條命令發送完畢
            if client.receivedCount == client.expectedArgsCount {
                client.sending.Set(false)

                // 執行命令並響應
                result := h.db.Exec(client.args)
                if result != nil {
                    _, _ = conn.Write(result.ToBytes())
                } else {
                    _, _ = conn.Write(UnknownErrReplyBytes)
                }

                // 重置客戶端狀態,等待下一條指令
                client.expectedArgsCount = 0
                client.receivedCount = 0
                client.args = nil
                client.waitingReply.Done()
            }
        }
    }
}
相關文章
相關標籤/搜索