如今有一個需求,應用啓動時須要初始化一些數據,爲了保證高可用,會啓動多副本(replicas >= 3),如何保證數據不會重複?git
最簡單的方法,初始化數據都帶上主鍵,這樣主鍵衝突就會報錯。可是這麼作咱們須要對衝突的錯誤進行額外處理,由於插入咱們通常會複用已寫好的 DAO 層代碼。github
另外,初始化數據的主鍵多是動態生成的,並不想把主鍵寫死。因此下面來介紹這次的主角:基於 MySQL 的分佈式鎖的解決方案。redis
多副本分佈式應用,在這種 n 選 1 競爭某個資源或執行權的場景,通常都會用到分佈式鎖。分佈式有不少種實現方式,如基於 redis,etcd,zookeeper,file 等系統。本質上,就是找個多個節點都承認的地方保存數據,經過數據競態來實現鎖,固然這個依賴最好是高可用,不然會引起單點故障。分佈式
多個副本都使用同一個 MySQL,因此咱們能夠很方便的基於 MySQL 實現一個分佈式鎖。原理很簡單,利用惟一索引保證只有一個副本能插入某條數據,插入成功則表示取鎖成功,執行完畢則刪除該條數據釋放鎖。測試
建一個表用來存放鎖數據,將 Action 設爲惟一索引,表示對某個動做加鎖,如:init 初始化,cronjob 定時任務等不一樣動做之間加鎖互不影響。線程
type lock struct { Id string `gorm:"primary_key"` CreatedAt time.Time UpdatedAt time.Time ExpiredAt time.Time // 鎖過時時間 Action string `gorm:"unique;not null"` Holder string // 持鎖人信息,能夠使用 hostname }
既然有過時時間,那麼持鎖時間設爲多長合適呢?設置過短可能邏輯還沒執行完鎖就過時了;設置太長若是程序中途掛了沒有釋放鎖,那麼這段時間全部節點都拿不到鎖。code
要解決這個問題咱們能夠使用租約機制(lease),設置較短的持鎖時間,而後在持鎖週期內,不斷延長持鎖時間,直到主動釋放。這樣即便程序崩潰沒有 UnLock,鎖也會由於沒有刷新租約很快過時,不影響其餘節點獲取鎖。orm
Lock 時啓動一個 goroutine 刷新租約,Unlock 時經過 stopCh 將其中止。索引
另外,MySQL 中並無線程去處理過時的記錄,因此咱們在調用 Lock 時先嚐試將過時記錄刪掉。進程
核心代碼:
func NewLockDb(action, holder string, lease time.Duration) *lockDb { return &lockDb{ db: GetDB(context.Background()), stopCh: make(chan struct{}), action: action, holder: holder, leaseAge: lease, } } func (s *lockDb) Lock() (bool, error) { err := s.cleanExpired() if err != nil { return false, errx.WithStackOnce(err) } err = s.db.Create(&lock{ ExpiredAt: time.Now().Add(s.leaseAge), Action: s.action, Holder: s.holder, }).Error if err != nil { // Duplicate entry '<action_val>' for key 'action' if strings.Contains(err.Error(), "Duplicate entry") { return false, nil } return false, errx.WithStackOnce(err) } s.startLease() log.Debugf("%s get lock", s.holder) return true, nil } func (s *lockDb) UnLock() error { s.stopLease() var err error defer func() { err = s.db. Where("action = ? and holder = ?", s.action, s.holder). Delete(&lock{}). Error }() return err } func (s *lockDb) cleanExpired() error { err := s.db. Where("expired_at < ?", time.Now()). Delete(&lock{}). Error return err } func (s *lockDb) startLease() { go func() { // 剩餘 1/4 時刷新租約 ticker := time.NewTicker(s.leaseAge * 3 / 4) for { select { case <-ticker.C: err := s.refreshLease() if err != nil { log.Errorf("refreash lease err: %s", err) } else { log.Debug("lease refreshed") } case <-s.stopCh: log.Debug("lease stopped") return } } }() } func (s *lockDb) stopLease() { close(s.stopCh) } func (s *lockDb) refreshLease() error { err := s.db.Model(&lock{}). Where("action = ? and holder = ?", s.action, s.holder). Update("expired_at", time.Now().Add(s.leaseAge)). Error return err }
使用及測試:
func TestLock(t *testing.T) { i := 3 wg := &sync.WaitGroup{} wg.Add(i) for i > 0 { holder := strconv.Itoa(i) action := "test" i-- go func() { defer wg.Done() locker := dbcore.NewLockDb(action, holder, 10*time.Second) if _, err := locker.Lock(); err != nil { t.Logf("not hold the lock, err: %+v", err) return } time.Sleep(30 * time.Second) locker.UnLock() }() } wg.Wait() }
完整代碼:https://github.com/win5do/go-...
這個分佈式鎖實如今初始數據場景是夠用了,但並不完美,例如:依賴時間同步,不能容忍時間偏斜;獲取鎖不是阻塞的,若是要搶鎖須要使用方自旋; 鎖不可重入,粒度是進程級別,同一個 Action,當前進程獲取鎖後,釋放後才能再次獲取鎖。
你們能夠思考一下如何完善。