Go語言之從0到1實現一個簡單的Redis鏈接池

Go語言之從0到1實現一個簡單的Redis鏈接池

前言

最近學習了一些Go語言開發相關內容,可是苦於手頭沒有能夠練手的項目,學的時候理解不清楚,學過容易忘。html

結合以前組內分享時學到的Redis相關知識,以及Redis Protocol文檔,就想着本身造個輪子練練手。git

此次我把目標放在了Redis client implemented with Go,使用原生Go語言和TCP實現一個簡單的Redis鏈接池和協議解析,以此來讓本身入門Go語言,並加深理解和記憶。(這樣作直接致使的後果是,最近寫JS時if語句老是忘帶括號QAQ)。github

本文只能算是學習Go語言時的一個隨筆,並非真正要造一個線上環境可用的Go-Redis庫~(︿( ̄︶ ̄)︿攤手)redis

順便安利如下本身作的一個跨平臺開源Redis管理軟件:AwesomeRedisManager官網AwesomeRedisManager源碼數據庫

Redis協議主要參考這篇文檔通訊協議(protocol),閱讀後瞭解到,Redis Protocol並無什麼複雜之處,主要是使用TCP來傳輸一些固定格式的字符串數據達到發送命令和解析Response數據的目的。數組

命令格式

根據文檔瞭解到,Redis命令格式爲(CR LF即\r\n):服務器

*<參數數量N> CR LF
$<參數 1 的字節數量> CR LF
<參數 1 的數據> CR LF
...
$<參數 N 的字節數量> CR LF
<參數 N 的數據> CR LF

命令的每一行都使用CRLF結尾,在命令結構的開頭就聲明瞭命令的參數數量,每一條參數都帶有長度標記,方便服務端解析。app

例如,發送一個SET命令set name jeferwangtcp

*3
$3
SET
$4
name
$9
jeferwang

響應格式

Redis的響應回覆數據主要分爲五種類型:性能

  • 狀態回覆:一行數據,使用+開頭(例如:OK、PONG等)
+OK\r\n
+PONG\r\n
  • 錯誤回覆:一行數據,使用-開頭(Redis執行命令時產生的錯誤)
-ERR unknown command 'demo'\r\n
  • 整數回覆:一行數據,使用:開頭(例如:llen返回的長度數值等)
:100\r\n
  • 批量回復(能夠理解爲字符串):兩行數據,使用$開頭,第一行爲內容長度,第二行爲具體內容
$5\r\n
abcde\r\n

特殊狀況:$-1\r\n即爲返回空數據,能夠轉化爲nil
  • 多條批量回復:使用*開頭,第一行標識本次回覆包含多少條批量回復,後面每兩行爲一個批量回復(lrange、hgetall等命令的返回數據)
*2\r\n
$5\r\n
ABCDE\r\n
$2\r\n
FG\r\n

更詳細的命令和回覆格式能夠從Redis Protocol文檔瞭解到,本位只介紹一些基本的開發中須要用到的內容

如下爲部分代碼,完整代碼見GitHub:redis4go

實現流程

  1. 首先,咱們根據官網文檔瞭解到了Redis傳輸協議,即Redis使用TCP傳輸命令的格式和接收數據的格式,據此,咱們可使用Go實現對Redis協議的解析
  2. 接下來,在能夠創建Redis鏈接並進行數據傳輸的前提下,實現一個鏈接池。
  3. 實現拼接Redis命令的方法,經過TCP發送到RedisServer
  4. 讀取RedisResponse,實現解析數據的方法

模塊結構分析

簡單分析Redis鏈接池的結構,能夠先簡單規劃爲5個部分:

  • 結構體定義entity.go
  • Redis鏈接和調用redis_conn.go
  • Redis數據類型解析data_type.go
  • 鏈接池實現pool.go

共劃分爲上述四個部分

對象結構定義

爲了實現鏈接池及Redis數據庫鏈接,咱們須要以下結構:

  • Redis服務器配置RedisConfig:包含Host、Port等信息
  • Redis鏈接池配置PoolConfig:繼承RedisConfig,包含PoolSize等信息
  • Redis鏈接池結構:包含鏈接隊列、鏈接池配置等信息
  • 單個Redis鏈接:包含TCP鏈接Handler、是否處於空閒標記位、當前使用的數據庫等信息
package redis4go

import (
    "net"
    "sync"
)

type RedisConfig struct {
    Host     string // RedisServer主機地址
    Port     int    // RedisServer主機端口
    Password string // RedisServer須要的Auth驗證,不填則爲空
}

// 鏈接池的配置數據
type PoolConfig struct {
    RedisConfig
    PoolSize int // 鏈接池的大小
}

// 鏈接池結構
type Pool struct {
    Config PoolConfig          // 創建鏈接池時的配置
    Queue  chan *RedisConn     // 鏈接池
    Store  map[*RedisConn]bool // 全部的鏈接
    mu     sync.Mutex          // 加鎖
}

// 單個Redis鏈接的結構
type RedisConn struct {
    mu        sync.Mutex   // 加鎖
    p         *Pool        // 所屬的鏈接池
    IsRelease bool         // 是否處於釋放狀態
    IsClose   bool         // 是否已關閉
    TcpConn   *net.TCPConn // 創建起的到RedisServer的鏈接
    DBIndex   int          // 當前鏈接正在使用第幾個Redis數據庫
}

type RedisResp struct {
    rType byte     // 回覆類型(+-:$*)
    rData [][]byte // 從TCP鏈接中讀取的數據統一使用二維數組返回
}

根據以前的規劃,定義好基本的結構以後,咱們能夠先實現一個簡單的Pool對象池

Redis鏈接

創建鏈接

首先咱們須要實現一個創建Redis鏈接的方法

// 建立一個RedisConn對象
func createRedisConn(config RedisConfig) (*RedisConn, error) {
    tcpAddr := &net.TCPAddr{IP: net.ParseIP(config.Host), Port: config.Port}
    tcpConn, err := net.DialTCP("tcp", nil, tcpAddr)
    if err != nil {
        return nil, err
    }
    return &RedisConn{
        IsRelease: true,
        IsClose:   false,
        TcpConn:   tcpConn,
        DBIndex:   0,
    }, nil
}

實現鏈接池

在Go語言中,咱們可使用一個chan來很輕易地實現一個指定容量的隊列,來做爲鏈接池使用,當池中沒有鏈接時,申請獲取鏈接時將會被阻塞,直到放入新的鏈接。

package redis4go

func CreatePool(config PoolConfig) (*Pool, error) {
    pool := &Pool{
        Config: config,
        Queue:  make(chan *RedisConn, config.PoolSize),
        Store:  make(map[*RedisConn]bool, config.PoolSize),
    }
    for i := 0; i < config.PoolSize; i++ {
        redisConn, err := createRedisConn(config.RedisConfig)
        if err != nil {
            // todo 處理以前已經建立好的連接
            return nil, err
        }
        redisConn.p = pool
        pool.Queue <- redisConn
        pool.Store[redisConn] = true
    }
    return pool, nil
}

// 獲取一個鏈接
func (pool *Pool) getConn() *RedisConn {
    pool.mu.Lock()
    // todo 超時機制
    conn := <-pool.Queue
    conn.IsRelease = false
    pool.mu.Unlock()
    return conn
}

// 關閉鏈接池
func (pool *Pool) Close() {
    for conn := range pool.Store {
        err := conn.Close()
        if err != nil {
            // todo 處理鏈接關閉的錯誤?
        }
    }
}

發送命令&解析回覆數據

下面是向RedisServer發送命令,以及讀取回複數據的簡單實現

func (conn *RedisConn) Call(params ...interface{}) (*RedisResp, error) {
    reqData, err := mergeParams(params...)
    if err != nil {
        return nil, err
    }
    conn.Lock()
    defer conn.Unlock()
    _, err = conn.TcpConn.Write(reqData)
    if err != nil {
        return nil, err
    }
    resp, err := conn.getReply()
    if err != nil {
        return nil, err
    }
    if resp.rType == '-' {
        return resp, resp.ParseError()
    }
    return resp, nil
}

func (conn *RedisConn) getReply() (*RedisResp, error) {
    b := make([]byte, 1)
    _, err := conn.TcpConn.Read(b)
    if err != nil {
        return nil, err
    }
    resp := new(RedisResp)
    resp.rType = b[0]
    switch b[0] {
    case '+':
        // 狀態回覆
        fallthrough
    case '-':
        // 錯誤回覆
        fallthrough
    case ':':
        // 整數回覆
        singleResp := make([]byte, 1)
        for {
            _, err := conn.TcpConn.Read(b)
            if err != nil {
                return nil, err
            }
            if b[0] != '\r' && b[0] != '\n' {
                singleResp = append(singleResp, b[0])
            }
            if b[0] == '\n' {
                break
            }
        }
        resp.rData = append(resp.rData, singleResp)
    case '$':
        buck, err := conn.readBuck()
        if err != nil {
            return nil, err
        }
        resp.rData = append(resp.rData, buck)
    case '*':
        // 條目數量
        itemNum := 0
        for {
            _, err := conn.TcpConn.Read(b)
            if err != nil {
                return nil, err
            }
            if b[0] == '\r' {
                continue
            }
            if b[0] == '\n' {
                break
            }
            itemNum = itemNum*10 + int(b[0]-'0')
        }
        for i := 0; i < itemNum; i++ {
            buck, err := conn.readBuck()
            if err != nil {
                return nil, err
            }
            resp.rData = append(resp.rData, buck)
        }
    default:
        return nil, errors.New("錯誤的服務器回覆")
    }
    return resp, nil
}

func (conn *RedisConn) readBuck() ([]byte, error) {
    b := make([]byte, 1)
    dataLen := 0
    for {
        _, err := conn.TcpConn.Read(b)
        if err != nil {
            return nil, err
        }
        if b[0] == '$' {
            continue
        }
        if b[0] == '\r' {
            break
        }
        dataLen = dataLen*10 + int(b[0]-'0')
    }
    bf := bytes.Buffer{}
    for i := 0; i < dataLen+3; i++ {
        _, err := conn.TcpConn.Read(b)
        if err != nil {
            return nil, err
        }
        bf.Write(b)
    }
    return bf.Bytes()[1 : bf.Len()-2], nil
}

func mergeParams(params ...interface{}) ([]byte, error) {
    count := len(params) // 參數數量
    bf := bytes.Buffer{}
    // 參數數量
    {
        bf.WriteString("*")
        bf.WriteString(strconv.Itoa(count))
        bf.Write([]byte{'\r', '\n'})
    }
    for _, p := range params {
        bf.Write([]byte{'$'})
        switch p.(type) {
        case string:
            str := p.(string)
            bf.WriteString(strconv.Itoa(len(str)))
            bf.Write([]byte{'\r', '\n'})
            bf.WriteString(str)
            break
        case int:
            str := strconv.Itoa(p.(int))
            bf.WriteString(strconv.Itoa(len(str)))
            bf.Write([]byte{'\r', '\n'})
            bf.WriteString(str)
            break
        case nil:
            bf.WriteString("-1")
            break
        default:
            // 不支持的參數類型
            return nil, errors.New("參數只能是String或Int")
        }
        bf.Write([]byte{'\r', '\n'})
    }
    return bf.Bytes(), nil
}

實現幾個經常使用數據類型的解析

package redis4go

import (
    "errors"
    "strconv"
)

func (resp *RedisResp) ParseError() error {
    if resp.rType != '-' {
        return nil
    }
    return errors.New(string(resp.rData[0]))
}

func (resp *RedisResp) ParseInt() (int, error) {
    switch resp.rType {
    case '-':
        return 0, resp.ParseError()
    case '$':
        fallthrough
    case ':':
        str, err := resp.ParseString()
        if err != nil {
            return 0, err
        }
        return strconv.Atoi(str)
    default:
        return 0, errors.New("錯誤的回覆類型")
    }
}

func (resp *RedisResp) ParseString() (string, error) {
    switch resp.rType {
    case '-':
        return "", resp.ParseError()
    case '+':
        fallthrough
    case ':':
        fallthrough
    case '$':
        return string(resp.rData[0]), nil
    default:
        return "", errors.New("錯誤的回覆類型")
    }
}
func (resp *RedisResp) ParseList() ([]string, error) {
    switch resp.rType {
    case '-':
        return nil, resp.ParseError()
    case '*':
        list := make([]string, 0, len(resp.rData))
        for _, data := range resp.rData {
            list = append(list, string(data))
        }
        return list, nil
    default:
        return nil, errors.New("錯誤的回覆類型")
    }
}
func (resp *RedisResp) ParseMap() (map[string]string, error) {
    switch resp.rType {
    case '-':
        return nil, resp.ParseError()
    case '*':
        mp := make(map[string]string)
        for i := 0; i < len(resp.rData); i += 2 {
            mp[string(resp.rData[i])] = string(resp.rData[i+1])
        }
        return mp, nil
    default:
        return nil, errors.New("錯誤的回覆類型")
    }
}

在開發的過程當中,隨手編寫了幾個零零散散的測試文件,經測試,一些簡單的Redis命令以及能跑通了。

package redis4go

import (
    "testing"
)

func getConn() (*RedisConn, error) {
    pool, err := CreatePool(PoolConfig{
        RedisConfig: RedisConfig{
            Host: "127.0.0.1",
            Port: 6379,
        },
        PoolSize: 10,
    })
    if err != nil {
        return nil, err
    }
    conn := pool.getConn()
    return conn, nil
}

func TestRedisResp_ParseString(t *testing.T) {
    demoStr := string([]byte{'A', '\n', '\r', '\n', 'b', '1'})
    conn, _ := getConn()
    _, _ = conn.Call("del", "name")
    _, _ = conn.Call("set", "name", demoStr)
    resp, err := conn.Call("get", "name")
    if err != nil {
        t.Fatal("Call Error:", err.Error())
    }
    str, err := resp.ParseString()
    if err != nil {
        t.Fatal("Parse Error:", err.Error())
    }
    if str != demoStr {
        t.Fatal("結果錯誤")
    }
}

func TestRedisResp_ParseList(t *testing.T) {
    conn, _ := getConn()
    _, _ = conn.Call("del", "testList")
    _, _ = conn.Call("lpush", "testList", 1, 2, 3, 4, 5)
    res, err := conn.Call("lrange", "testList", 0, -1)
    if err != nil {
        t.Fatal("Call Error:", err.Error())
    }
    ls, err := res.ParseList()
    if err != nil {
        t.Fatal("Parse Error:", err.Error())
    }
    if len(ls) != 5 {
        t.Fatal("結果錯誤")
    }
}

func TestRedisResp_ParseMap(t *testing.T) {
    conn, _ := getConn()
    _, _ = conn.Call("del", "testMap")
    _, err := conn.Call("hmset", "testMap", 1, 2, 3, 4, 5, 6)
    if err != nil {
        t.Fatal("設置Value失敗")
    }
    res, err := conn.Call("hgetall", "testMap")
    if err != nil {
        t.Fatal("Call Error:", err.Error())
    }
    ls, err := res.ParseMap()
    if err != nil {
        t.Fatal("Parse Error:", err.Error())
    }
    if len(ls) != 3 || ls["1"] != "2" {
        t.Fatal("結果錯誤")
    }
}

至此,已經算是達到了學習Go語言和學習Redis Protocol的目的,不過代碼中也有不少地方須要優化和完善,性能方面考慮的也並不周全。輪子就不重複造了,畢竟有不少功能完善的庫,從頭造一個輪子須要消耗的精力太多啦而且不必~

下一次我將會學習官方推薦的gomodule/redigo源碼,並分享個人心得。

--The End--

相關文章
相關標籤/搜索