在傳統單體應用單機部署的狀況下,可使用Java併發處理相關的API(如ReentrantLock或Synchronized)進行互斥控制。在單機環境中,Java中提供了不少併發處理相關的API。可是,隨着業務發展的須要,原單體單機部署的系統被演化成分佈式集羣系統後,因爲分佈式系統多線程、多進程而且分佈在不一樣機器上,這將使原單機部署狀況下的併發控制鎖策略失效,單純的Java API並不能提供分佈式鎖的能力。爲了解決這個問題就須要一種跨JVM的互斥機制來控制共享資源的訪問,這就是分佈式鎖要解決的問題!php
鎖是在執行多線程時用於強行限制資源訪問的同步機制,在單機系統上,可使用Java併發處理相關的API(如ReentrantLock或Synchronized)進行互斥控制。而在分佈式系統場景下,實例會運行在多臺機器上,爲了使多進程(多實例上)對共享資源的讀寫同步,保證數據的最終一致性,引入了分佈式鎖。html
分佈式鎖應具有如下特色:java
分佈式鎖常見實現方式:git
網上常見的是基於Redis和ZooKeeper的實現,基於數據庫的由於實現繁瑣且性能較差,不想維護第三方中間件的能夠考慮。本文主要描述基於 ETCD 的實現,etcd3 的client也給出了新的 api,使用上更爲簡單github
既然是鎖,核心操做無外乎加鎖、解鎖。golang
Redis的加鎖操做:算法
SET lock_name my_random_value NX PX 30000
Redis的解鎖操做:數據庫
del lock_name
etcd 支持如下功能,正是依賴這些功能來實現分佈式鎖的:api
實現過程:緩存
客戶端鏈接 Etcd,以 /lock/mylock 爲前綴建立全局惟一的 key,假設第一個客戶端對應的 key="/lock/mylock/UUID1",第二個爲 key="/lock/mylock/UUID2";客戶端分別爲本身的 key 建立租約 - Lease,租約的長度根據業務耗時肯定,假設爲 15s;
當一個客戶端持有鎖期間,其它客戶端只能等待,爲了不等待期間租約失效,客戶端需建立一個定時任務做爲「心跳」進行續約。此外,若是持有鎖期間客戶端崩潰,心跳中止,key 將因租約到期而被刪除,從而鎖釋放,避免死鎖。
進行 put 操做,將步驟 1 中建立的 key 綁定租約寫入 Etcd,根據 Etcd 的 Revision 機制,假設兩個客戶端 put 操做返回的 Revision 分別爲 一、2,客戶端需記錄 Revision 用以接下來判斷本身是否得到鎖。
客戶端之前綴 /lock/mylock 讀取 keyValue 列表(keyValue 中帶有 key 對應的 Revision),判斷本身 key 的 Revision 是否爲當前列表中最小的,若是是則認爲得到鎖;不然監聽列表中前一個 Revision 比本身小的 key 的刪除事件,一旦監聽到刪除事件或者因租約失效而刪除的事件,則本身得到鎖。
得到鎖後,操做共享資源,執行業務代碼。
完成業務流程後,刪除對應的key釋放鎖。
自帶的 etcdctl 能夠模擬鎖的使用:
// 第一個終端 $ ./etcdctl lock mutex1 mutex1/326963a02758b52d // 第二終端 $ ./etcdctl lock mutex1 // 當第一個終端結束了,第二個終端會顯示 mutex1/326963a02758b531
在etcd的clientv3包中,實現了分佈式鎖。使用起來和mutex是相似的,爲了瞭解其中的工做機制,這裏簡要的作一下總結。
etcd分佈式鎖的實如今go.etcd.io/etcd/clientv3/concurrency包中,主要提供瞭如下幾個方法:
* func NewMutex(s *Session, pfx string) *Mutex, 用來新建一個mutex * func (m *Mutex) Lock(ctx context.Context) error,它會阻塞直到拿到了鎖,而且支持經過context來取消獲取鎖。 * func (m *Mutex) Unlock(ctx context.Context) error,解鎖
所以在使用etcd提供的分佈式鎖式很是簡單,一般就是實例化一個mutex,而後嘗試搶佔鎖,以後進行業務處理,最後解鎖便可。
demo:
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" "log" "os" "os/signal" "time" ) func main() { c := make(chan os.Signal) signal.Notify(c) cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"localhost:2379"}, DialTimeout: 5 * time.Second, }) if err != nil { log.Fatal(err) } defer cli.Close() lockKey := "/lock" go func () { session, err := concurrency.NewSession(cli) if err != nil { log.Fatal(err) } m := concurrency.NewMutex(session, lockKey) if err := m.Lock(context.TODO()); err != nil { log.Fatal("go1 get mutex failed " + err.Error()) } fmt.Printf("go1 get mutex sucess\n") fmt.Println(m) time.Sleep(time.Duration(10) * time.Second) m.Unlock(context.TODO()) fmt.Printf("go1 release lock\n") }() go func() { time.Sleep(time.Duration(2) * time.Second) session, err := concurrency.NewSession(cli) if err != nil { log.Fatal(err) } m := concurrency.NewMutex(session, lockKey) if err := m.Lock(context.TODO()); err != nil { log.Fatal("go2 get mutex failed " + err.Error()) } fmt.Printf("go2 get mutex sucess\n") fmt.Println(m) time.Sleep(time.Duration(2) * time.Second) m.Unlock(context.TODO()) fmt.Printf("go2 release lock\n") }() <-c }
Lock()函數的實現很簡單:
// Lock locks the mutex with a cancelable context. If the context is canceled // while trying to acquire the lock, the mutex tries to clean its stale lock entry. func (m *Mutex) Lock(ctx context.Context) error { s := m.s client := m.s.Client() m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease()) cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0) // put self in lock waiters via myKey; oldest waiter holds lock put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease())) // reuse key in case this session already holds the lock get := v3.OpGet(m.myKey) // fetch current holder to complete uncontended path with only one RPC getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...) resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit() if err != nil { return err } m.myRev = resp.Header.Revision if !resp.Succeeded { m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision } // if no key on prefix / the minimum rev is key, already hold the lock ownerKey := resp.Responses[1].GetResponseRange().Kvs if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { m.hdr = resp.Header return nil } // wait for deletion revisions prior to myKey hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1) // release lock key if wait failed if werr != nil { m.Unlock(client.Ctx()) } else { m.hdr = hdr } return werr }
首先經過一個事務來嘗試加鎖,這個事務主要包含了4個操做: cmp、put、get、getOwner。須要注意的是,key是由pfx和Lease()組成的。
接下來纔是經過判斷來檢查是否持有鎖
m.myRev = resp.Header.Revision if !resp.Succeeded { m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision } // if no key on prefix / the minimum rev is key, already hold the lock ownerKey := resp.Responses[1].GetResponseRange().Kvs if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { m.hdr = resp.Header return nil }
m.myRev是當前的版本號,resp.Succeeded是cmp爲true時值爲true,不然是false。這裏的判斷代表當同一個session非第一次嘗試加鎖,當前的版本號應該取這個key的最新的版本號。
下面是取得鎖的持有者的key。若是當前沒有人持有這把鎖,那麼默認當前會話得到了鎖。或者鎖持有者的版本號和當前的版本號一致, 那麼當前的會話就是鎖的持有者。
// wait for deletion revisions prior to myKey hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1) // release lock key if wait failed if werr != nil { m.Unlock(client.Ctx()) } else { m.hdr = hdr }
上面這段代碼就很好理解了,由於走到這裏說明沒有獲取到鎖,那麼這裏等待鎖的刪除。
waitDeletes方法的實現也很簡單,可是須要注意的是,這裏的getOpts只會獲取比當前會話版本號更低的key,而後去監控最新的key的刪除。等這個key刪除了,本身也就拿到鎖了。
這種分佈式鎖的實現和我一開始的預想是不一樣的。它不存在鎖的競爭,不存在重複的嘗試加鎖的操做。而是經過使用統一的前綴pfx來put,而後根據各自的版本號來排隊獲取鎖。效率很是的高。避免了驚羣效應
如圖所示,共有4個session來加鎖,那麼根據revision來排隊,獲取鎖的順序爲session2 -> session3 -> session1 -> session4。
這裏面須要注意一個驚羣效應,每個client在鎖住/lock這個path的時候,實際都已經插入了本身的數據,相似/lock/LEASE_ID,而且返回了各自的index(就是raft算法裏面的日誌索引),而只有最小的纔算是拿到了鎖,其餘的client須要watch等待。例如client1拿到了鎖,client2和client3在等待,而client2拿到的index比client3的更小,那麼對於client1刪除鎖以後,client3其實並不關心,並不須要去watch。因此綜上,等待的節點只須要watch比本身index小而且差距最小的節點刪除事件便可。
etcd有多種使用場景,Master選舉是其中一種。提及Master選舉,過去經常使用zookeeper,經過建立EPHEMERAL_SEQUENTIAL節點(臨時有序節點),咱們選擇序號最小的節點做爲Master,邏輯直觀,實現簡單是其優點,可是要實現一個高健壯性的選舉並不簡單,同時zookeeper繁雜的擴縮容機制也是沉重的負擔。
master 選舉根本上也是搶鎖,與zookeeper直觀選舉邏輯相比,etcd的選舉則須要在咱們熟悉它的一系列基本概念後,調動咱們充分的想象力:
至此,etcd選舉的邏輯大致清晰了,但這一系列操做與zookeeper相比複雜不少,有沒有已經封裝好的庫能夠直接拿來用?etcd clientv3 concurrency中有對選舉及分佈式鎖的封裝。後面進一步發現,etcdctl v3裏已經有master選舉的實現了,下面針對這部分代碼進行簡單註釋,在最後參考這部分代碼實現本身的選舉邏輯。
官方示例:https://github.com/etcd-io/et...
如crontab 示例:
package main import ( "context" "fmt" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" "log" "time" ) const prefix = "/election-demo" const prop = "local" func main() { endpoints := []string{"szth-cce-devops00.szth.baidu.com:8379"} cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { log.Fatal(err) } defer cli.Close() campaign(cli, prefix, prop) } func campaign(c *clientv3.Client, election string, prop string) { for { // 租約到期時間:5s s, err := concurrency.NewSession(c, concurrency.WithTTL(5)) if err != nil { fmt.Println(err) continue } e := concurrency.NewElection(s, election) ctx := context.TODO() log.Println("開始競選") err = e.Campaign(ctx, prop) if err != nil { log.Println("競選 leader失敗,繼續") switch { case err == context.Canceled: return default: continue } } log.Println("得到leader") if err := doCrontab(); err != nil { log.Println("調用主方法失敗,辭去leader,從新競選") _ = e.Resign(ctx) continue } return } } func doCrontab() error { for { fmt.Println("doCrontab") time.Sleep(time.Second * 4) //return fmt.Errorf("sss") } }
/* * 發起競選 * 未當選leader前,會一直阻塞在Campaign調用 * 當選leader後,等待SIGINT、SIGTERM或session過時而退出 * https://github.com/etcd-io/etcd/blob/master/etcdctl/ctlv3/command/elect_command.go */ func campaign(c *clientv3.Client, election string, prop string) error { //NewSession函數中建立了一個lease,默認是60s TTL,並會調用KeepAlive,永久爲這個lease自動續約(2/3生命週期的時候執行續約操做) s, err := concurrency.NewSession(c) if err != nil { return err } e := concurrency.NewElection(s, election) ctx, cancel := context.WithCancel(context.TODO()) donec := make(chan struct{}) sigc := make(chan os.Signal, 1) signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigc cancel() close(donec) }() //競選邏輯,將展開分析 if err = e.Campaign(ctx, prop); err != nil { return err } // print key since elected resp, err := c.Get(ctx, e.Key()) if err != nil { return err } display.Get(*resp) select { case <-donec: case <-s.Done(): return errors.New("elect: session expired") } return e.Resign(context.TODO()) } /* * 相似於zookeeper的臨時有序節點,etcd的選舉也是在相應的prefix path下面建立key,該key綁定了lease並根據lease id進行命名, * key建立後就有revision號,這樣使得在prefix path下的key也都是按revision有序 * https://github.com/etcd-io/etcd/blob/master/clientv3/concurrency/election.go */ func (e *Election) Campaign(ctx context.Context, val string) error { s := e.session client := e.session.Client() //真正建立的key名爲:prefix + lease id k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease()) //Txn:transaction,依靠Txn進行建立key的CAS操做,當key不存在時纔會成功建立 txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0)) txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease()))) txn = txn.Else(v3.OpGet(k)) resp, err := txn.Commit() if err != nil { return err } e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s //若是key已存在,則建立失敗; //當key的value與當前value不等時,若是本身爲leader,則不用從新執行選舉直接設置value; //不然報錯。 if !resp.Succeeded { kv := resp.Responses[0].GetResponseRange().Kvs[0] e.leaderRev = kv.CreateRevision if string(kv.Value) != val { if err = e.Proclaim(ctx, val); err != nil { e.Resign(ctx) return err } } } //一直阻塞,直到確認本身的create revision爲當前path中最小,從而確認本身當選爲leader _, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1) if err != nil { // clean up in case of context cancel select { case <-ctx.Done(): e.Resign(client.Ctx()) default: e.leaderSession = nil } return err } e.hdr = resp.Header return nil }
鎖基礎:
https://tech.meituan.com/2018...