Etcd 實戰練習(二)

文章持續更新,微信搜一搜「  吳親強的深夜食堂 」

上一篇etcd 實戰基礎篇(一)咱們主要介紹了 etcd 使用場景以及最基礎性的一些操做(put、get、watch)。 這一篇咱們接着實戰etcd其餘業務場景。git

基於 etcd 的分佈式鎖

基於 etcd 實現一個分佈式鎖特別簡單。etcd 提供了開箱即用的包 concurrency,幾行代碼就實現一個分佈式鎖。github

package src

import (
  "context"
  "flag"
  "fmt"
  "github.com/coreos/etcd/clientv3"
  "github.com/coreos/etcd/clientv3/concurrency"
  "log"
  "strings"
  "time"
)

var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")

// 初始化etcd客戶端
func initEtcdClient() *clientv3.Client {
  var client *clientv3.Client
  var err error
  // 解析etcd的地址,編程[]string
  endpoints := strings.Split(*addr, ",")
  // 建立一個 etcd 的客戶端
  client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
    DialTimeout: 5 * time.Second})
  if err != nil {
    fmt.Printf("初始化客戶端失敗:%v\\n", err)
    log.Fatal(err)
  }
  return client
}

func Lock(id int, lockName string) {
  client := initEtcdClient()
  defer client.Close()

  // 建立一個 session,若是程序宕機奔潰,etcd能夠知道
  s, err := concurrency.NewSession(client)
  if err != nil {
    log.Fatal(err)
  }
  defer s.Close()

  // 建立一個etcd locker
  locker := concurrency.NewLocker(s, lockName)

  log.Printf("id:%v 嘗試獲取鎖%v", id, lockName)
  locker.Lock()
  log.Printf("id:%v取得鎖%v", id, lockName)

  // 模擬業務耗時
  time.Sleep(time.Millisecond * 300)

  locker.Unlock()
  log.Printf("id:%v釋放鎖%v", id, lockName)
}

咱們再寫個腳本運行,看看結果。redis

package main

import (
  "etcd-test/src"
  "sync"
)

func main() {
  var lockName = "locker-test"
  var wg sync.WaitGroup
  for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(item int) {
      defer wg.Done()
      src.Lock(item, lockName)
    }(i)
  }
  wg.Wait()
}

咱們發起了10個併發搶同一個 key 鎖的命令。運行結果以下, 編程

圖片

從圖片能夠看到,同一時刻必定只有一個 G 獲得鎖,一個 G 獲取到一個鎖的前提必定是當前 key 未被鎖。安全

有人要問了,當一個鎖解開時,以前未獲取到鎖而發生等待的客戶端誰先獲取到這把鎖? 這個問題,咱們後續分析原理的時候再揭曉。微信

說到分佈式鎖,不得不提起 redis。它有一個看似安全實際一點都不安全的分佈式鎖。它的命令模式是,session

set key value [EX seconds] [PX milliseconds] [NX|XX]

這其中,介紹兩個關鍵的屬性:數據結構

  • EX 標示設置過時時間,單位是秒。
  • NX 表示 當對應的 key 不存在時,才建立。

咱們在使用 redis 作分佈式鎖的時候會這麼寫。(代碼用了包 https://github.com/go-redis/redis)併發

func RedisLock(item int) {
  rdb = redis.NewClient(&redis.Options{
    Addr: "127.0.0.1:6379",
    Password: "",
    DB: 0,
  })
  fmt.Printf("item:%v 嘗試獲取鎖,時間:%v\\n", item, time.Now().String())
  res, _ := rdb.SetNX(ctx, "key", "value", 2*time.Second).Result()
  if !res {
    fmt.Printf("item:%v 嘗試獲取鎖失敗\\n", item)
    return
  }

  fmt.Printf("item:%v 獲取到鎖,時間:%v\\n", item, time.Now().String())
  time.Sleep(1 * time.Second) //模擬業務耗時
  fmt.Printf("item:%v 釋放鎖,時間:%v\\n", item, time.Now().String())
  rdb.Del(ctx, "key")
}
rdb.SetNX(ctx, "key", "value", 2*time.Second)

咱們規定鎖的過時時間是2秒,下面有一句 time.Sleep(1 * time.Second) 用來模擬處理業務的耗時。業務處理結束,咱們刪除 key rdb.Del(ctx, "key") 。分佈式

咱們寫個簡單的腳本,

func main() {
  var wg sync.WaitGroup
  for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(item int) {
      defer wg.Done()
      RedisLock(item)
    }(i)
  }
  wg.Wait()
}

咱們開啓十個 G 併發的調用 RedisLock 函數。每次調用,函數內部都會新建一個 redis 客戶端,本質上是10個客戶端。

運行這段程序,

圖片

從圖中看出,同一時刻只有一個客戶端獲取到鎖,而且在一秒的任務處理後,釋放了鎖,好像沒太大的問題。

那麼,我再寫一個簡單的例子。

import (
  "context"
  "fmt"
  "github.com/go-redis/redis/v8"
  "sync"
  "time"
)

var ctx = context.Background()
var rdb *redis.Client

func main() {
  var wg sync.WaitGroup
  wg.Add(2)
  go func() {
    defer wg.Done()
    ExampleLock(1, 0)
  }()

  go func() {
    defer wg.Done()
    ExampleLock(2, 5)
  }()
  wg.Wait()
}


func ExampleLock(item int, timeSleep time.Duration) {
  rdb = redis.NewClient(&redis.Options{
    Addr: "127.0.0.1:6379",
    Password: "",
    DB: 0,
  })
  if timeSleep > 0 {
    time.Sleep(time.Second * timeSleep)
  }
  fmt.Printf("item:%v 嘗試獲取鎖,時間:%v\\n", item, time.Now().String())
  res, _ := rdb.SetNX(ctx, "key", "value", 3*time.Second).Result()
  if !res {
    fmt.Printf("item:嘗試獲取鎖失敗:%v\\n", item)
    return
  }

  fmt.Printf("item:%v 獲取到鎖,時間:%v\\n", item, time.Now().String())
  time.Sleep(7 * time.Second)
  fmt.Printf("item:%v 釋放鎖,時間:%v\\n", item, time.Now().String())
  rdb.Del(ctx, "key")
}

咱們設置鎖的過時時間是 3 秒,而獲取鎖以後的任務處理時間爲 7 秒。

而後咱們開啓兩個 G。

ExampleLock(1, 0)ExampleLock(2, 5)

其中第二行數字5,從代碼中能夠看出,是指啓動 G 後過5秒去獲取鎖。

這段代碼總體流程是這樣的:G(1) 獲取到鎖後,設置的鎖持有時間是3秒,因爲任務執行須要7秒的時間,所以在3秒事後鎖會自動釋放。G(2) 能夠在第5秒的時候獲取到鎖,而後它執行任務也得7秒。

最後,G(1)在獲取鎖後7秒執行釋放鎖的操做,G(2)同理。

圖片

發現問題了嗎?

G(1) 的鎖在3秒後已經自動釋放了。可是在任務處理結束後又執行了解鎖的操做,可此時這個鎖是 G(2) 的呀。

那麼接下來因爲 G(1) 誤解了 G(2) 的鎖,若是此時有其餘的 G,那麼就能夠獲取到鎖。

等 G(2) 任務執行結束,同理又會誤解其餘 G 的鎖,這是一個惡性循環。 這也是掘金一篇由 redis 分佈式鎖形成茅臺超賣重大事故的緣由之一。

至於其餘的,能夠自行查看這篇文章Redis——由分佈式鎖形成的重大事故

基於 etcd 的分佈式隊列

對隊列更多的理論知識就不加以介紹了。咱們都知道,隊列是一種先進先出的數據結構,通常也只有入隊和出隊兩種操做。 咱們經常在單機的應用中使用到隊列。

那麼,如何實現一個分佈式的隊列呢?。

咱們能夠使用 etcd 開箱即用的工具,在 etcd 底層 recipe 包裏結構 Queue,實現了一個多讀多寫的分佈式隊列。

type Queue struct {
  client *v3.Client
  ctx context.Context

  keyPrefix string
}
func NewQueue(client *v3.Client, keyPrefix string) *Queue
func (q *Queue) Dequeue() (string, error)
func (q *Queue) Enqueue(val string)

咱們基於此包能夠很方便的實現。

package src

import (
  "github.com/coreos/etcd/clientv3"
  recipe "github.com/coreos/etcd/contrib/recipes"
  "log"
  "strconv"
  "strings"
  "sync"
  "time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")

// 初始化etcd客戶端
func initEtcdClient() *clientv3.Client {
  var client *clientv3.Client
  var err error
  // 解析etcd的地址,編程[]string
  endpoints := strings.Split(*addr, ",")
  // 建立一個 etcd 的客戶端
  client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
    DialTimeout: 5 * time.Second})
  if err != nil {
    log.Printf("初始化客戶端失敗:%v\\n", err)
    log.Fatal(err)
  }
  return client
}

func Push(keyName string) {
  client := initEtcdClient()
  defer client.Close()
  q := recipe.NewQueue(client, keyName)
  var wg sync.WaitGroup

  for i := 0; i < 3; i++ {
    for j := 0; j < 10; j++ {
      wg.Add(1)
      go func(item int) {
        defer wg.Done()
        err := q.Enqueue(strconv.Itoa(item))
        if err != nil {
          log.Printf("push err:%v\\n", err)
        }
      }(j)
    }
    time.Sleep(2 * time.Second)
  }
  wg.Wait()
}

func Pop(keyName string) {
  client := initEtcdClient()
  defer client.Close()
  q := recipe.NewQueue(client, keyName)
  for {
    res, err := q.Dequeue()
    if err != nil {
      log.Fatal(err)
      return
    }
    log.Printf("接收值:%v\\n", res)
  }
}

在 push 中,咱們開啓3輪發送值入隊,每次發送10個,發送一輪休息2秒。 在 pop 中,經過死循環獲取隊列中的值。

運行腳本程序以下。

package main

import (
  "etcd-test/src"
  "time"
)

func main() {
  key := "test-queue"
  go src.Pop(key)
  time.Sleep(1 * time.Second)
  go src.Push(key)
  time.Sleep(20 * time.Second)
}

咱們使用兩個 G 表明 分別運行 push 和 pop 操做。 同時爲了達到運行效果,咱們先運行 pop 等待有入隊的元素。 運行結果動畫以下,

圖片

etcd 還提供了優先級的分佈式的隊列。和上面的用法類似。只是在入隊的時候,不單單須要提供一個值,還須要提供一個整數,來表示當前 push 值的優先級。數值越小,優先級越高。

咱們改動一下上述的代碼。

package src

import (
  "github.com/coreos/etcd/clientv3"
  recipe "github.com/coreos/etcd/contrib/recipes"
  "log"
  "strconv"
  "strings"
  "sync"
  "time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")

// 初始化etcd客戶端
func initEtcdClient() *clientv3.Client {
  var client *clientv3.Client
  var err error
  // 解析etcd的地址,編程[]string
  endpoints := strings.Split(*addr, ",")
  // 建立一個 etcd 的客戶端
  client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
    DialTimeout: 5 * time.Second})
  if err != nil {
    log.Printf("初始化客戶端失敗:%v\\n", err)
    log.Fatal(err)
  }
  return client
}

func PriorityPush(keyName string) {
  client := initEtcdClient()
  defer client.Close()
  q := recipe.NewPriorityQueue(client, keyName)
  var wg sync.WaitGroup

  for j := 0; j < 10; j++ {
    wg.Add(1)
    go func(item int) {
      defer wg.Done()
      err := q.Enqueue(strconv.Itoa(item), uint16(item))
      if err != nil {
        log.Printf("push err:%v\\n", err)
      }
    }(j)
  }
  wg.Wait()
}

func PriorityPop(keyName string) {
  client := initEtcdClient()
  defer client.Close()
  q := recipe.NewPriorityQueue(client, keyName)
  for {
    res, err := q.Dequeue()
    if err != nil {
      log.Fatal(err)
      return
    }
    log.Printf("接收值:%v\\n", res)
  }
}

而後如下是咱們的測試代碼:

package main

import (
  "etcd-test/src"
  "sync"
  "time"
)
func main() {
  key := "test-queue"
  var wg sync.WaitGroup
  wg.Add(1)
  go func() {
    defer wg.Done()
    src.PriorityPush(key)
  }()
  wg.Wait()
  go src.PriorityPop(key)
  time.Sleep(20 * time.Second)
}

咱們把0到9的數併發的 push 到隊列中,對應的優先級整數值就是它自己,push 完畢,咱們運行 PriorityPop 函數,看最終結果顯示就是從0到9。

圖片

總結

這篇文章主要介紹瞭如何使用 etcd 實現分佈式鎖以及分佈式隊列。其餘etcd的場景,能夠自行實踐。

相關文章
相關標籤/搜索