老鐵們好,這裏是java研究所。
今天咱們來探討一下分佈式鎖的4種實現:
一、經過MySQL實現分佈式鎖
二、經過redis實現分佈式鎖
三、經過zookeeper實現分佈式鎖
四、經過etcd實現分佈式鎖java
如何確保共享資源在同一時刻只能被一個線程訪問?
你們可能以爲這個很簡單吧,在一個jvm中,經過synchronized或者ReentrantLock是很容易實現的。
確實,單個jvm中確實沒有問題。
可是,一般咱們的系統會採用集羣的方式部署,此時集羣中的每一個節點都是一個jvm環境,那麼經過synchronized或者ReentrantLock是沒法解決共享資源訪問的問題了。
此時就要用到分佈式鎖了:分佈式鎖就是解決分佈式環境中共享資源順序訪問的問題,同一時刻,集羣中全部節點中,只容許有一個線程能夠訪問共享資源。mysql
分佈式鎖使用者位於不一樣的機器中,鎖獲取成功以後,才能夠對共享資源進行操做
同一時刻全部機器中只有一個使用者能夠獲取到分佈式鎖
鎖具備重入的功能:即一個使用者能夠屢次獲取某個分佈式鎖
獲取鎖的過程容許指定超時功能:在指定的時間內嘗試獲取鎖,過了超時時間,若還未獲取到鎖,則獲取失敗
防止死鎖:如:A機器獲取鎖以後,在釋放鎖以前,A機器掛了,致使鎖未釋放,結果鎖一直被A機器佔有着,遇到這種狀況時,分佈式鎖要可以自動解決;解決方式:持有鎖的時候能夠加個持有超時時間,超過了這個時間鎖將自動釋放,此時其餘機器將有機會獲取鎖
下面咱們來看一下分佈式鎖的4種實現。面試
鎖的獲取過程
假如:一個集羣環境中有n個系統,每一個系統中有一個jvm,每一個jvm中有m個線程去獲取分佈式鎖,那麼同時可能就有n*m個線程去獲取分佈式鎖,此時分佈式鎖的壓力是比較大的,每一個jvm中多個線程同時去獲取鎖實際上是沒有意義的,能夠在每一個jvm中先加一把本地的鎖,獲取分佈式鎖以前須要先獲取jvm本地的鎖,本地鎖獲取成功以後,才能夠嘗試獲取分佈式鎖,此時n個系統中最多有n個線程嘗試獲取分佈式鎖,獲取鎖的步驟主要2步:redis
一、先嚐試獲取jvm本地鎖 二、jvm本地鎖獲取成功以後嘗試獲取分佈式鎖
獲取鎖的時候能夠傳遞獲取鎖最大等待時間,在指定的時間內屢次嘗試獲取鎖,獲取失敗以後,休眠一會,再繼續嘗試獲取,直到時間耗盡。sql
獲取鎖的時候須要指定有效期,有效期就是獲取鎖以後,使用者但願使用多長時間,爲何須要有效期?
若是沒有有效期,當使用者獲取成功以後,系統忽然down機了,那麼這個鎖就沒法釋放,其餘線程就再也沒法獲取到這個鎖了。
因此須要有有效期,超過了有效期,鎖將失效,其餘線程將能夠嘗試獲取鎖。數據庫
什麼是鎖續命?
好比:使用者獲取鎖的時候,指定有效期是5分鐘,可是5分鐘以後,使用者事情還未乾完,還想繼續使用一會,那麼可使用續命功能,延遲鎖的有效期。
能夠啓動一個子線程,自動完成續命的操做,好比:本來有效期是5分鐘,當使用4分鐘的時候,續命2分鐘,那麼有效期是7分鐘,這個比較簡單,你們能夠隨意發揮。服務器
create table t_lock( lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '鎖惟一標誌', request_id varchar(64) NOT NULL DEFAULT '' COMMENT '用來標識請求對象的', lock_count INT NOT NULL DEFAULT 0 COMMENT '當前上鎖次數', timeout BIGINT NOT NULL DEFAULT 0 COMMENT '鎖超時時間', version INT NOT NULL DEFAULT 0 COMMENT '版本號,每次更新+1' )COMMENT '鎖信息表';
注意:表中有個版本號字段,版本號主要用於樂觀鎖的方式更新數據,確保併發狀況下更新數據的正確性。markdown
代碼比較簡單,你們主要看獲取鎖的lock方法和釋放鎖的unlock方法,註釋比較詳細,你們看看就懂了。
代碼中的重點是更新數據的時候,經過比對版本號,採用cas的方式,確保併發狀況下更新數據的正確性。
本代碼實現了獲取鎖和釋放鎖的操做,續命操做未實現,你們能夠嘗試實現一下。併發
package lock; import lombok.Builder; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.sql.*; import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @Slf4j public class DbLockUtil { //將requestid保存在該變量中 static ThreadLocal<String> requestIdTL = new ThreadLocal<>(); //jvm鎖:當多個線程併發獲取分佈式鎖時,須要先獲取jvm鎖,jvm鎖獲取成功,則嘗試獲取分佈式鎖 static Map<String, ReentrantLock> jvmLockMap = new ConcurrentHashMap<>(); /** * 獲取當前線程requestid * * @return */ public static String getRequestId() { String requestId = requestIdTL.get(); if (requestId == null || "".equals(requestId)) { requestId = UUID.randomUUID().toString(); requestIdTL.set(requestId); } log.info("requestId:{}", requestId); return requestId; } /** * 獲取鎖 * * @param lockKey 鎖key * @param lockTimeOut(毫秒) 持有鎖的有效時間,防止死鎖 * @param getTimeOut(毫秒) 獲取鎖的超時時間,這個時間內獲取不到將重試 * @return */ public static boolean lock(String lockKey, long lockTimeOut, int getTimeOut) throws Exception { log.info("start"); boolean lockResult = false; /** * 單個jvm中可能有多個線程併發獲取一個鎖 * 此時咱們只容許一個線程去獲取分佈式鎖 * 因此若是同一個jvm中有多個線程嘗試獲取分佈式鎖,須要先獲取jvm中的鎖 */ ReentrantLock jvmLock = new ReentrantLock(); ReentrantLock oldJvmLock = jvmLockMap.putIfAbsent(lockKey, jvmLock); oldJvmLock = oldJvmLock != null ? oldJvmLock : jvmLock; boolean jvmLockSuccess = oldJvmLock.tryLock(getTimeOut, TimeUnit.MILLISECONDS); //jvm鎖獲取失敗,則直接失敗 if (!jvmLockSuccess) { return lockResult; } else { //jvm鎖獲取成功,則繼續嘗試獲取分佈式鎖 try { String request_id = getRequestId(); long startTime = System.currentTimeMillis(); //循環嘗試獲取鎖 while (true) { //經過lockKey獲取db中的記錄 LockModel lockModel = DbLockUtil.get(lockKey); if (Objects.isNull(lockModel)) { //記錄不存在,則先插入一條 DbLockUtil.insert(LockModel.builder().lock_key(lockKey).request_id("").lock_count(0).timeout(0L).version(0).build()); } else { //獲取請求id,稍後請求id會放入ThreadLocal中 String requestId = lockModel.getRequest_id(); //若是requestId爲空字符,表示鎖未被佔用 if ("".equals(requestId)) { lockModel.setRequest_id(request_id); lockModel.setLock_count(1); lockModel.setTimeout(System.currentTimeMillis() + lockTimeOut); //併發狀況下,採用cas方式更新記錄 if (DbLockUtil.update(lockModel) == 1) { lockResult = true; break; } } else if (request_id.equals(requestId)) { //若是requestId和表中request_id同樣表示鎖被當前線程持有者,此時須要加劇入鎖 lockModel.setTimeout(System.currentTimeMillis() + lockTimeOut); lockModel.setLock_count(lockModel.getLock_count() + 1); if (DbLockUtil.update(lockModel) == 1) { lockResult = true; break; } } else { //鎖不是本身的,而且已經超時了,則重置鎖,繼續重試 if (lockModel.getTimeout() < System.currentTimeMillis()) { DbLockUtil.resetLock(lockModel); } else { //若是未超時,休眠100毫秒,繼續重試 if (startTime + getTimeOut > System.currentTimeMillis()) { TimeUnit.MILLISECONDS.sleep(100); } else { break; } } } } } } finally { //釋放jvm鎖,將其從map中異常 jvmLock.unlock(); jvmLockMap.remove(lockKey); } } log.info("end"); return lockResult; } /** * 釋放鎖 * * @param lock_key * @throws Exception */ private static void unlock(String lock_key) throws Exception { //獲取當前線程requestId String requestId = getRequestId(); LockModel lockModel = DbLockUtil.get(lock_key); //當前線程requestId和庫中request_id一致 && lock_count>0,表示能夠釋放鎖 if (Objects.nonNull(lockModel) && requestId.equals(lockModel.getRequest_id()) && lockModel.getLock_count() > 0) { if (lockModel.getLock_count() == 1) { //重置鎖 resetLock(lockModel); } else { lockModel.setLock_count(lockModel.getLock_count() - 1); DbLockUtil.update(lockModel); } } } /** * 重置鎖 * * @param lockModel * @return * @throws Exception */ private static int resetLock(LockModel lockModel) throws Exception { lockModel.setRequest_id(""); lockModel.setLock_count(0); lockModel.setTimeout(0L); return DbLockUtil.update(lockModel); } /** * 更新lockModel信息,內部採用樂觀鎖來更新 * * @param lockModel * @return * @throws Exception */ private static int update(LockModel lockModel) throws Exception { return exec(conn -> { String sql = "UPDATE t_lock SET request_id = ?,lock_count = ?,timeout = ?,version = version + 1 WHERE lock_key = ? AND version = ?"; PreparedStatement ps = conn.prepareStatement(sql); int colIndex = 1; ps.setString(colIndex++, lockModel.getRequest_id()); ps.setInt(colIndex++, lockModel.getLock_count()); ps.setLong(colIndex++, lockModel.getTimeout()); ps.setString(colIndex++, lockModel.getLock_key()); ps.setInt(colIndex++, lockModel.getVersion()); return ps.executeUpdate(); }); } private static LockModel get(String lock_key) throws Exception { return exec(conn -> { String sql = "select * from t_lock t WHERE t.lock_key=?"; PreparedStatement ps = conn.prepareStatement(sql); int colIndex = 1; ps.setString(colIndex++, lock_key); ResultSet rs = ps.executeQuery(); if (rs.next()) { return LockModel.builder(). lock_key(lock_key). request_id(rs.getString("request_id")). lock_count(rs.getInt("lock_count")). timeout(rs.getLong("timeout")). version(rs.getInt("version")).build(); } return null; }); } private static int insert(LockModel lockModel) throws Exception { return exec(conn -> { String sql = "insert into t_lock (lock_key, request_id, lock_count, timeout, version) VALUES (?,?,?,?,?)"; PreparedStatement ps = conn.prepareStatement(sql); int colIndex = 1; ps.setString(colIndex++, lockModel.getLock_key()); ps.setString(colIndex++, lockModel.getRequest_id()); ps.setInt(colIndex++, lockModel.getLock_count()); ps.setLong(colIndex++, lockModel.getTimeout()); ps.setInt(colIndex++, lockModel.getVersion()); return ps.executeUpdate(); }); } private static <T> T exec(SqlExec<T> sqlExec) throws Exception { Connection conn = getConn(); try { return sqlExec.exec(conn); } finally { closeConn(conn); } } @FunctionalInterface public interface SqlExec<T> { T exec(Connection conn) throws Exception; } @Getter @Setter @Builder public static class LockModel { private String lock_key; private String request_id; private Integer lock_count; private Long timeout; private Integer version; } private static final String url = "jdbc:mysql://localhost:3306/dlock?useSSL=false"; //數據庫地址 private static final String username = ""; //數據庫用戶名 private static final String password = ""; //數據庫密碼 private static final String driver = "com.mysql.jdbc.Driver"; //mysql驅動 /** * 鏈接數據庫 * * @return */ private static Connection getConn() { Connection conn = null; try { Class.forName(driver); //加載數據庫驅動 try { conn = DriverManager.getConnection(url, username, password); //鏈接數據庫 } catch (SQLException e) { e.printStackTrace(); } } catch (ClassNotFoundException e) { e.printStackTrace(); } return conn; } /** * 關閉數據庫連接 * * @return */ private static void closeConn(Connection conn) { if (conn != null) { try { conn.close(); //關閉數據庫連接 } catch (SQLException e) { e.printStackTrace(); } } } }
setnx
命令格式:SETNX key value;是『SET if Not eXists』(若是不存在,則 SET)的簡寫,只在鍵 key 不存在的狀況下,將鍵 key 的值設置爲 value 。若鍵 key 已經存在, 則 SETNX 命令不作任何動做。命令在設置成功時返回 1 ,設置失敗時返回 0 。dom
getset
命令格式:GETSET key value,將鍵 key 的值設爲 value ,並返回鍵 key 在被設置以前的舊的value。返回值:若是鍵 key 沒有舊值, 也便是說, 鍵 key 在被設置以前並不存在, 那麼命令返回 nil 。當鍵 key 存在但不是字符串類型時,命令返回一個錯誤。
一、A嘗試去獲取鎖lockkey,經過setnx(lockkey,currenttime+timeout)命令,對lockkey進行setnx,將value值設置爲當前時間+鎖超時時間;
二、若是返回值爲1,說明redis服務器中尚未lockkey,也就是沒有其餘用戶擁有這個鎖,A就能獲取鎖成功;
三、在進行相關業務執行以前,先執行expire(lockkey),對lockkey設置有效期,防止死鎖;由於若是不設置有效期的話,lockkey將一直存在於redis中,其餘用戶嘗試獲取鎖時,執行到setnx(lockkey,currenttime+timeout)時,將不能成功獲取到該鎖;
四、執行相關業務
五、釋放鎖,A完成相關業務以後,要釋放擁有的鎖,也就是刪除redis中該鎖的內容,del(lockkey),接下來的用戶才能進行從新設置鎖新值
六、當A經過setnx(lockkey,currenttime+timeout)命令不能成功設置lockkey時,這是不能直接判定獲取鎖失敗;由於咱們在設置鎖時,設置了鎖的超時時間timeout,噹噹前時間大於redis中存儲鍵值爲lockkey的value值時,能夠認爲上一任的擁有者對鎖的使用權已經失效了,A就能夠強行擁有該鎖;具體斷定過程以下;
七、A經過get(lockkey),獲取redis中的存儲鍵值爲lockkey的value值,即獲取鎖的相對時間lockvalueA
八、lockvalueA!=null && currenttime>lockvalue,A經過當前的時間與鎖設置的時間作比較,若是當前時間已經大於鎖設置的時間臨界,便可以進一步判斷是否能夠獲取鎖,不然說明該鎖還在被佔用,A就還不能獲取該鎖,結束,獲取鎖失敗;
九、步驟4返回結果爲true後,經過getSet設置新的超時時間,並返回舊值lockvalueB,以做判斷,由於在分佈式環境,在進入這裏時可能另外的進程獲取到鎖並對值進行了修改,只有舊值與返回的值一致才能說明中間未被其餘進程獲取到這個鎖
十、lockvalueB == null || lockvalueA==lockvalueB,判斷:若果lockvalueB爲null,說明該鎖已經被釋放了,此時該進程能夠獲取鎖;舊值與返回的lockvalueB一致說明中間未被其餘進程獲取該鎖,能夠獲取鎖;不然不能獲取鎖,結束,獲取鎖失敗。
留給給你們,按照上面的過程實現下。
zookeeper是什麼?是一個開源的中間件,能夠作高可用配置中心使用,簡單點理解:能夠用來保存用戶的一些數據。
zookeeper有3個特色比較重要,這2個特色是實現分佈式鎖的關
鍵。
zookeeper中存儲數據是樹結構,樹下面能夠建立不少節點,節點中能夠存儲用戶的數據。
在每個節點下面建立子節點時,只要選擇的建立類型是有序類型,那麼,此節點將自動在客戶端指定的節點名後面添加一個單調遞增序號,重點是,併發建立子節點的狀況下,也能夠確保多個子節點的有序性。
好比並發在/lock/lock1下面建立4個有序子節點,以下:
客戶端能夠判斷建立的節點序號是否是最小的,若是編號是子節點中最小的,則獲取鎖成功。
客戶端操做zookeeper,須要和zookeeper之間創建鏈接,若是客戶端請求在zookeeper上建立的節點類型是臨時節點,那麼當客戶端和zookeeper之間鏈接斷開的時候,建立的臨時節點自動會被zookeeper刪除。
這個能夠防止死鎖多功能,好比客戶端獲取鎖以後掛了,那麼節點會自動被刪除,此時鎖的其餘獲取者纔有機會獲取鎖。
客戶端能夠對某個節點添加監聽器,當節點信息發生變化的時候,zookeeper會通知客戶端,好比節點數據被修改、節點被刪除等等,都會通知客戶端;
這個特性特別牛逼:這個特別爽,後面的節點只須要監聽他前面的一個節點,當前面的一個節點被刪除時,zookeeper會通知監聽者,監聽者能夠判斷本身建立的節點編號是否是最小的,若是是最小的,即獲取鎖成功,這個是否是比上面數據庫和redis的方式好一些,db和redis的方式須要自旋(獲取失敗了,休眠稍許,繼續循環嘗試),而zookeeper不須要自旋,鎖被釋放的時候,zookeeper會通知等待者。
重點理解原理,代碼你們能夠在網上找找,比較多,這裏就不貼出來了。
etcd 和 zookeeper功能差很少,也能夠做爲高可用配置中心,不過etcd基於Go語言實現,也能夠用來實現分佈式鎖,實現原理上和zookeeper差很少,這裏就不細說了。
本文主要介紹了4種方式實現分佈式鎖,你們重點要理解每種方式的原理。
db和redis的方式原理差很少,內部在獲取失敗的狀況下,都須要採用自旋的方式從新嘗試獲取鎖,而zookeeper採用監聽的方式。
redis和zookeeper這2種方式用的比較多,性能上面redis更好一些,併發量比較大的能夠採用redis的方式。
設計中還有一點:獲取鎖的時候分2步走,先獲取jvm中的鎖,而後在嘗試獲取分佈式鎖。
·END·
掃描二維碼 | 關注咱們