讀源碼:redigo爲何多線程不安全

redigo是golang的一個操做redis的第三方庫,之因此選擇這個庫,是由於它的文檔十分豐富,操做起來也比較簡單。一個典型的redigo的使用以下所示:git

package main

import (
	"github.com/gomodule/redigo/redis"
	"log"
)

func main() {
	conn, err := redis.Dial("tcp", "192.168.1.2:6379")
	if err != nil {
		log.Fatalf("dial redis failed :%v\n", err)
	}

	result, err := redis.String(conn.Do("SET", "hello", "world"))
	if err != nil {
		log.Fatalln(err)
	}

	log.Println(result)
}
複製代碼

這裏須要注意的一點是,redis 默認是隻能本機訪問的,能夠經過修改 /etc/redis/redis.conf 中的 bind 來實現遠程訪問,這裏我將 bind 改成了服務所在機器的 IP 。程序員

雖然,redigo 的使用十分簡單,可是,在它的文檔中也指出了一點須要咱們特別注意,咱們能夠在 godoc 中看到原文:github

Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do and Close methods.golang

翻譯過來就是:redis

鏈接支持同時運行單個執行體調用 Receive 和 單個執行體調用 Send 和 Flush 方法。不支持併發調用 Do 和 Close 方法。編程

本着程序員追根究底的好奇心,我看了一下 redigo 實現 Do 方法的源碼,大體弄清楚了爲何 Do 函數是併發不安全的了。它的部分源碼以下所示:安全

func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
	return c.DoWithTimeout(c.readTimeout, cmd, args...)
}

func (c *conn) DoWithTimeout(readTimeout time.Duration, cmd string, args ...interface{}) (interface{}, error) {
	c.mu.Lock()
	pending := c.pending
	c.pending = 0
	c.mu.Unlock()

	if cmd == "" && pending == 0 {
		return nil, nil
	}

	if c.writeTimeout != 0 {
		c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
	}

	if cmd != "" {
		if err := c.writeCommand(cmd, args); err != nil {
			return nil, c.fatal(err)
		}
	}

	if err := c.bw.Flush(); err != nil {
		return nil, c.fatal(err)
	}

	var deadline time.Time
	if readTimeout != 0 {
		deadline = time.Now().Add(readTimeout)
	}
	c.conn.SetReadDeadline(deadline)

	if cmd == "" {
		reply := make([]interface{}, pending)
		for i := range reply {
			r, e := c.readReply()
			if e != nil {
				return nil, c.fatal(e)
			}
			reply[i] = r
		}
		return reply, nil
	}

	var err error
	var reply interface{}
	for i := 0; i <= pending; i++ {
		var e error
		if reply, e = c.readReply(); e != nil {
			return nil, c.fatal(e)
		}
		if e, ok := reply.(Error); ok && err == nil {
			err = e
		}
	}
	return reply, err
}

func (c *conn) writeCommand(cmd string, args []interface{}) error {
	c.writeLen('*', 1+len(args))
	if err := c.writeString(cmd); err != nil {
		return err
	}
	for _, arg := range args {
		if err := c.writeArg(arg, true); err != nil {
			return err
		}
	}
	return nil
}
複製代碼

上面三個函數實如今 redigo 的 redis 包的 conn.go 文件中,在 DoWithTimeout 方法中,咱們能夠看到它是順序執行數據的發送和相應的接收的,並且,函數中仍是沒有加鎖的。雖然,golang 的 TCP 發送底層實現是有加鎖的,能夠保證一次寫操做的數據中,不會有另外一次寫操做的數據插入,可是,在這個 DoWithTimeout 的實現中,咱們仍是能隱約聞到一種不安全的味道。併發

咱們把焦點鎖定在 writeCommand 這個方法上。從它的實現,咱們能夠了解到,它的做用主要是在 for ... range 中將 redis 的命令發送到 redis-server 執行。這時,咱們可能會注意到,這個函數是沒有加鎖的,若是 for ... range 是往一個全局的緩衝去中寫數據,那麼,併發時頗有可能會致使數據的交叉。爲了證明這個假設,咱們繼續看 writeArg 的實現:tcp

func (c *conn) writeArg(arg interface{}, argumentTypeOK bool) (err error) {
	switch arg := arg.(type) {
	case string:
		return c.writeString(arg)
	case []byte:
		return c.writeBytes(arg)
	case int:
		return c.writeInt64(int64(arg))
	case int64:
		return c.writeInt64(arg)
	case float64:
		return c.writeFloat64(arg)
	case bool:
		if arg {
			return c.writeString("1")
		} else {
			return c.writeString("0")
		}
	case nil:
		return c.writeString("")
	case Argument:
		if argumentTypeOK {
			return c.writeArg(arg.RedisArg(), false)
		}
		// See comment in default clause below.
		var buf bytes.Buffer
		fmt.Fprint(&buf, arg)
		return c.writeBytes(buf.Bytes())
	default:
		// This default clause is intended to handle builtin numeric types.
		// The function should return an error for other types, but this is not
		// done for compatibility with previous versions of the package.
		var buf bytes.Buffer
		fmt.Fprint(&buf, arg)
		return c.writeBytes(buf.Bytes())
	}
}

func (c *conn) writeString(s string) error {
	c.writeLen('$', len(s))
	c.bw.WriteString(s)
	_, err := c.bw.WriteString("\r\n")
	return err
}
複製代碼

writeArg 方法是經過判斷傳入參數的不一樣來調用不一樣的方法來寫數據的,不過這幾個方法的底層其實都是調用了 writeString 這個方法。在 writeString 這個方法的實現中,咱們看到 redigo 是把數據都寫到 bw 的。bw 是 conn 一個 net.Conn 的 writter,也就是說,若是併發執行 Do 方法的話,這幾個併發的執行體都是往同一個 net.Conn的 writter 中寫數據的,這基本證明了我上面的假設。函數

咱們回過來看 DoWithTimeout 函數執行了 writeCommand 以後,調用的 bw 的 Flush 方法,這個方法將緩衝區中的數據都發送出去,咱們看一下它的實現:

// Flush writes any buffered data to the underlying io.Writer.
func (b *Writer) Flush() error {
	if b.err != nil {
		return b.err
	}
	if b.n == 0 {
		return nil
	}
	n, err := b.wr.Write(b.buf[0:b.n])
	if n < b.n && err == nil {
		err = io.ErrShortWrite
	}
	if err != nil {
		if n > 0 && n < b.n {
			copy(b.buf[0:b.n-n], b.buf[n:b.n])
		}
		b.n -= n
		b.err = err
		return err
	}
	b.n = 0
	return nil
}
複製代碼

從代碼中,咱們能夠看到,在調用了 b.wr.Write 方法後,有一個判斷已寫的數據長度是否和緩衝區的數據長度相等的操做。從上面的分析咱們能夠知道,redigo 在調用 Do 的整個過程當中都是沒有加鎖的,那麼,在併發時,一個執行體的 Flush 過程當中,頗有可能會有別的執行體往 writer 的緩衝區中寫數據,出如今調用完 b.wr.Write 以後對已寫數據長度小於緩衝區數據長度的現象,從而致使 short write 的錯誤。

咱們能夠寫一個程序測試一下:

package main

import (
	"github.com/gomodule/redigo/redis"
	"log"
	"sync"
)

func main() {
	conn, err := redis.Dial("tcp", "192.168.1.2:6379")
	if err != nil {
		log.Fatalf("dial redis failed :%v\n", err)
	}

	wg := sync.WaitGroup{}
	wg.Add(2)

	go func() {
		defer wg.Done()
		result, err := redis.String(conn.Do("SET", "hello", "world"))
		if err != nil {
			log.Fatalln(err)
		}
		log.Println(result)
	}()

	go func() {
		defer wg.Done()
		result, err := redis.String(conn.Do("SET", "hello", "world"))
		if err != nil {
			log.Fatalln(err)
		}
		log.Println(result)
	}()

	wg.Wait()
}
複製代碼

執行以後,果真出現了 short write 的錯誤:

redigo 的做者推薦咱們在併發時使用鏈接池來保證安全,redigo 的鏈接池的實現將會在下次一塊兒閱讀。

讀源碼能夠了解到開源做者實現開源做品的思路,還能夠開拓視野,認識到一些更好的編程技巧,這個習慣但是要好好堅持啊。

相關文章
相關標籤/搜索