分佈式系統選主場景分析及實現

一:須要選主的場景
1:服務有多臺機器,取其中一臺去執行任務。多臺機器同時執行會出問題,如將數據庫中狀態爲失敗的記錄取出來從新執行,若是多臺機器同時執行,會致使一個失敗的任務被多臺機器同時執行。
2:服務有多臺機器,選其中一臺做爲主,主負責任務的分發,你們一塊兒消費並處理任務。仍是將數據庫中狀態爲失敗的記錄取出來從新執行,因爲一臺機器可能處理不過來,須要多臺機器協同處理。這個時候主機器負責將失敗的記錄從數據庫中查出來,寫入消息隊列,其餘機器一同消費隊列中的任務,並處理失敗的記錄
 
二:進行選主
根據上面的選主場景,咱們其實能夠從多臺機器中隨機取一臺,比raft這種選主算法簡單得多。咱們甚至能夠在配置文件中指定一臺機器,只有這臺機器才執行相關功能,其餘機器則不執行。若是是固定的幾臺機器,且一臺機器也能完成咱們的需求,這樣搞其實也能夠。若是機器不固定,並且單臺處理不過來時,用配置文件的方式就不適合。
可採用競爭選主的方式,誰先搶到誰就是主。
 
1:方案一
採用redis方案實現。若是指定的key不存在就將機器信息寫入這個key,成功寫入的那臺機器就是主,設置過時時間,防止機器異常掛掉的狀況,全部的機器都須要定時去搶redis鎖。SETNX這個命令就知足咱們的需求,寫redis成功的就是主,寫失敗的就是從。
優勢:
  • 1:實現簡單,比配置文件的方式好一點,支持機器動態
缺點:
  • 1:須要定時去搶鎖
  • 2:主可能常常變化,並且要保證主在切換的過程當中業務邏輯的正確性
  • 3:有些時間片可能沒有主,就是主掛掉了,而其餘機器還沒到搶鎖的時間,這個時間片就沒有主
 
2:方案二
採用etcd方案實現。etcd支持事務能作到不存在就寫入,達到redis SETNX同樣的效果,並且經過etcd的租賃機制保證在主掛掉的狀況下通知全部機器,這時你們自動開始新一輪的選主,仍是那句話第一個搶到的就是主。
優勢:
  • 知足咱們的需求,沒有設計上的缺陷
  • 只有主掛掉的狀況,纔會從新選主,不用擔憂主在切換的過程當中對業務邏輯的影響
缺點:
  • 實現起來相對複雜,那我就來試試吧 

golang源碼實現以下:git

  1 package etcdDemo
  2 
  3 import (
  4     "context"
  5     "fmt"
  6     "github.com/coreos/etcd/clientv3"
  7     "github.com/google/uuid"
  8     "time"
  9 )
 10 
 11 type Callback func(isMaster bool)
 12 
 13 type SelectMaster struct {
 14     endPoints []string
 15     key       string
 16     cli       *clientv3.Client
 17     lease     *clientv3.LeaseGrantResponse
 18     chClose   chan int
 19     callback  Callback
 20     token     string
 21     isMaster  bool
 22 }
 23 
 24 func NewSelectMaster(endPoints []string, key string) (*SelectMaster, error) {
 25     sm := &SelectMaster{
 26         endPoints: endPoints,
 27         key:       key,
 28         chClose:   make(chan int, 0),
 29         token:     uuid.New().String(),
 30     }
 31 
 32     cli, err := clientv3.New(clientv3.Config{
 33         Endpoints:   endPoints,
 34         DialTimeout: 3 * time.Second,
 35     })
 36     if err != nil {
 37         return sm, err
 38     }
 39     sm.cli = cli
 40     go sm.ioLoop()
 41     return sm, nil
 42 }
 43 
 44 func (sm *SelectMaster) ioLoop() {
 45     fmt.Println("SelectMaster.ioLoop start")
 46     ticker := time.NewTicker(time.Second * 3)
 47     defer ticker.Stop()
 48     chWatch := sm.cli.Watch(context.TODO(), sm.key)
 49     for {
 50         select {
 51         case <-ticker.C:
 52             if sm.lease == nil {
 53                 leaseResp, err := sm.cli.Grant(context.Background(), 4)
 54                 if err != nil {
 55                     fmt.Println("cli.Grant error=", err.Error())
 56                 } else {
 57                     sm.lease = leaseResp
 58                 }
 59             }
 60             if sm.lease != nil {
 61                 _, err := sm.cli.KeepAliveOnce(context.Background(), sm.lease.ID)
 62                 if err != nil {
 63                     fmt.Println("cli.KeepAliveOnce error=", err.Error())
 64                     break
 65                 }
 66             }
 67         case c := <-chWatch:
 68             for _, e := range c.Events {
 69                 if e == nil || e.Kv == nil {
 70                     continue
 71                 }
 72                 token := string(e.Kv.Value)
 73                 sm.isMaster = sm.token == token
 74                 if sm.callback == nil {
 75                     fmt.Println("SelectMaster.callback is nil")
 76                 } else {
 77                     sm.callback(sm.isMaster)
 78                     fmt.Println("SelectMaster.isLoop token=", token)
 79                     if token == "" { //主掛了,開始競選
 80                         sm.election()
 81                     }
 82                 }
 83             }
 84         case <-sm.chClose:
 85             goto stop
 86         }
 87     }
 88 stop:
 89     fmt.Println("SelectMaster.ioLoop end")
 90 }
 91 
 92 func (sm *SelectMaster) IsMaster() bool {
 93     return sm.isMaster
 94 }
 95 
 96 func (sm *SelectMaster) Close() {
 97     sm.chClose <- 1
 98 }
 99 
100 func (sm *SelectMaster) Election(callback Callback) (bool, error) {
101     sm.callback = callback
102     return sm.election()
103 }
104 
105 func (sm *SelectMaster) election() (bool, error) {
106     ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
107     defer cancel()
108     leaseResp, err := sm.cli.Grant(ctx, 10)
109     if err != nil {
110         return false, err
111     }
112     sm.lease = leaseResp
113     txn := clientv3.NewKV(sm.cli).Txn(context.TODO())
114     txn.If(clientv3.Compare(clientv3.CreateRevision(sm.key), "=", 0)).
115         Then(clientv3.OpPut(sm.key, sm.token, clientv3.WithLease(leaseResp.ID))).Else()
116     txnResp, err := txn.Commit()
117     if err != nil {
118         return false, err
119     }
120     return txnResp.Succeeded, nil
121 }
122 
123 func testSelectMaster() *SelectMaster {
124     endPoints := []string{"172.25.20.248:2379"}
125     sm, err := NewSelectMaster(endPoints, "/test/lock")
126     if err != nil {
127         fmt.Println(err.Error())
128         return nil
129     }
130     callback := func(isMaster bool) {
131         fmt.Println(sm.token, "callback=", isMaster)
132     }
133     isSuccess, err := sm.Election(callback)
134     if err != nil {
135         fmt.Println(sm.token, "Election=", err.Error())
136     } else {
137         fmt.Println(sm.token, "Election=", isSuccess)
138     }
139     return sm
140 }
141 
142 func TestSelectMaster() {
143     var master *SelectMaster
144     for i := 0; i < 3; i++ {
145         sm := testSelectMaster()
146         if sm.IsMaster() {
147             master = sm
148         }
149     }
150     if master != nil {
151         master.Close()
152     }
153     time.Sleep(time.Second*10)
154 }
相關文章
相關標籤/搜索