文章持續更新,微信搜一搜「 吳親強的深夜食堂 」
上一篇etcd 實戰基礎篇(一)咱們主要介紹了 etcd 使用場景以及最基礎性的一些操做(put、get、watch)。 這一篇咱們接着實戰etcd其餘業務場景。git
基於 etcd 實現一個分佈式鎖特別簡單。etcd 提供了開箱即用的包 concurrency,幾行代碼就實現一個分佈式鎖。github
package src import ( "context" "flag" "fmt" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" "log" "strings" "time" ) var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address") // 初始化etcd客戶端 func initEtcdClient() *clientv3.Client { var client *clientv3.Client var err error // 解析etcd的地址,編程[]string endpoints := strings.Split(*addr, ",") // 建立一個 etcd 的客戶端 client, err = clientv3.New(clientv3.Config{Endpoints: endpoints, DialTimeout: 5 * time.Second}) if err != nil { fmt.Printf("初始化客戶端失敗:%v\\n", err) log.Fatal(err) } return client } func Lock(id int, lockName string) { client := initEtcdClient() defer client.Close() // 建立一個 session,若是程序宕機奔潰,etcd能夠知道 s, err := concurrency.NewSession(client) if err != nil { log.Fatal(err) } defer s.Close() // 建立一個etcd locker locker := concurrency.NewLocker(s, lockName) log.Printf("id:%v 嘗試獲取鎖%v", id, lockName) locker.Lock() log.Printf("id:%v取得鎖%v", id, lockName) // 模擬業務耗時 time.Sleep(time.Millisecond * 300) locker.Unlock() log.Printf("id:%v釋放鎖%v", id, lockName) }
咱們再寫個腳本運行,看看結果。redis
package main import ( "etcd-test/src" "sync" ) func main() { var lockName = "locker-test" var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(item int) { defer wg.Done() src.Lock(item, lockName) }(i) } wg.Wait() }
咱們發起了10個併發搶同一個 key 鎖的命令。運行結果以下, 編程
從圖片能夠看到,同一時刻必定只有一個 G 獲得鎖,一個 G 獲取到一個鎖的前提必定是當前 key 未被鎖。安全
有人要問了,當一個鎖解開時,以前未獲取到鎖而發生等待的客戶端誰先獲取到這把鎖? 這個問題,咱們後續分析原理的時候再揭曉。微信
說到分佈式鎖,不得不提起 redis。它有一個看似安全實際一點都不安全的分佈式鎖。它的命令模式是,session
set key value [EX seconds] [PX milliseconds] [NX|XX]
這其中,介紹兩個關鍵的屬性:數據結構
咱們在使用 redis 作分佈式鎖的時候會這麼寫。(代碼用了包 https://github.com/go-redis/redis
)併發
func RedisLock(item int) { rdb = redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", Password: "", DB: 0, }) fmt.Printf("item:%v 嘗試獲取鎖,時間:%v\\n", item, time.Now().String()) res, _ := rdb.SetNX(ctx, "key", "value", 2*time.Second).Result() if !res { fmt.Printf("item:%v 嘗試獲取鎖失敗\\n", item) return } fmt.Printf("item:%v 獲取到鎖,時間:%v\\n", item, time.Now().String()) time.Sleep(1 * time.Second) //模擬業務耗時 fmt.Printf("item:%v 釋放鎖,時間:%v\\n", item, time.Now().String()) rdb.Del(ctx, "key") }
rdb.SetNX(ctx, "key", "value", 2*time.Second)
咱們規定鎖的過時時間是2秒,下面有一句 time.Sleep(1 * time.Second)
用來模擬處理業務的耗時。業務處理結束,咱們刪除 key rdb.Del(ctx, "key")
。分佈式
咱們寫個簡單的腳本,
func main() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(item int) { defer wg.Done() RedisLock(item) }(i) } wg.Wait() }
咱們開啓十個 G 併發的調用 RedisLock
函數。每次調用,函數內部都會新建一個 redis 客戶端,本質上是10個客戶端。
運行這段程序,
從圖中看出,同一時刻只有一個客戶端獲取到鎖,而且在一秒的任務處理後,釋放了鎖,好像沒太大的問題。
那麼,我再寫一個簡單的例子。
import ( "context" "fmt" "github.com/go-redis/redis/v8" "sync" "time" ) var ctx = context.Background() var rdb *redis.Client func main() { var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() ExampleLock(1, 0) }() go func() { defer wg.Done() ExampleLock(2, 5) }() wg.Wait() } func ExampleLock(item int, timeSleep time.Duration) { rdb = redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", Password: "", DB: 0, }) if timeSleep > 0 { time.Sleep(time.Second * timeSleep) } fmt.Printf("item:%v 嘗試獲取鎖,時間:%v\\n", item, time.Now().String()) res, _ := rdb.SetNX(ctx, "key", "value", 3*time.Second).Result() if !res { fmt.Printf("item:嘗試獲取鎖失敗:%v\\n", item) return } fmt.Printf("item:%v 獲取到鎖,時間:%v\\n", item, time.Now().String()) time.Sleep(7 * time.Second) fmt.Printf("item:%v 釋放鎖,時間:%v\\n", item, time.Now().String()) rdb.Del(ctx, "key") }
咱們設置鎖的過時時間是 3 秒,而獲取鎖以後的任務處理時間爲 7 秒。
而後咱們開啓兩個 G。
ExampleLock(1, 0)ExampleLock(2, 5)
其中第二行數字5,從代碼中能夠看出,是指啓動 G 後過5秒去獲取鎖。
這段代碼總體流程是這樣的:G(1) 獲取到鎖後,設置的鎖持有時間是3秒,因爲任務執行須要7秒的時間,所以在3秒事後鎖會自動釋放。G(2) 能夠在第5秒的時候獲取到鎖,而後它執行任務也得7秒。
最後,G(1)在獲取鎖後7秒執行釋放鎖的操做,G(2)同理。
發現問題了嗎?
G(1) 的鎖在3秒後已經自動釋放了。可是在任務處理結束後又執行了解鎖的操做,可此時這個鎖是 G(2) 的呀。
那麼接下來因爲 G(1) 誤解了 G(2) 的鎖,若是此時有其餘的 G,那麼就能夠獲取到鎖。
等 G(2) 任務執行結束,同理又會誤解其餘 G 的鎖,這是一個惡性循環。 這也是掘金一篇由 redis 分佈式鎖形成茅臺超賣重大事故的緣由之一。
至於其餘的,能夠自行查看這篇文章Redis——由分佈式鎖形成的重大事故。
對隊列更多的理論知識就不加以介紹了。咱們都知道,隊列是一種先進先出的數據結構,通常也只有入隊和出隊兩種操做。 咱們經常在單機的應用中使用到隊列。
那麼,如何實現一個分佈式的隊列呢?。
咱們能夠使用 etcd 開箱即用的工具,在 etcd
底層 recipe
包裏結構 Queue
,實現了一個多讀多寫的分佈式隊列。
type Queue struct { client *v3.Client ctx context.Context keyPrefix string } func NewQueue(client *v3.Client, keyPrefix string) *Queue func (q *Queue) Dequeue() (string, error) func (q *Queue) Enqueue(val string)
咱們基於此包能夠很方便的實現。
package src import ( "github.com/coreos/etcd/clientv3" recipe "github.com/coreos/etcd/contrib/recipes" "log" "strconv" "strings" "sync" "time" ) var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address") // 初始化etcd客戶端 func initEtcdClient() *clientv3.Client { var client *clientv3.Client var err error // 解析etcd的地址,編程[]string endpoints := strings.Split(*addr, ",") // 建立一個 etcd 的客戶端 client, err = clientv3.New(clientv3.Config{Endpoints: endpoints, DialTimeout: 5 * time.Second}) if err != nil { log.Printf("初始化客戶端失敗:%v\\n", err) log.Fatal(err) } return client } func Push(keyName string) { client := initEtcdClient() defer client.Close() q := recipe.NewQueue(client, keyName) var wg sync.WaitGroup for i := 0; i < 3; i++ { for j := 0; j < 10; j++ { wg.Add(1) go func(item int) { defer wg.Done() err := q.Enqueue(strconv.Itoa(item)) if err != nil { log.Printf("push err:%v\\n", err) } }(j) } time.Sleep(2 * time.Second) } wg.Wait() } func Pop(keyName string) { client := initEtcdClient() defer client.Close() q := recipe.NewQueue(client, keyName) for { res, err := q.Dequeue() if err != nil { log.Fatal(err) return } log.Printf("接收值:%v\\n", res) } }
在 push
中,咱們開啓3輪發送值入隊,每次發送10個,發送一輪休息2秒。 在 pop
中,經過死循環獲取隊列中的值。
運行腳本程序以下。
package main import ( "etcd-test/src" "time" ) func main() { key := "test-queue" go src.Pop(key) time.Sleep(1 * time.Second) go src.Push(key) time.Sleep(20 * time.Second) }
咱們使用兩個 G
表明 分別運行 push
和 pop
操做。 同時爲了達到運行效果,咱們先運行 pop 等待有入隊的元素。 運行結果動畫以下,
etcd
還提供了優先級的分佈式的隊列。和上面的用法類似。只是在入隊的時候,不單單須要提供一個值,還須要提供一個整數,來表示當前 push
值的優先級。數值越小,優先級越高。
咱們改動一下上述的代碼。
package src import ( "github.com/coreos/etcd/clientv3" recipe "github.com/coreos/etcd/contrib/recipes" "log" "strconv" "strings" "sync" "time" ) var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address") // 初始化etcd客戶端 func initEtcdClient() *clientv3.Client { var client *clientv3.Client var err error // 解析etcd的地址,編程[]string endpoints := strings.Split(*addr, ",") // 建立一個 etcd 的客戶端 client, err = clientv3.New(clientv3.Config{Endpoints: endpoints, DialTimeout: 5 * time.Second}) if err != nil { log.Printf("初始化客戶端失敗:%v\\n", err) log.Fatal(err) } return client } func PriorityPush(keyName string) { client := initEtcdClient() defer client.Close() q := recipe.NewPriorityQueue(client, keyName) var wg sync.WaitGroup for j := 0; j < 10; j++ { wg.Add(1) go func(item int) { defer wg.Done() err := q.Enqueue(strconv.Itoa(item), uint16(item)) if err != nil { log.Printf("push err:%v\\n", err) } }(j) } wg.Wait() } func PriorityPop(keyName string) { client := initEtcdClient() defer client.Close() q := recipe.NewPriorityQueue(client, keyName) for { res, err := q.Dequeue() if err != nil { log.Fatal(err) return } log.Printf("接收值:%v\\n", res) } }
而後如下是咱們的測試代碼:
package main import ( "etcd-test/src" "sync" "time" ) func main() { key := "test-queue" var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() src.PriorityPush(key) }() wg.Wait() go src.PriorityPop(key) time.Sleep(20 * time.Second) }
咱們把0到9的數併發的 push
到隊列中,對應的優先級整數值就是它自己,push
完畢,咱們運行 PriorityPop
函數,看最終結果顯示就是從0到9。
這篇文章主要介紹瞭如何使用 etcd 實現分佈式鎖以及分佈式隊列。其餘etcd的場景,能夠自行實踐。