使用Golang利用ectd實現一個分佈式鎖

http://blog.codeg.cn/post/blog/2016-02-24-distrubute-lock-over-etcd/node

 

 

 
本文  http://blog.codeg.cn/post/blog/2016-02-24-distrubute-lock-over-etcd/ 是做者 zieckey在研究和學習相關內容時所作的筆記,歡迎廣大朋友指正和交流! 版權全部,歡迎轉載和分享,但請保留此段聲明。

 

etcd是隨着CoreOS項目一塊兒成長起來的,隨着Golang和CoreOS等項目在開源社區日益火熱, etcd做爲一個高可用、強一致性的分佈式Key-Value存儲系統被愈來愈多的開發人員關注和使用。github

這篇文章全方位介紹了etcd的應用場景,這裏簡單摘要以下:golang

  • 服務發現(Service Discovery)
  • 消息發佈與訂閱
  • 負載均衡
  • 分佈式通知與協調
  • 分佈式鎖
  • 分佈式隊列
  • 集羣監控與Leader競選
  • 爲何用etcd而不用ZooKeeper

本文重點介紹如何利用ectd實現一個分佈式鎖。 鎖的概念你們都熟悉,當咱們但願某一事件在同一時間點只有一個線程(goroutine)在作,或者某一個資源在同一時間點只有一個服務能訪問,這個時候咱們就須要用到鎖。 例如咱們要實現一個分佈式的id生成器,多臺服務器之間的協調就很是麻煩。分佈式鎖就正好派上用場。api

其基本實現原理爲:服務器

  1. 在ectd系統裏建立一個key
  2. 若是建立失敗,key存在,則監聽該key的變化事件,直到該key被刪除,回到1
  3. 若是建立成功,則認爲我得到了鎖

具體代碼以下:app

package etcdsync import ( "fmt" "io" "os" "sync" "time" "github.com/coreos/etcd/client" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" ) const ( defaultTTL = 60 defaultTry = 3 deleteAction = "delete" expireAction = "expire" ) // A Mutex is a mutual exclusion lock which is distributed across a cluster. type Mutex struct { key string id string // The identity of the caller client client.Client kapi client.KeysAPI ctx context.Context ttl time.Duration mutex *sync.Mutex logger io.Writer } // New creates a Mutex with the given key which must be the same // across the cluster nodes. // machines are the ectd cluster addresses func New(key string, ttl int, machines []string) *Mutex { cfg := client.Config{ Endpoints: machines, Transport: client.DefaultTransport, HeaderTimeoutPerRequest: time.Second, } c, err := client.New(cfg) if err != nil { return nil } hostname, err := os.Hostname() if err != nil { return nil } if len(key) == 0 || len(machines) == 0 { return nil } if key[0] != '/' { key = "/" + key } if ttl < 1 { ttl = defaultTTL } return &Mutex{ key: key, id: fmt.Sprintf("%v-%v-%v", hostname, os.Getpid(), time.Now().Format("20060102-15:04:05.999999999")), client: c, kapi: client.NewKeysAPI(c), ctx: context.TODO(), ttl: time.Second * time.Duration(ttl), mutex: new(sync.Mutex), } } // Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func (m *Mutex) Lock() (err error) { m.mutex.Lock() for try := 1; try <= defaultTry; try++ { if m.lock() == nil { return nil } m.debug("Lock node %v ERROR %v", m.key, err) if try < defaultTry { m.debug("Try to lock node %v again", m.key, err) } } return err } func (m *Mutex) lock() (err error) { m.debug("Trying to create a node : key=%v", m.key) setOptions := &client.SetOptions{ PrevExist:client.PrevNoExist, TTL: m.ttl, } resp, err := m.kapi.Set(m.ctx, m.key, m.id, setOptions) if err == nil { m.debug("Create node %v OK [%q]", m.key, resp) return nil } m.debug("Create node %v failed [%v]", m.key, err) e, ok := err.(client.Error) if !ok { return err } if e.Code != client.ErrorCodeNodeExist { return err } // Get the already node's value. resp, err = m.kapi.Get(m.ctx, m.key, nil) if err != nil { return err } m.debug("Get node %v OK", m.key) watcherOptions := &client.WatcherOptions{ AfterIndex : resp.Index, Recursive:false, } watcher := m.kapi.Watcher(m.key, watcherOptions) for { m.debug("Watching %v ...", m.key) resp, err = watcher.Next(m.ctx) if err != nil { return err } m.debug("Received an event : %q", resp) if resp.Action == deleteAction || resp.Action == expireAction { return nil } } } // Unlock unlocks m. // It is a run-time error if m is not locked on entry to Unlock. // // A locked Mutex is not associated with a particular goroutine. // It is allowed for one goroutine to lock a Mutex and then // arrange for another goroutine to unlock it. func (m *Mutex) Unlock() (err error) { defer m.mutex.Unlock() for i := 1; i <= defaultTry; i++ { var resp *client.Response resp, err = m.kapi.Delete(m.ctx, m.key, nil) if err == nil { m.debug("Delete %v OK", m.key) return nil } m.debug("Delete %v falied: %q", m.key, resp) e, ok := err.(client.Error) if ok && e.Code == client.ErrorCodeKeyNotFound { return nil } } return err } func (m *Mutex) debug(format string, v ...interface{}) { if m.logger != nil { m.logger.Write([]byte(m.id)) m.logger.Write([]byte(" ")) m.logger.Write([]byte(fmt.Sprintf(format, v...))) m.logger.Write([]byte("\n")) } } func (m *Mutex) SetDebugLogger(w io.Writer) { m.logger = w } 

其實相似的實現有不少,但目前都已通過時,使用的都是被官方標記爲deprecated的項目。且大部分接口都不如上述代碼簡單。 使用上,跟Golang官方sync包的Mutex接口很是相似,先New(),而後調用Lock(),使用完後調用Unlock(),就三個接口,就是這麼簡單。示例代碼以下:負載均衡

package main import ( "github.com/zieckey/etcdsync" "log" ) func main() { //etcdsync.SetDebug(true) log.SetFlags(log.Ldate|log.Ltime|log.Lshortfile) m := etcdsync.New("/etcdsync", "123", []string{"http://127.0.0.1:2379"}) if m == nil { log.Printf("etcdsync.NewMutex failed") } err := m.Lock() if err != nil { log.Printf("etcdsync.Lock failed") } else { log.Printf("etcdsync.Lock OK") } log.Printf("Get the lock. Do something here.") err = m.Unlock() if err != nil { log.Printf("etcdsync.Unlock failed") } else { log.Printf("etcdsync.Unlock OK") } } 

參考

  1. etcdsync項目地址
  2. ectd項目官方地址
相關文章
相關標籤/搜索