本文主要研究一下machinery的Lock
// Lock describes a lock that can be acquired under a key and that is
// released automatically once a given point in time has passed.
type Lock interface {
	// LockWithRetries acquires the lock, retrying when an attempt fails.
	//
	// key is the name of the lock. value is the Unix timestamp, in
	// nanoseconds, at which the lock must be released automatically.
	LockWithRetries(key string, value int64) error

	// Lock makes a single attempt to acquire the lock.
	//
	// key is the name of the lock. value is the Unix timestamp, in
	// nanoseconds, at which the lock must be released automatically.
	Lock(key string, value int64) error
}
Lock接口定義了LockWithRetries、Lock方法
var ( ErrRedisLockFailed = errors.New("redis lock: failed to acquire lock") ) type Lock struct { rclient redis.UniversalClient retries int interval time.Duration } func New(cnf *config.Config, addrs []string, db, retries int) Lock { if retries <= 0 { return Lock{} } lock := Lock{retries: retries} var password string parts := strings.Split(addrs[0], "@") if len(parts) == 2 { password = parts[0] addrs[0] = parts[1] } ropt := &redis.UniversalOptions{ Addrs: addrs, DB: db, Password: password, } if cnf.Redis != nil { ropt.MasterName = cnf.Redis.MasterName } lock.rclient = redis.NewUniversalClient(ropt) return lock } func (r Lock) LockWithRetries(key string, unixTsToExpireNs int64) error { for i := 0; i <= r.retries; i++ { err := r.Lock(key, unixTsToExpireNs) if err == nil { //成功拿到鎖,返回 return nil } time.Sleep(r.interval) } return ErrRedisLockFailed } func (r Lock) Lock(key string, unixTsToExpireNs int64) error { now := time.Now().UnixNano() expiration := time.Duration(unixTsToExpireNs + 1 - now) ctx := r.rclient.Context() success, err := r.rclient.SetNX(ctx, key, unixTsToExpireNs, expiration).Result() if err != nil { return err } if !success { v, err := r.rclient.Get(ctx, key).Result() if err != nil { return err } timeout, err := strconv.Atoi(v) if err != nil { return err } if timeout != 0 && now > int64(timeout) { newTimeout, err := r.rclient.GetSet(ctx, key, unixTsToExpireNs).Result() if err != nil { return err } curTimeout, err := strconv.Atoi(newTimeout) if err != nil { return err } if now > int64(curTimeout) { // success to acquire lock with get set // set the expiration of redis key r.rclient.Expire(ctx, key, expiration) return nil } return ErrRedisLockFailed } return ErrRedisLockFailed } return nil }
Lock定義了rclient、retries、interval屬性;New方法根據cnf、addrs、db、retries建立lock;LockWithRetries方法根據retries次數來嘗試r.Lock(key, unixTsToExpireNs),都沒有成功則返回ErrRedisLockFailed;Lock方法執行r.rclient.SetNX,若是不成功則判斷鎖是否過期,過期的話執行r.rclient.GetSet,若確實過期了則執行r.rclient.Expire(ctx, key, expiration)設定新的過期時間
machinery的Lock接口定義了LockWithRetries、Lock方法;基於redis的實現則透過r.rclient.SetNX、r.rclient.GetSet、r.rclient.Expire實現。