基於 MySQL 分佈式鎖,防止多副本應用初始化數據重複

如今有一個需求,應用啓動時須要初始化一些數據,爲了保證高可用,會啓動多副本(replicas >= 3),如何保證數據不會重複?git

方案一:數據帶上主鍵

最簡單的方法,初始化數據都帶上主鍵,這樣主鍵衝突就會報錯。可是這麼作咱們須要對衝突的錯誤進行額外處理,由於插入咱們通常會複用已寫好的 DAO 層代碼。github

另外,初始化數據的主鍵多是動態生成的,並不想把主鍵寫死。因此下面來介紹這次的主角:基於 MySQL 的分佈式鎖的解決方案。redis

方案二:基於 MySQL 的分佈式鎖

多副本分佈式應用,在這種 n 選 1 競爭某個資源或執行權的場景,通常都會用到分佈式鎖。分佈式有不少種實現方式,如基於 redis,etcd,zookeeper,file 等系統。本質上,就是找個多個節點都承認的地方保存數據,經過數據競態來實現鎖,固然這個依賴最好是高可用,不然會引起單點故障。分佈式

多個副本都使用同一個 MySQL,因此咱們能夠很方便的基於 MySQL 實現一個分佈式鎖。原理很簡單,利用惟一索引保證只有一個副本能插入某條數據,插入成功則表示取鎖成功,執行完畢則刪除該條數據釋放鎖。測試

建一個表用來存放鎖數據,將 Action 設爲惟一索引,表示對某個動做加鎖,如:init 初始化,cronjob 定時任務等不一樣動做之間加鎖互不影響。線程

type lock struct {
    Id        string `gorm:"primary_key"`
    CreatedAt time.Time
    UpdatedAt time.Time
    ExpiredAt time.Time // 鎖過時時間
    Action    string `gorm:"unique;not null"`
    Holder    string // 持鎖人信息,能夠使用 hostname
}

既然有過時時間,那麼持鎖時間設爲多長合適呢?設置過短可能邏輯還沒執行完鎖就過時了;設置太長若是程序中途掛了沒有釋放鎖,那麼這段時間全部節點都拿不到鎖。code

要解決這個問題咱們能夠使用租約機制(lease),設置較短的持鎖時間,而後在持鎖週期內,不斷延長持鎖時間,直到主動釋放。這樣即便程序崩潰沒有 UnLock,鎖也會由於沒有刷新租約很快過時,不影響其餘節點獲取鎖。orm

Lock 時啓動一個 goroutine 刷新租約,Unlock 時經過 stopCh 將其中止。索引

另外,MySQL 中並無線程去處理過時的記錄,因此咱們在調用 Lock 時先嚐試將過時記錄刪掉。進程

核心代碼:

func NewLockDb(action, holder string, lease time.Duration) *lockDb {
    return &lockDb{
        db:       GetDB(context.Background()),
        stopCh:   make(chan struct{}),
        action:   action,
        holder:   holder,
        leaseAge: lease,
    }
}

func (s *lockDb) Lock() (bool, error) {
    err := s.cleanExpired()
    if err != nil {
        return false, errx.WithStackOnce(err)
    }

    err = s.db.Create(&lock{
        ExpiredAt: time.Now().Add(s.leaseAge),
        Action:    s.action,
        Holder:    s.holder,
    }).Error
    if err != nil {
        // Duplicate entry '<action_val>' for key 'action'
        if strings.Contains(err.Error(), "Duplicate entry") {
            return false, nil
        }
        return false, errx.WithStackOnce(err)
    }

    s.startLease()

    log.Debugf("%s get lock", s.holder)

    return true, nil
}

func (s *lockDb) UnLock() error {
    s.stopLease()
    var err error

    defer func() {
        err = s.db.
            Where("action = ? and holder = ?", s.action, s.holder).
            Delete(&lock{}).
            Error
    }()

    return err
}

func (s *lockDb) cleanExpired() error {
    err := s.db.
        Where("expired_at < ?", time.Now()).
        Delete(&lock{}).
        Error

    return err
}

func (s *lockDb) startLease() {
    go func() {
        // 剩餘 1/4 時刷新租約
        ticker := time.NewTicker(s.leaseAge * 3 / 4)
        for {
            select {
            case <-ticker.C:
                err := s.refreshLease()
                if err != nil {
                    log.Errorf("refreash lease err: %s", err)
                } else {
                    log.Debug("lease refreshed")
                }
            case <-s.stopCh:
                log.Debug("lease stopped")
                return
            }
        }
    }()
}

func (s *lockDb) stopLease() {
    close(s.stopCh)
}

func (s *lockDb) refreshLease() error {
    err := s.db.Model(&lock{}).
        Where("action = ? and holder = ?", s.action, s.holder).
        Update("expired_at", time.Now().Add(s.leaseAge)).
        Error

    return err
}

使用及測試:

func TestLock(t *testing.T) {
    i := 3
    wg := &sync.WaitGroup{}
    wg.Add(i)

    for i > 0 {
        holder := strconv.Itoa(i)
        action := "test"

        i--
        go func() {
            defer wg.Done()

            locker := dbcore.NewLockDb(action, holder, 10*time.Second)

            if _, err := locker.Lock(); err != nil {
                t.Logf("not hold the lock, err: %+v", err)
                return
            }

            time.Sleep(30 * time.Second)
            locker.UnLock()
        }()
    }

    wg.Wait()
}

完整代碼:https://github.com/win5do/go-...

這個分佈式鎖實如今初始數據場景是夠用了,但並不完美,例如:依賴時間同步,不能容忍時間偏斜;獲取鎖不是阻塞的,若是要搶鎖須要使用方自旋; 鎖不可重入,粒度是進程級別,同一個 Action,當前進程獲取鎖後,釋放後才能再次獲取鎖。

你們能夠思考一下如何完善。

相關文章
相關標籤/搜索