基於 etcd 實現分佈式鎖

概述

在傳統單體應用單機部署的狀況下,可使用Java併發處理相關的API(如ReentrantLock或Synchronized)進行互斥控制。在單機環境中,Java中提供了不少併發處理相關的API。可是,隨着業務發展的須要,原單體單機部署的系統被演化成分佈式集羣系統後,因爲分佈式系統多線程、多進程而且分佈在不一樣機器上,這將使原單機部署狀況下的併發控制鎖策略失效,單純的Java API並不能提供分佈式鎖的能力。爲了解決這個問題就須要一種跨JVM的互斥機制來控制共享資源的訪問,這就是分佈式鎖要解決的問題!php

鎖是在執行多線程時用於強行限制資源訪問的同步機制,在單機系統上,可使用Java併發處理相關的API(如ReentrantLock或Synchronized)進行互斥控制。而在分佈式系統場景下,實例會運行在多臺機器上,爲了使多進程(多實例上)對共享資源的讀寫同步,保證數據的最終一致性,引入了分佈式鎖。html

分佈式鎖應具有如下特色:java

  • 互斥性:在任意時刻,只有一個客戶端(進程)能持有鎖
  • 安全性:避免死鎖狀況,當一個客戶端在持有鎖期間內,因爲意外崩潰而致使鎖未能主動解鎖,其持有的鎖也可以被正確釋放,並保證後續其它客戶端也能加鎖
  • 可用性:分佈式鎖須要有必定的高可用能力,當提供鎖的服務節點故障(宕機)時不影響服務運行,避免單點風險,如Redis的集羣模式、哨兵模式,ETCD/zookeeper的集羣選主能力等保證HA,保證自身持有的數據與故障節點一致。
  • 對稱性:對同一個鎖,加鎖和解鎖必須是同一個進程,即不能把其餘進程持有的鎖給釋放了,這又稱爲鎖的可重入性。

分佈式鎖常見實現方式:git

  1. 經過數據庫方式實現:採用樂觀鎖、悲觀鎖或者基於主鍵惟一約束實現
  2. 基於分佈式緩存實現的鎖服務: Redis 和基於 Redis 的 RedLock(Redisson提供了參考實現)
  3. 基於分佈式一致性算法實現的鎖服務:ZooKeeper、Chubby(google閉源實現)和 Etcd

網上常見的是基於Redis和ZooKeeper的實現,基於數據庫的由於實現繁瑣且性能較差,不想維護第三方中間件的能夠考慮。本文主要描述基於 ETCD 的實現,etcd3 的client也給出了新的 api,使用上更爲簡單github

基於 Redis 的實現

既然是鎖,核心操做無外乎加鎖、解鎖。golang

Redis的加鎖操做:算法

SET lock_name my_random_value NX PX 30000
  • lock_name,鎖的名稱,對於 Redis 而言,lock_name 就是 Key-Value 中的 Key,具備惟一性。
  • random_value,由客戶端生成的一個隨機字符串,它要保證在足夠長的一段時間內,且在全部客戶端的全部獲取鎖的請求中都是惟一的,用於惟一標識鎖的持有者。
  • NX 只有當 lock_name(key) 不存在的時候才能 SET 成功,從而保證只有一個客戶端能得到鎖,而其它客戶端在鎖被釋放以前都沒法得到鎖。
  • PX 30000 表示這個鎖節點有一個 30 秒的自動過時時間(目的是爲了防止持有鎖的客戶端故障後,沒法主動釋放鎖而致使死鎖,所以要求鎖的持有者必須在過時時間以內執行完相關操做並釋放鎖)。

Redis的解鎖操做:數據庫

del lock_name
  • 在加鎖時爲鎖設置過時時間,當過時時間到達,Redis 會自動刪除對應的 Key-Value,從而避免死鎖。注意,這個過時時間須要結合具體業務綜合評估設置,以保證鎖的持有者可以在過時時間以內執行完相關操做並釋放鎖。
  • 正常執行完畢,未到達鎖過時時間,經過del lock_name主動釋放鎖。

基於 ETCD的分佈式鎖

機制

etcd 支持如下功能,正是依賴這些功能來實現分佈式鎖的:api

  • Lease 機制:即租約機制(TTL,Time To Live),Etcd 能夠爲存儲的 KV 對設置租約,當租約到期,KV 將失效刪除;同時也支持續約,即 KeepAlive。
  • Revision 機制:每一個 key 帶有一個 Revision 屬性值,etcd 每進行一次事務對應的全局 Revision 值都會加一,所以每一個 key 對應的 Revision 屬性值都是全局惟一的。經過比較 Revision 的大小就能夠知道進行寫操做的順序。
  • 在實現分佈式鎖時,多個程序同時搶鎖,根據 Revision 值大小依次得到鎖,能夠避免 「羊羣效應」 (也稱 「驚羣效應」),實現公平鎖。
  • Prefix 機制:即前綴機制,也稱目錄機制。能夠根據前綴(目錄)獲取該目錄下全部的 key 及對應的屬性(包括 key, value 以及 revision 等)。
  • Watch 機制:即監聽機制,Watch 機制支持 Watch 某個固定的 key,也支持 Watch 一個目錄(前綴機制),當被 Watch 的 key 或目錄發生變化,客戶端將收到通知。

過程

實現過程:緩存

  • 步驟 1: 準備

客戶端鏈接 Etcd,以 /lock/mylock 爲前綴建立全局惟一的 key,假設第一個客戶端對應的 key="/lock/mylock/UUID1",第二個爲 key="/lock/mylock/UUID2";客戶端分別爲本身的 key 建立租約 - Lease,租約的長度根據業務耗時肯定,假設爲 15s;

  • 步驟 2: 建立定時任務做爲租約的「心跳」

當一個客戶端持有鎖期間,其它客戶端只能等待,爲了不等待期間租約失效,客戶端需建立一個定時任務做爲「心跳」進行續約。此外,若是持有鎖期間客戶端崩潰,心跳中止,key 將因租約到期而被刪除,從而鎖釋放,避免死鎖。

  • 步驟 3: 客戶端將本身全局惟一的 key 寫入 Etcd

進行 put 操做,將步驟 1 中建立的 key 綁定租約寫入 Etcd,根據 Etcd 的 Revision 機制,假設兩個客戶端 put 操做返回的 Revision 分別爲 一、2,客戶端需記錄 Revision 用以接下來判斷本身是否得到鎖。

  • 步驟 4: 客戶端判斷是否得到鎖

客戶端之前綴 /lock/mylock 讀取 keyValue 列表(keyValue 中帶有 key 對應的 Revision),判斷本身 key 的 Revision 是否爲當前列表中最小的,若是是則認爲得到鎖;不然監聽列表中前一個 Revision 比本身小的 key 的刪除事件,一旦監聽到刪除事件或者因租約失效而刪除的事件,則本身得到鎖。

  • 步驟 5: 執行業務

得到鎖後,操做共享資源,執行業務代碼。

  • 步驟 6: 釋放鎖

完成業務流程後,刪除對應的key釋放鎖。

實現

自帶的 etcdctl 能夠模擬鎖的使用:

// 第一個終端
$ ./etcdctl lock mutex1
mutex1/326963a02758b52d
​
// 第二終端
$ ./etcdctl lock mutex1
​
// 當第一個終端結束了,第二個終端會顯示
mutex1/326963a02758b531

在etcd的clientv3包中,實現了分佈式鎖。使用起來和mutex是相似的,爲了瞭解其中的工做機制,這裏簡要的作一下總結。

etcd分佈式鎖的實如今go.etcd.io/etcd/clientv3/concurrency包中,主要提供瞭如下幾個方法:

* func NewMutex(s *Session, pfx string) *Mutex, 用來新建一個mutex
* func (m *Mutex) Lock(ctx context.Context) error,它會阻塞直到拿到了鎖,而且支持經過context來取消獲取鎖。
* func (m *Mutex) Unlock(ctx context.Context) error,解鎖

所以在使用etcd提供的分佈式鎖式很是簡單,一般就是實例化一個mutex,而後嘗試搶佔鎖,以後進行業務處理,最後解鎖便可。

demo:

package main

import (  
    "context"
    "fmt"
    "github.com/coreos/etcd/clientv3"
    "github.com/coreos/etcd/clientv3/concurrency"
    "log"
    "os"
    "os/signal"
    "time"
)

func main() {  
    c := make(chan os.Signal)
    signal.Notify(c)

    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    lockKey := "/lock"

    go func () {
        session, err := concurrency.NewSession(cli)
        if err != nil {
            log.Fatal(err)
        }
        m := concurrency.NewMutex(session, lockKey)
        if err := m.Lock(context.TODO()); err != nil {
            log.Fatal("go1 get mutex failed " + err.Error())
        }
        fmt.Printf("go1 get mutex sucess\n")
        fmt.Println(m)
        time.Sleep(time.Duration(10) * time.Second)
        m.Unlock(context.TODO())
        fmt.Printf("go1 release lock\n")
    }()

    go func() {
        time.Sleep(time.Duration(2) * time.Second)
        session, err := concurrency.NewSession(cli)
        if err != nil {
            log.Fatal(err)
        }
        m := concurrency.NewMutex(session, lockKey)
        if err := m.Lock(context.TODO()); err != nil {
            log.Fatal("go2 get mutex failed " + err.Error())
        }
        fmt.Printf("go2 get mutex sucess\n")
        fmt.Println(m)
        time.Sleep(time.Duration(2) * time.Second)
        m.Unlock(context.TODO())
        fmt.Printf("go2 release lock\n")
    }()

    <-c
}

原理

Lock()函數的實現很簡單:

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
    s := m.s
    client := m.s.Client()

    m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
    // put self in lock waiters via myKey; oldest waiter holds lock
    put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
    // reuse key in case this session already holds the lock
    get := v3.OpGet(m.myKey)
    // fetch current holder to complete uncontended path with only one RPC
    getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
    resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
    if err != nil {
        return err
    }
    m.myRev = resp.Header.Revision
    if !resp.Succeeded {
        m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
    }
    // if no key on prefix / the minimum rev is key, already hold the lock
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }

    // wait for deletion revisions prior to myKey
    hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    // release lock key if wait failed
    if werr != nil {
        m.Unlock(client.Ctx())
    } else {
        m.hdr = hdr
    }
    return werr
}

首先經過一個事務來嘗試加鎖,這個事務主要包含了4個操做: cmp、put、get、getOwner。須要注意的是,key是由pfx和Lease()組成的。

  • cmp: 比較加鎖的key的修訂版本是不是0。若是是0就表明這個鎖不存在。
  • put: 向加鎖的key中存儲一個空值,這個操做就是一個加鎖的操做,可是這把鎖是有超時時間的,超時的時間是session的默認時長。超時是爲了防止鎖沒有被正常釋放致使死鎖。
  • get: get就是經過key來查詢
  • getOwner: 注意這裏是用m.pfx來查詢的,而且帶了查詢參數WithFirstCreate()。使用pfx來查詢是由於其餘的session也會用一樣的pfx來嘗試加鎖,而且由於每一個LeaseID都不一樣,因此第一次確定會put成功。可是隻有最先使用這個pfx的session纔是持有鎖的,因此這個getOwner的含義就是這樣的。

接下來纔是經過判斷來檢查是否持有鎖

m.myRev = resp.Header.Revision
if !resp.Succeeded {
    m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
    m.hdr = resp.Header
    return nil
}

m.myRev是當前的版本號,resp.Succeeded是cmp爲true時值爲true,不然是false。這裏的判斷代表當同一個session非第一次嘗試加鎖,當前的版本號應該取這個key的最新的版本號。

下面是取得鎖的持有者的key。若是當前沒有人持有這把鎖,那麼默認當前會話得到了鎖。或者鎖持有者的版本號和當前的版本號一致, 那麼當前的會話就是鎖的持有者。

// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
    m.Unlock(client.Ctx())
} else {
    m.hdr = hdr
}

上面這段代碼就很好理解了,由於走到這裏說明沒有獲取到鎖,那麼這裏等待鎖的刪除。

waitDeletes方法的實現也很簡單,可是須要注意的是,這裏的getOpts只會獲取比當前會話版本號更低的key,而後去監控最新的key的刪除。等這個key刪除了,本身也就拿到鎖了。

這種分佈式鎖的實現和我一開始的預想是不一樣的。它不存在鎖的競爭,不存在重複的嘗試加鎖的操做。而是經過使用統一的前綴pfx來put,而後根據各自的版本號來排隊獲取鎖。效率很是的高。避免了驚羣效應


如圖所示,共有4個session來加鎖,那麼根據revision來排隊,獲取鎖的順序爲session2 -> session3 -> session1 -> session4。

這裏面須要注意一個驚羣效應,每個client在鎖住/lock這個path的時候,實際都已經插入了本身的數據,相似/lock/LEASE_ID,而且返回了各自的index(就是raft算法裏面的日誌索引),而只有最小的纔算是拿到了鎖,其餘的client須要watch等待。例如client1拿到了鎖,client2和client3在等待,而client2拿到的index比client3的更小,那麼對於client1刪除鎖以後,client3其實並不關心,並不須要去watch。因此綜上,等待的節點只須要watch比本身index小而且差距最小的節點刪除事件便可。

基於 ETCD的選主

機制

etcd有多種使用場景,Master選舉是其中一種。提及Master選舉,過去經常使用zookeeper,經過建立EPHEMERAL_SEQUENTIAL節點(臨時有序節點),咱們選擇序號最小的節點做爲Master,邏輯直觀,實現簡單是其優點,可是要實現一個高健壯性的選舉並不簡單,同時zookeeper繁雜的擴縮容機制也是沉重的負擔。

master 選舉根本上也是搶鎖,與zookeeper直觀選舉邏輯相比,etcd的選舉則須要在咱們熟悉它的一系列基本概念後,調動咱們充分的想象力:

  • 一、MVCC,key存在版本屬性,沒被建立時版本號爲0;
  • 二、CAS操做,結合MVCC,能夠實現競選邏輯,if(version == 0) set(key,value),經過原子操做,確保只有一臺機器能set成功;
  • 三、Lease租約,能夠對key綁定一個租約,租約到期時沒預定,這個key就會被回收;
  • 四、Watch監聽,監聽key的變化事件,若是key被刪除,則從新發起競選。

    至此,etcd選舉的邏輯大致清晰了,但這一系列操做與zookeeper相比複雜不少,有沒有已經封裝好的庫能夠直接拿來用?etcd clientv3 concurrency中有對選舉及分佈式鎖的封裝。後面進一步發現,etcdctl v3裏已經有master選舉的實現了,下面針對這部分代碼進行簡單註釋,在最後參考這部分代碼實現本身的選舉邏輯。

實現

官方示例:https://github.com/etcd-io/et...

如crontab 示例:

package main

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "go.etcd.io/etcd/clientv3/concurrency"
    "log"
    "time"
)

const prefix = "/election-demo"
const prop = "local"

func main() {
    endpoints := []string{"szth-cce-devops00.szth.baidu.com:8379"}
    cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    campaign(cli, prefix, prop)

}

func campaign(c *clientv3.Client, election string, prop string) {
    for {
        // 租約到期時間:5s
        s, err := concurrency.NewSession(c, concurrency.WithTTL(5))
        if err != nil {
            fmt.Println(err)
            continue
        }
        e := concurrency.NewElection(s, election)
        ctx := context.TODO()

        log.Println("開始競選")

        err = e.Campaign(ctx, prop)
        if err != nil {
            log.Println("競選 leader失敗,繼續")
            switch {
            case err == context.Canceled:
                return
            default:
                continue
            }
        }

        log.Println("得到leader")
        if err := doCrontab(); err != nil {
            log.Println("調用主方法失敗,辭去leader,從新競選")
            _ = e.Resign(ctx)
            continue
        }
        return
    }
}

func doCrontab() error {
    for {
        fmt.Println("doCrontab")
        time.Sleep(time.Second * 4)
        //return fmt.Errorf("sss")
    }
}

原理

/*
 * 發起競選
 * 未當選leader前,會一直阻塞在Campaign調用
 * 當選leader後,等待SIGINT、SIGTERM或session過時而退出
 * https://github.com/etcd-io/etcd/blob/master/etcdctl/ctlv3/command/elect_command.go
 */
 
func campaign(c *clientv3.Client, election string, prop string) error {
        //NewSession函數中建立了一個lease,默認是60s TTL,並會調用KeepAlive,永久爲這個lease自動續約(2/3生命週期的時候執行續約操做)
    s, err := concurrency.NewSession(c)
    if err != nil {
        return err
    }
    e := concurrency.NewElection(s, election)
    ctx, cancel := context.WithCancel(context.TODO())

    donec := make(chan struct{})
    sigc := make(chan os.Signal, 1)
    signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigc
        cancel()
        close(donec)
    }()

    //競選邏輯,將展開分析
    if err = e.Campaign(ctx, prop); err != nil {
        return err
    }

    // print key since elected
    resp, err := c.Get(ctx, e.Key())
    if err != nil {
        return err
    }
    display.Get(*resp)

    select {
    case <-donec:
    case <-s.Done():
        return errors.New("elect: session expired")
    }

    return e.Resign(context.TODO())
}

/*
 * 相似於zookeeper的臨時有序節點,etcd的選舉也是在相應的prefix path下面建立key,該key綁定了lease並根據lease id進行命名,
 * key建立後就有revision號,這樣使得在prefix path下的key也都是按revision有序
 * https://github.com/etcd-io/etcd/blob/master/clientv3/concurrency/election.go
 */
 
func (e *Election) Campaign(ctx context.Context, val string) error {
    s := e.session
    client := e.session.Client()
    
    //真正建立的key名爲:prefix + lease id
    k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
    //Txn:transaction,依靠Txn進行建立key的CAS操做,當key不存在時纔會成功建立
    txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
    txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
    txn = txn.Else(v3.OpGet(k))
    resp, err := txn.Commit()
    if err != nil {
        return err
    }
    e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
    //若是key已存在,則建立失敗;
        //當key的value與當前value不等時,若是本身爲leader,則不用從新執行選舉直接設置value;
        //不然報錯。
    if !resp.Succeeded {
        kv := resp.Responses[0].GetResponseRange().Kvs[0]
        e.leaderRev = kv.CreateRevision
        if string(kv.Value) != val {
            if err = e.Proclaim(ctx, val); err != nil {
                e.Resign(ctx)
                return err
            }
        }
    }
    
    //一直阻塞,直到確認本身的create revision爲當前path中最小,從而確認本身當選爲leader
    _, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
    if err != nil {
        // clean up in case of context cancel
        select {
        case <-ctx.Done():
            e.Resign(client.Ctx())
        default:
            e.leaderSession = nil
        }
        return err
    }
    e.hdr = resp.Header

    return nil
}

鎖基礎:

https://tech.meituan.com/2018...

Reference

相關文章
相關標籤/搜索