Go Redigo 源碼分析(三) 執行命令



func main() {
    // 1. 建立鏈接池
    // 2. 簡單設置鏈接池的最大連接數等參數
    // 3. 注入撥號函數
    // 4. 調用pool.Get() 獲取鏈接
    pool := &redis.Pool{
        MaxIdle:   4,
        MaxActive: 4,
        Dial: func() (redis.Conn, error) {
            rc, err := redis.Dial("tcp", "")
            if err != nil {
                return nil, err
            return rc, nil
        IdleTimeout: time.Second,
        Wait:        true,
    con := pool.Get()
    // 獲取單條
    str, err := redis.String(con.Do("get", "aaa"))
    fmt.Println(str, err)
    // 通道 發送多條接受多條
    con.Send("get", "aaa")
    con.Send("get", "bbb")
    con.Send("get", "ccc")
    str, err = redis.String(con.Receive())
    fmt.Println("value: ", str, " err:", err)
    str, err = redis.String(con.Receive())
    fmt.Println("value: ", str, " err:", err)
    str, err = redis.String(con.Receive())
    fmt.Println("value: ", str, " err:", err)


1. Conn接口 在rediso中有兩個對象都實現了這個接口github

type Conn interface {
    // Close closes the connection.
    Close() error

    // Err returns a non-nil value when the connection is not usable.
    Err() error

    // Do sends a command to the server and returns the received reply.
    Do(commandName string, args ...interface{}) (reply interface{}, err error)

    // Send writes the command to the client's output buffer.
    Send(commandName string, args ...interface{}) error

    // Flush flushes the output buffer to the Redis server.
    Flush() error

    // Receive receives a single reply from the Redis server
    Receive() (reply interface{}, err error)

// 鏈接池對外的鏈接對象
type activeConn struct {
    p     *Pool
    pc    *poolConn
    state int

// 鏈接對象 
type conn struct {
    //  鎖
    mu      sync.Mutex
    pending int
    err     error
    // http 包中的conn對象
    conn    net.Conn

    // 讀入過時時間
    readTimeout time.Duration
    // bufio reader對象 用於讀取redis服務返回的結果
    br          *bufio.Reader

    // 寫入過時時間
    writeTimeout time.Duration
    // bufio writer對象 帶buf 用於往服務端寫命令
    bw           *bufio.Writer

    // Scratch space for formatting argument length.
    // '*' or '$', length, "\r\n"
    lenScratch [32]byte

    // Scratch space for formatting integers and floats.
    numScratch [40]byte

Do 函數
DoWithTimeout函數負責將請求的命令發送到redis服務 再從redis服務讀取回復

// active conn Do函數 設定請求狀態用於關閉時候退出命令
func (ac *activeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
    pc := ac.pc
    if pc == nil {
        return nil, errConnClosed
    // 查看是否須要改變狀態
    ci := lookupCommandInfo(commandName)
    ac.state = (ac.state | ci.Set) &^ ci.Clear
    return pc.c.Do(commandName, args...)

func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
    return c.DoWithTimeout(c.readTimeout, cmd, args...)

// conn 執行命令函數
func (c *conn) DoWithTimeout(readTimeout time.Duration, cmd string, args ...interface{}) (interface{}, error) {
    pending := c.pending
    c.pending = 0

    if cmd == "" && pending == 0 {
        return nil, nil
    // 設置下入超時時間
    if c.writeTimeout != 0 {

    // 若是cmd不爲空則寫入redis命令
    if cmd != "" {
        // 寫入命令道buf中
        if err := c.writeCommand(cmd, args); err != nil {
            return nil, c.fatal(err)
    // 把寫入的buf 的command 寫入conn中 正式發送到服務器
    if err := c.bw.Flush(); err != nil {
        return nil, c.fatal(err)

    var deadline time.Time
    if readTimeout != 0 {
        deadline = time.Now().Add(readTimeout)

    if cmd == "" {
        reply := make([]interface{}, pending)
        for i := range reply {
            r, e := c.readReply()
            if e != nil {
                return nil, c.fatal(e)
            reply[i] = r
        return reply, nil

    var err error
    var reply interface{}
    for i := 0; i <= pending; i++ {
        var e error
        if reply, e = c.readReply(); e != nil {
            return nil, c.fatal(e)
        if e, ok := reply.(Error); ok && err == nil {
            err = e
    return reply, err

// 把command寫入到conn的write中
// 1. 先寫入*號
// 2. 再寫入command
// 3. 最後寫入參數
func (c *conn) writeCommand(cmd string, args []interface{}) error {
    c.writeLen('*', 1+len(args))
    if err := c.writeString(cmd); err != nil {
        return err
    for _, arg := range args {
        if err := c.writeArg(arg, true); err != nil {
            return err
    return nil

// 讀取redis回覆 經過判斷回覆雷星星 + - : $來解析
func (c *conn) readReply() (interface{}, error) {
    line, err := c.readLine()
    if err != nil {
        return nil, err
    if len(line) == 0 {
        return nil, protocolError("short response line")
    switch line[0] {
    // 回覆狀態
    case '+':
        switch {
        case len(line) == 3 && line[1] == 'O' && line[2] == 'K':
            // Avoid allocation for frequent "+OK" response.
            return okReply, nil
        case len(line) == 5 && line[1] == 'P' && line[2] == 'O' && line[3] == 'N' && line[4] == 'G':
            // Avoid allocation in PING command benchmarks :)
            return pongReply, nil
            return string(line[1:]), nil
    // 錯誤回覆
    case '-':
        return Error(string(line[1:])), nil
    // 整數回覆
    case ':':
        return parseInt(line[1:])
    // 批量回復
    case '$':
        n, err := parseLen(line[1:])
        if n < 0 || err != nil {
            return nil, err
        p := make([]byte, n)
        _, err = io.ReadFull(c.br, p)
        if err != nil {
            return nil, err
        if line, err := c.readLine(); err != nil {
            return nil, err
        } else if len(line) != 0 {
            return nil, protocolError("bad bulk string format")
        return p, nil
        // 多條批量回復
    case '*':
        n, err := parseLen(line[1:])
        if n < 0 || err != nil {
            return nil, err
        r := make([]interface{}, n)
        for i := range r {
            r[i], err = c.readReply()
            if err != nil {
                return nil, err
        return r, nil
    return nil, protocolError("unexpected response line")


