下面描述使用 Etcd 實現分佈式鎖的業務流程,假設對某個共享資源設置的鎖名爲:/lock/mylock
php
步驟 1: 準備java
客戶端鏈接 Etcd,以 /lock/mylock
爲前綴建立全局惟一的 key,假設第一個客戶端對應的 key="/lock/mylock/UUID1"
,第二個爲 key="/lock/mylock/UUID2"
;客戶端分別爲本身的 key 建立租約 - Lease,租約的長度根據業務耗時肯定,假設爲 15s;bash
步驟 2: 建立定時任務做爲租約的「心跳」分佈式
當一個客戶端持有鎖期間,其它客戶端只能等待,爲了不等待期間租約失效,客戶端需建立一個定時任務做爲「心跳」進行續約。此外,若是持有鎖期間客戶端崩潰,心跳中止,key 將因租約到期而被刪除,從而鎖釋放,避免死鎖。ui
步驟 3: 客戶端將本身全局惟一的 key 寫入 Etcdspa
進行 put 操做,將步驟 1 中建立的 key 綁定租約寫入 Etcd,根據 Etcd 的 Revision 機制,假設兩個客戶端 put 操做返回的 Revision 分別爲 一、2,客戶端需記錄 Revision 用以接下來判斷本身是否得到鎖。線程
步驟 4: 客戶端判斷是否得到鎖code
客戶端之前綴 /lock/mylock
讀取 keyValue 列表(keyValue 中帶有 key 對應的 Revision),判斷本身 key 的 Revision 是否爲當前列表中最小的,若是是則認爲得到鎖;不然監聽列表中前一個 Revision 比本身小的 key 的刪除事件,一旦監聽到刪除事件或者因租約失效而刪除的事件,則本身得到鎖。事件
步驟 5: 執行業務資源
得到鎖後,操做共享資源,執行業務代碼。
步驟 6: 釋放鎖
完成業務流程後,刪除對應的key釋放鎖。
咱們但願同一時間只有一個線程可以訪問到資源,可是分佈式資源點之間的協調會很是麻煩,這個時候咱們就須要一個分佈式鎖。
1.利用租約在etcd集羣中建立一個key,這個key有兩種形態,存在和不存在,而這兩種形態就是互斥量。
2.若是這個key不存在,那麼線程建立key,成功則獲取到鎖,該key就爲存在狀態。
3.若是該key已經存在,那麼線程就不能建立key,則獲取鎖失敗。
在使用該鎖時,須要傳入Ttl,Conf,Key字段來初始化鎖
type EtcdMutex struct { Ttl int64 //租約時間 Conf clientv3.Config //etcd集羣配置 Key string //etcd的key cancel context.CancelFunc //關閉續租的func lease clientv3.Lease leaseID clientv3.LeaseID txn clientv3.Txn }
func(em *EtcdMutex)init()error{
var err error var ctx context.Context client,err := clientv3.New(em.Conf) if err != nil{ return err } em.txn = clientv3.NewKV(client).Txn(context.TODO()) em.lease = clientv3.NewLease(client) leaseResp,err := em.lease.Grant(context.TODO(),em.Ttl) if err != nil{ return err } ctx,em.cancel = context.WithCancel(context.TODO()) em.leaseID = leaseResp.ID _,err = em.lease.KeepAlive(ctx,em.leaseID) return err }
func(em *EtcdMutex)Lock()error{
err := em.init()
if err != nil{ return err } //LOCK: em.txn.If(clientv3.Compare(clientv3.CreateRevision(em.Key),"=",0)). Then(clientv3.OpPut(em.Key,"",clientv3.WithLease(em.leaseID))). Else() txnResp,err := em.txn.Commit() if err != nil{ return err } if !txnResp.Succeeded{ //判斷txn.if條件是否成立 return fmt.Errof("搶鎖失敗") } return nil }
func(em *EtcdMutex)UnLock(){ em.cancel() em.lease.Revoke(context.TODO(),em.leaseID) fmt.Println("釋放了鎖") }
func main(){ var conf = clientv3.Config{ Endpoints: []string{"172.16.196.129:2380", "192.168.50.250:2380"}, DialTimeout: 5 * time.Second, } eMutex1 := &EtcdMutex{ Conf:conf, Ttl:10, Key:"lock", } eMutex2 := &EtcdMutex{ Conf:conf, Ttl:10, Key:"lock", } //groutine1 go func() { err := eMutex1.Lock() if err != nil{ fmt.Println("groutine1搶鎖失敗") fmt.Println(err) return } fmt.Println("groutine1搶鎖成功") time.Sleep(10*time.Second) defer eMutex.UnLock() }() //groutine2 go func() { err := eMutex2.Lock() if err != nil{ fmt.Println("groutine2搶鎖失敗") fmt.Println(err) return } fmt.Println("groutine2搶鎖成功") defer eMutex.UnLock() }() time.Sleep(30*time.Second) }