Golang 源碼實現如下：
1 package etcdDemo 2 3 import ( 4 "context" 5 "fmt" 6 "github.com/coreos/etcd/clientv3" 7 "github.com/google/uuid" 8 "time" 9 ) 10 11 type Callback func(isMaster bool) 12 13 type SelectMaster struct { 14 endPoints []string 15 key string 16 cli *clientv3.Client 17 lease *clientv3.LeaseGrantResponse 18 chClose chan int 19 callback Callback 20 token string 21 isMaster bool 22 } 23 24 func NewSelectMaster(endPoints []string, key string) (*SelectMaster, error) { 25 sm := &SelectMaster{ 26 endPoints: endPoints, 27 key: key, 28 chClose: make(chan int, 0), 29 token: uuid.New().String(), 30 } 31 32 cli, err := clientv3.New(clientv3.Config{ 33 Endpoints: endPoints, 34 DialTimeout: 3 * time.Second, 35 }) 36 if err != nil { 37 return sm, err 38 } 39 sm.cli = cli 40 go sm.ioLoop() 41 return sm, nil 42 } 43 44 func (sm *SelectMaster) ioLoop() { 45 fmt.Println("SelectMaster.ioLoop start") 46 ticker := time.NewTicker(time.Second * 3) 47 defer ticker.Stop() 48 chWatch := sm.cli.Watch(context.TODO(), sm.key) 49 for { 50 select { 51 case <-ticker.C: 52 if sm.lease == nil { 53 leaseResp, err := sm.cli.Grant(context.Background(), 4) 54 if err != nil { 55 fmt.Println("cli.Grant error=", err.Error()) 56 } else { 57 sm.lease = leaseResp 58 } 59 } 60 if sm.lease != nil { 61 _, err := sm.cli.KeepAliveOnce(context.Background(), sm.lease.ID) 62 if err != nil { 63 fmt.Println("cli.KeepAliveOnce error=", err.Error()) 64 break 65 } 66 } 67 case c := <-chWatch: 68 for _, e := range c.Events { 69 if e == nil || e.Kv == nil { 70 continue 71 } 72 token := string(e.Kv.Value) 73 sm.isMaster = sm.token == token 74 if sm.callback == nil { 75 fmt.Println("SelectMaster.callback is nil") 76 } else { 77 sm.callback(sm.isMaster) 78 fmt.Println("SelectMaster.isLoop token=", token) 79 if token == "" { //主掛了,開始競選 80 sm.election() 81 } 82 } 83 } 84 case <-sm.chClose: 85 goto stop 86 } 87 } 88 stop: 89 fmt.Println("SelectMaster.ioLoop end") 90 } 91 92 func (sm *SelectMaster) IsMaster() bool { 93 return 
sm.isMaster 94 } 95 96 func (sm *SelectMaster) Close() { 97 sm.chClose <- 1 98 } 99 100 func (sm *SelectMaster) Election(callback Callback) (bool, error) { 101 sm.callback = callback 102 return sm.election() 103 } 104 105 func (sm *SelectMaster) election() (bool, error) { 106 ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) 107 defer cancel() 108 leaseResp, err := sm.cli.Grant(ctx, 10) 109 if err != nil { 110 return false, err 111 } 112 sm.lease = leaseResp 113 txn := clientv3.NewKV(sm.cli).Txn(context.TODO()) 114 txn.If(clientv3.Compare(clientv3.CreateRevision(sm.key), "=", 0)). 115 Then(clientv3.OpPut(sm.key, sm.token, clientv3.WithLease(leaseResp.ID))).Else() 116 txnResp, err := txn.Commit() 117 if err != nil { 118 return false, err 119 } 120 return txnResp.Succeeded, nil 121 } 122 123 func testSelectMaster() *SelectMaster { 124 endPoints := []string{"172.25.20.248:2379"} 125 sm, err := NewSelectMaster(endPoints, "/test/lock") 126 if err != nil { 127 fmt.Println(err.Error()) 128 return nil 129 } 130 callback := func(isMaster bool) { 131 fmt.Println(sm.token, "callback=", isMaster) 132 } 133 isSuccess, err := sm.Election(callback) 134 if err != nil { 135 fmt.Println(sm.token, "Election=", err.Error()) 136 } else { 137 fmt.Println(sm.token, "Election=", isSuccess) 138 } 139 return sm 140 } 141 142 func TestSelectMaster() { 143 var master *SelectMaster 144 for i := 0; i < 3; i++ { 145 sm := testSelectMaster() 146 if sm.IsMaster() { 147 master = sm 148 } 149 } 150 if master != nil { 151 master.Close() 152 } 153 time.Sleep(time.Second*10) 154 }