This article takes a look at Lock in storagetapper.
storagetapper/lock/lock.go
    /*Lock is general distributed lock interface*/
    type Lock interface {
        // Try to acquire a lock. Returns false if failed.
        TryLock(s string) bool

        // Try to acquire a lock. Returns false if failed.
        // Allows n simultaneous locks to be held
        TryLockShared(s string, n int) bool

        // Try to acquire a lock and wait for specified period of time for the lock
        // to become available. Returns false if failed.
        Lock(s string, waitDuration time.Duration) bool

        // Check if we still have the lock. Try to reacquire if necessary.
        // Returns false in the case of failure
        Refresh() bool

        // Unlock the lock. Returns false if there was failure
        Unlock() bool

        //Close releases resources associated with the lock
        Close() bool
    }
The Lock interface defines the TryLock, TryLockShared, Lock, Refresh, Unlock, and Close methods.
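To make the contract of the interface concrete, here is a minimal sketch of how a caller might use it. Everything except the Lock interface itself is an assumption made for illustration: `registry` and `memLock` are hypothetical in-memory stand-ins, not part of storagetapper, and they model shared locks with a simple holder count rather than with a database.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Lock mirrors the interface quoted above.
type Lock interface {
	TryLock(s string) bool
	TryLockShared(s string, n int) bool
	Lock(s string, waitDuration time.Duration) bool
	Refresh() bool
	Unlock() bool
	Close() bool
}

// registry is a hypothetical in-process stand-in for the lock server.
// It counts how many holders each lock name currently has.
type registry struct {
	mu      sync.Mutex
	holders map[string]int
}

// acquire succeeds while fewer than n holders exist for name.
func (r *registry) acquire(name string, n int) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.holders[name] >= n {
		return false
	}
	r.holders[name]++
	return true
}

func (r *registry) release(name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.holders[name]--
}

// memLock is a hypothetical in-memory Lock used only for this illustration.
type memLock struct {
	reg  *registry
	name string
	held bool
}

func (m *memLock) TryLock(s string) bool { return m.TryLockShared(s, 1) }

func (m *memLock) TryLockShared(s string, n int) bool {
	if m.held {
		return true // already held: treated as a no-op
	}
	if !m.reg.acquire(s, n) {
		return false
	}
	m.name, m.held = s, true
	return true
}

// Lock ignores waitDuration in this toy; a real implementation would wait.
func (m *memLock) Lock(s string, _ time.Duration) bool { return m.TryLock(s) }

// Refresh always succeeds here because an in-memory lock cannot be lost.
func (m *memLock) Refresh() bool { return true }

func (m *memLock) Unlock() bool {
	if !m.held {
		return false
	}
	m.reg.release(m.name)
	m.held = false
	return true
}

func (m *memLock) Close() bool { m.Unlock(); return true }

func main() {
	reg := &registry{holders: map[string]int{}}
	var a, b, c Lock = &memLock{reg: reg}, &memLock{reg: reg}, &memLock{reg: reg}
	// With n = 2, two holders succeed and the third is refused.
	fmt.Println(a.TryLockShared("job", 2), b.TryLockShared("job", 2), c.TryLockShared("job", 2))
	a.Unlock()
	// Once a slot is free, the third caller can get in.
	fmt.Println(c.TryLockShared("job", 2))
}
```

Note that every method reports failure through a bool rather than an error, so a caller cannot tell from the return value why an acquisition failed.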
storagetapper/lock/lock.go
    type myLock struct {
        conn     *sql.DB
        connID   int64
        name     string
        ci       db.Addr
        n        int
        mu       sync.Mutex
        isLocked bool
    }

    // Lock waits for the duration specified for the lock to be available
    // Also TryLock will reuse the connection if it already exists otherwise
    // it will create a new one.
    func (m *myLock) Lock(s string, timeout time.Duration) bool {
        return m.lock(s, timeout, 1)
    }

    // TryLock tries to take a lock and returns an error if it cannot. If the lock
    // is already held then TryLock is noop.
    func (m *myLock) TryLock(s string) bool {
        return m.Lock(s, 0)
    }

    // TryLockShared tries to take a lock and returns an error if it cannot. If the lock
    // is already held then TryLock is noop.
    func (m *myLock) TryLockShared(s string, n int) bool {
        return m.lock(s, 0, n)
    }

    // Refresh tries to keep the lock fresh.
    func (m *myLock) Refresh() bool {
        if !m.isLocked {
            return true
        }

        if m.IsLockedByMe() {
            return true
        }

        return m.TryLock(m.name)
    }

    // Unlock releases locks associated with the connection
    func (m *myLock) Unlock() bool {
        m.mu.Lock()
        defer m.mu.Unlock()

        if !m.isLocked {
            return false
        }
        m.isLocked = false

        if m.conn == nil {
            return false
        }

        var res sql.NullBool
        err := m.conn.QueryRow("SELECT RELEASE_LOCK(?)", m.lockName()).Scan(&res)
        if log.EL(m.log(), err) {
            return false
        }
        if !res.Valid {
            m.log().Errorf("Lock did not exists on release")
            return false
        }
        if !res.Bool {
            m.log().Errorf("Lock was not hold by me")
            return false
        }
        return true
    }

    func (m *myLock) Close() bool {
        return m.closeConn()
    }
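myLock has the fields conn, connID, name, ci (a db.Addr), n, mu, and isLocked, and implements the Lock interface on top of a database connection. Lock, TryLock, and TryLockShared all delegate to the internal lock method. Refresh returns true right away if the lock is not marked as held; otherwise it calls IsLockedByMe to check whether this connection still owns the lock and, if it does not, tries to re-acquire it with TryLock. Unlock sets isLocked to false and releases the lock with RELEASE_LOCK. Close releases conn.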
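The three branches of Refresh can be sketched in isolation. In the sketch below, `refresher` is a hypothetical type introduced only for illustration; its `heldByMe` and `reacquire` fields stand in for IsLockedByMe and TryLock(m.name), so no database is needed.

```go
package main

import "fmt"

// refresher is a hypothetical stand-in for myLock. The two function fields
// replace the database-backed IsLockedByMe and TryLock(m.name) calls.
type refresher struct {
	isLocked  bool
	heldByMe  func() bool
	reacquire func() bool
}

// Refresh follows the same three steps as myLock.Refresh.
func (r *refresher) Refresh() bool {
	if !r.isLocked {
		return true // never locked: nothing to refresh
	}
	if r.heldByMe() {
		return true // still the owner
	}
	return r.reacquire() // ownership lost: try to take the lock again
}

func main() {
	yes := func() bool { return true }
	no := func() bool { return false }

	cases := []struct {
		name string
		r    refresher
	}{
		{"not locked", refresher{false, no, no}},
		{"locked, still mine", refresher{true, yes, no}},
		{"lost, reacquired", refresher{true, no, yes}},
		{"lost, reacquire failed", refresher{true, no, no}},
	}
	for _, c := range cases {
		fmt.Printf("%s: %v\n", c.name, c.r.Refresh())
	}
}
```

One consequence of the first branch is that Refresh returns true both when the lock is healthy and when it was never taken, so a true result alone does not prove that the lock is held.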
storagetapper/lock/lock.go
    // Lock waits for the duration specified for the lock to be available
    // Also TryLock will reuse the connection if it already exists otherwise
    // it will create a new one.
    func (m *myLock) lock(s string, timeout time.Duration, ntickets int) bool {
        m.mu.Lock()
        defer m.mu.Unlock()

        var err error
        var res sql.NullBool
        m.name = s
        for m.n = 0; m.n < ntickets; m.n++ {
            if !m.createConn() {
                return false
            }
            err = m.conn.QueryRow("SELECT GET_LOCK(?,?)", m.lockName(), timeout/time.Second).Scan(&res)
            //true - success, false - timeout, NULL - error
            if err == nil && res.Valid && res.Bool {
                m.isLocked = true
                return true
            }
            if log.EL(m.log(), err) {
                m.closeConn()
            }
        }
        return false
    }
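The lock method stores the requested name and then loops over up to ntickets tickets, keeping the current index in m.n. For each ticket it makes sure a connection exists and runs GET_LOCK with the timeout converted to whole seconds (this is integer division, so a sub-second timeout becomes 0). On success it sets isLocked to true and returns true; otherwise it moves on to the next ticket, closing the connection first if the query returned an error. lockName is not shown here; because the loop counter is kept in m.n, it presumably yields a different name for each ticket, which would be what lets TryLockShared allow n simultaneous holders. If no ticket can be acquired, lock returns false.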
storagetapper/lock/lock.go
    // IsLockedByMe checks whether the lock is held by the same connection as
    // the current lock object.
    func (m *myLock) IsLockedByMe() bool {
        m.mu.Lock()
        defer m.mu.Unlock()

        var lockedBy int64
        if m.conn == nil {
            return false
        }

        err := m.conn.QueryRow("SELECT IFNULL(IS_USED_LOCK(?), 0)", m.lockName()).Scan(&lockedBy)
        log.Debugf("lock: lockedBy: %v, myConnID: %v", lockedBy, m.connID)

        if err != nil || m.connID == 0 || lockedBy != m.connID {
            if err != nil {
                m.log().Errorf("IsLockedByMe: error: " + err.Error())
                m.closeConn()
            } else {
                m.log().Debugf("IsLockedByMe: lockedBy: %v, != myConnID: %v", lockedBy, m.connID)
            }
            return false
        }
        return true
    }
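IsLockedByMe uses IS_USED_LOCK to look up the ID of the connection that holds the lock (lockedBy) and compares it with connID. It returns false if the query fails, if connID is 0, or if the two IDs differ.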
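To sum up, storagetapper's Lock interface defines the TryLock, TryLockShared, Lock, Refresh, Unlock, and Close methods. myLock, with its conn, connID, name, ci (db.Addr), n, mu, and isLocked fields, implements that interface on top of a database connection by relying on MySQL's GET_LOCK, RELEASE_LOCK, and IS_USED_LOCK functions.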