臥槽,頭條4面:這個問題砍了我10萬!!!

老鐵們好,這裏是java研究所。
今天咱們來探討一下分佈式鎖的4種實現:
一、經過MySQL實現分佈式鎖
二、經過redis實現分佈式鎖
三、經過zookeeper實現分佈式鎖
四、經過etcd實現分佈式鎖java

一、什麼是分佈式鎖?

如何確保共享資源在同一時刻只能被一個線程訪問?
你們可能以爲這個很簡單吧,在一個jvm中,經過synchronized或者ReentrantLock是很容易實現的。
確實,單個jvm中確實沒有問題。
可是,一般咱們的系統會採用集羣的方式部署,此時集羣中的每一個節點都是一個jvm環境,那麼經過synchronized或者ReentrantLock是沒法解決共享資源訪問的問題了。
此時就要用到分佈式鎖了:分佈式鎖就是解決分佈式環境中共享資源順序訪問的問題,同一時刻,集羣中全部節點中,只容許有一個線程能夠訪問共享資源。mysql

二、分佈式鎖的功能

分佈式鎖使用者位於不一樣的機器中,鎖獲取成功以後,才能夠對共享資源進行操做
同一時刻全部機器中只有一個使用者能夠獲取到分佈式鎖
鎖具備重入的功能:即一個使用者能夠屢次獲取某個分佈式鎖
獲取鎖的過程容許指定超時功能:在指定的時間內嘗試獲取鎖,過了超時時間,若還未獲取到鎖,則獲取失敗
防止死鎖:如:A機器獲取鎖以後,在釋放鎖以前,A機器掛了,致使鎖未釋放,結果鎖一直被A機器佔有着,遇到這種狀況時,分佈式鎖要可以自動解決;解決方式:持有鎖的時候能夠加個持有超時時間,超過了這個時間鎖將自動釋放,此時其餘機器將有機會獲取鎖
下面咱們來看一下分佈式鎖的4種實現。面試

三、方式1:數據庫的方式

3.一、原理

鎖的獲取過程
假如:一個集羣環境中有n個系統,每一個系統中有一個jvm,每一個jvm中有m個線程去獲取分佈式鎖,那麼同時可能就有n*m個線程去獲取分佈式鎖,此時分佈式鎖的壓力是比較大的,每一個jvm中多個線程同時去獲取鎖實際上是沒有意義的,能夠在每一個jvm中先加一把本地的鎖,獲取分佈式鎖以前須要先獲取jvm本地的鎖,本地鎖獲取成功以後,才能夠嘗試獲取分佈式鎖,此時n個系統中最多有n個線程嘗試獲取分佈式鎖,獲取鎖的步驟主要2步:redis

一、先嚐試獲取jvm本地鎖
二、jvm本地鎖獲取成功以後嘗試獲取分佈式鎖

超時時間

獲取鎖的時候能夠傳遞獲取鎖最大等待時間,在指定的時間內屢次嘗試獲取鎖,獲取失敗以後,休眠一會,再繼續嘗試獲取,直到時間耗盡。sql

鎖有效期

獲取鎖的時候須要指定有效期,有效期就是獲取鎖以後,使用者但願使用多長時間,爲何須要有效期?
若是沒有有效期,當使用者獲取成功以後,系統忽然down機了,那麼這個鎖就沒法釋放,其餘線程就再也沒法獲取到這個鎖了。
因此須要有有效期,超過了有效期,鎖將失效,其餘線程將能夠嘗試獲取鎖。數據庫

鎖續命

什麼是鎖續命?
好比:使用者獲取鎖的時候,指定有效期是5分鐘,可是5分鐘以後,使用者事情還未乾完,還想繼續使用一會,那麼可使用續命功能,延遲鎖的有效期。
能夠啓動一個子線程,自動完成續命的操做,好比:本來有效期是5分鐘,當使用4分鐘的時候,續命2分鐘,那麼有效期是7分鐘,這個比較簡單,你們能夠隨意發揮。服務器

3.二、準備sql

create table t_lock(
  lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '鎖惟一標誌',
  request_id varchar(64) NOT NULL DEFAULT '' COMMENT '用來標識請求對象的',
  lock_count INT NOT NULL DEFAULT 0 COMMENT '當前上鎖次數',
  timeout BIGINT NOT NULL DEFAULT 0 COMMENT '鎖超時時間',
  version INT NOT NULL DEFAULT 0 COMMENT '版本號,每次更新+1'
)COMMENT '鎖信息表';

注意:表中有個版本號字段,版本號主要用於樂觀鎖的方式更新數據,確保併發狀況下更新數據的正確性。markdown

3.三、鎖工具類代碼

代碼比較簡單,你們主要看獲取鎖的lock方法和釋放鎖的unlock方法,註釋比較詳細,你們看看就懂了。
代碼中的重點是更新數據的時候,經過比對版本號,採用cas的方式,確保併發狀況下更新數據的正確性。
本代碼實現了獲取鎖和釋放鎖的操做,續命操做未實現,你們能夠嘗試實現一下。併發

package lock;

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.sql.*;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class DbLockUtil {

    //將requestid保存在該變量中
    static ThreadLocal<String> requestIdTL = new ThreadLocal<>();
    //jvm鎖:當多個線程併發獲取分佈式鎖時,須要先獲取jvm鎖,jvm鎖獲取成功,則嘗試獲取分佈式鎖
    static Map<String, ReentrantLock> jvmLockMap = new ConcurrentHashMap<>();

    /**
     * 獲取當前線程requestid
     *
     * @return
     */
    public static String getRequestId() {
        String requestId = requestIdTL.get();
        if (requestId == null || "".equals(requestId)) {
            requestId = UUID.randomUUID().toString();
            requestIdTL.set(requestId);
        }
        log.info("requestId:{}", requestId);
        return requestId;
    }

    /**
     * 獲取鎖
     *
     * @param lockKey         鎖key
     * @param lockTimeOut(毫秒) 持有鎖的有效時間,防止死鎖
     * @param getTimeOut(毫秒)  獲取鎖的超時時間,這個時間內獲取不到將重試
     * @return
     */
    public static boolean lock(String lockKey, long lockTimeOut, int getTimeOut) throws Exception {
        log.info("start");
        boolean lockResult = false;

        /**
         * 單個jvm中可能有多個線程併發獲取一個鎖
         * 此時咱們只容許一個線程去獲取分佈式鎖
         * 因此若是同一個jvm中有多個線程嘗試獲取分佈式鎖,須要先獲取jvm中的鎖
         */
        ReentrantLock jvmLock = new ReentrantLock();
        ReentrantLock oldJvmLock = jvmLockMap.putIfAbsent(lockKey, jvmLock);
        oldJvmLock = oldJvmLock != null ? oldJvmLock : jvmLock;
        boolean jvmLockSuccess = oldJvmLock.tryLock(getTimeOut, TimeUnit.MILLISECONDS);
        //jvm鎖獲取失敗,則直接失敗
        if (!jvmLockSuccess) {
            return lockResult;
        } else {
            //jvm鎖獲取成功,則繼續嘗試獲取分佈式鎖
            try {
                String request_id = getRequestId();
                long startTime = System.currentTimeMillis();
                //循環嘗試獲取鎖
                while (true) {
                    //經過lockKey獲取db中的記錄
                    LockModel lockModel = DbLockUtil.get(lockKey);
                    if (Objects.isNull(lockModel)) {
                        //記錄不存在,則先插入一條
                        DbLockUtil.insert(LockModel.builder().lock_key(lockKey).request_id("").lock_count(0).timeout(0L).version(0).build());
                    } else {
                        //獲取請求id,稍後請求id會放入ThreadLocal中
                        String requestId = lockModel.getRequest_id();
                        //若是requestId爲空字符,表示鎖未被佔用
                        if ("".equals(requestId)) {
                            lockModel.setRequest_id(request_id);
                            lockModel.setLock_count(1);
                            lockModel.setTimeout(System.currentTimeMillis() + lockTimeOut);
                            //併發狀況下,採用cas方式更新記錄
                            if (DbLockUtil.update(lockModel) == 1) {
                                lockResult = true;
                                break;
                            }
                        } else if (request_id.equals(requestId)) {
                            //若是requestId和表中request_id同樣表示鎖被當前線程持有者,此時須要加劇入鎖
                            lockModel.setTimeout(System.currentTimeMillis() + lockTimeOut);
                            lockModel.setLock_count(lockModel.getLock_count() + 1);
                            if (DbLockUtil.update(lockModel) == 1) {
                                lockResult = true;
                                break;
                            }
                        } else {
                            //鎖不是本身的,而且已經超時了,則重置鎖,繼續重試
                            if (lockModel.getTimeout() < System.currentTimeMillis()) {
                                DbLockUtil.resetLock(lockModel);
                            } else {
                                //若是未超時,休眠100毫秒,繼續重試
                                if (startTime + getTimeOut > System.currentTimeMillis()) {
                                    TimeUnit.MILLISECONDS.sleep(100);
                                } else {
                                    break;
                                }
                            }
                        }
                    }
                }
            } finally {
                //釋放jvm鎖,將其從map中異常
                jvmLock.unlock();
                jvmLockMap.remove(lockKey);
            }
        }
        log.info("end");
        return lockResult;
    }

    /**
     * 釋放鎖
     *
     * @param lock_key
     * @throws Exception
     */
    private static void unlock(String lock_key) throws Exception {
        //獲取當前線程requestId
        String requestId = getRequestId();
        LockModel lockModel = DbLockUtil.get(lock_key);
        //當前線程requestId和庫中request_id一致 && lock_count>0,表示能夠釋放鎖
        if (Objects.nonNull(lockModel) && requestId.equals(lockModel.getRequest_id()) && lockModel.getLock_count() > 0) {
            if (lockModel.getLock_count() == 1) {
                //重置鎖
                resetLock(lockModel);
            } else {
                lockModel.setLock_count(lockModel.getLock_count() - 1);
                DbLockUtil.update(lockModel);
            }
        }
    }

    /**
     * 重置鎖
     *
     * @param lockModel
     * @return
     * @throws Exception
     */
    private static int resetLock(LockModel lockModel) throws Exception {
        lockModel.setRequest_id("");
        lockModel.setLock_count(0);
        lockModel.setTimeout(0L);
        return DbLockUtil.update(lockModel);
    }

    /**
     * 更新lockModel信息,內部採用樂觀鎖來更新
     *
     * @param lockModel
     * @return
     * @throws Exception
     */
    private static int update(LockModel lockModel) throws Exception {
        return exec(conn -> {
            String sql = "UPDATE t_lock SET request_id = ?,lock_count = ?,timeout = ?,version = version + 1 WHERE lock_key = ? AND  version = ?";
            PreparedStatement ps = conn.prepareStatement(sql);
            int colIndex = 1;
            ps.setString(colIndex++, lockModel.getRequest_id());
            ps.setInt(colIndex++, lockModel.getLock_count());
            ps.setLong(colIndex++, lockModel.getTimeout());
            ps.setString(colIndex++, lockModel.getLock_key());
            ps.setInt(colIndex++, lockModel.getVersion());
            return ps.executeUpdate();
        });
    }

    private static LockModel get(String lock_key) throws Exception {
        return exec(conn -> {
            String sql = "select * from t_lock t WHERE t.lock_key=?";
            PreparedStatement ps = conn.prepareStatement(sql);
            int colIndex = 1;
            ps.setString(colIndex++, lock_key);
            ResultSet rs = ps.executeQuery();
            if (rs.next()) {
                return LockModel.builder().
                        lock_key(lock_key).
                        request_id(rs.getString("request_id")).
                        lock_count(rs.getInt("lock_count")).
                        timeout(rs.getLong("timeout")).
                        version(rs.getInt("version")).build();
            }
            return null;
        });
    }

    private static int insert(LockModel lockModel) throws Exception {
        return exec(conn -> {
            String sql = "insert into t_lock (lock_key, request_id, lock_count, timeout, version) VALUES (?,?,?,?,?)";
            PreparedStatement ps = conn.prepareStatement(sql);
            int colIndex = 1;
            ps.setString(colIndex++, lockModel.getLock_key());
            ps.setString(colIndex++, lockModel.getRequest_id());
            ps.setInt(colIndex++, lockModel.getLock_count());
            ps.setLong(colIndex++, lockModel.getTimeout());
            ps.setInt(colIndex++, lockModel.getVersion());
            return ps.executeUpdate();
        });
    }

    private static <T> T exec(SqlExec<T> sqlExec) throws Exception {
        Connection conn = getConn();
        try {
            return sqlExec.exec(conn);
        } finally {
            closeConn(conn);
        }
    }

    @FunctionalInterface
    public interface SqlExec<T> {
        T exec(Connection conn) throws Exception;
    }

    @Getter
    @Setter
    @Builder
    public static class LockModel {
        private String lock_key;
        private String request_id;
        private Integer lock_count;
        private Long timeout;
        private Integer version;
    }

    private static final String url = "jdbc:mysql://localhost:3306/dlock?useSSL=false";        //數據庫地址
    private static final String username = "";        //數據庫用戶名
    private static final String password = "";        //數據庫密碼
    private static final String driver = "com.mysql.jdbc.Driver";        //mysql驅動

    /**
     * 鏈接數據庫
     *
     * @return
     */
    private static Connection getConn() {
        Connection conn = null;
        try {
            Class.forName(driver);  //加載數據庫驅動
            try {
                conn = DriverManager.getConnection(url, username, password);  //鏈接數據庫
            } catch (SQLException e) {
                e.printStackTrace();
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * 關閉數據庫連接
     *
     * @return
     */
    private static void closeConn(Connection conn) {
        if (conn != null) {
            try {
                conn.close();  //關閉數據庫連接
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

四、方式2:redis 的方式

4.一、用到的幾個命令

  1. setnx
    命令格式:SETNX key value;是『SET if Not eXists』(若是不存在,則 SET)的簡寫,只在鍵 key 不存在的狀況下,將鍵 key 的值設置爲 value 。若鍵 key 已經存在, 則 SETNX 命令不作任何動做。命令在設置成功時返回 1 ,設置失敗時返回 0 。dom

  2. getset
    命令格式:GETSET key value,將鍵 key 的值設爲 value ,並返回鍵 key 在被設置以前的舊的value。返回值:若是鍵 key 沒有舊值, 也便是說, 鍵 key 在被設置以前並不存在, 那麼命令返回 nil 。當鍵 key 存在但不是字符串類型時,命令返回一個錯誤。

  3. expire
    命令格式:EXPIRE key seconds,使用:爲給定 key 設置生存時間,當 key 過時時(生存時間爲 0 ),它會被自動刪除。返回值:設置成功返回 1 。當 key 不存在或者不能爲 key 設置生存時間時(好比在低於 2.1.3 版本的 Redis 中你嘗試更新 key 的生存時間),返回 0 。
  • del
    命令格式:DEL key [key …],使用:刪除給定的一個或多個 key ,不存在的 key 會被忽略。返回值:被刪除 key 的數量。

4.二、原理

臥槽,頭條4面:這個問題砍了我10萬!!!

過程分析,先看圖左邊過程:

一、A嘗試去獲取鎖lockkey,經過setnx(lockkey,currenttime+timeout)命令,對lockkey進行setnx,將value值設置爲當前時間+鎖超時時間;
二、若是返回值爲1,說明redis服務器中尚未lockkey,也就是沒有其餘用戶擁有這個鎖,A就能獲取鎖成功;
三、在進行相關業務執行以前,先執行expire(lockkey),對lockkey設置有效期,防止死鎖;由於若是不設置有效期的話,lockkey將一直存在於redis中,其餘用戶嘗試獲取鎖時,執行到setnx(lockkey,currenttime+timeout)時,將不能成功獲取到該鎖;
四、執行相關業務
五、釋放鎖,A完成相關業務以後,要釋放擁有的鎖,也就是刪除redis中該鎖的內容,del(lockkey),接下來的用戶才能進行從新設置鎖新值

再看右邊過程

六、當A經過setnx(lockkey,currenttime+timeout)命令不能成功設置lockkey時,這是不能直接判定獲取鎖失敗;由於咱們在設置鎖時,設置了鎖的超時時間timeout,噹噹前時間大於redis中存儲鍵值爲lockkey的value值時,能夠認爲上一任的擁有者對鎖的使用權已經失效了,A就能夠強行擁有該鎖;具體斷定過程以下;
七、A經過get(lockkey),獲取redis中的存儲鍵值爲lockkey的value值,即獲取鎖的相對時間lockvalueA
八、lockvalueA!=null && currenttime>lockvalue,A經過當前的時間與鎖設置的時間作比較,若是當前時間已經大於鎖設置的時間臨界,便可以進一步判斷是否能夠獲取鎖,不然說明該鎖還在被佔用,A就還不能獲取該鎖,結束,獲取鎖失敗;
九、步驟4返回結果爲true後,經過getSet設置新的超時時間,並返回舊值lockvalueB,以做判斷,由於在分佈式環境,在進入這裏時可能另外的進程獲取到鎖並對值進行了修改,只有舊值與返回的值一致才能說明中間未被其餘進程獲取到這個鎖
十、lockvalueB == null || lockvalueA==lockvalueB,判斷:若果lockvalueB爲null,說明該鎖已經被釋放了,此時該進程能夠獲取鎖;舊值與返回的lockvalueB一致說明中間未被其餘進程獲取該鎖,能夠獲取鎖;不然不能獲取鎖,結束,獲取鎖失敗。

4.三、代碼

留給給你們,按照上面的過程實現下。

五、方式3:zookeeper

5.一、原理

zookeeper是什麼?是一個開源的中間件,能夠作高可用配置中心使用,簡單點理解:能夠用來保存用戶的一些數據。
zookeeper有3個特色比較重要,這2個特色是實現分佈式鎖的關
鍵。

第1個特色:節點自然有序

zookeeper中存儲數據是樹結構,樹下面能夠建立不少節點,節點中能夠存儲用戶的數據。
在每個節點下面建立子節點時,只要選擇的建立類型是有序類型,那麼,此節點將自動在客戶端指定的節點名後面添加一個單調遞增序號,重點是,併發建立子節點的狀況下,也能夠確保多個子節點的有序性。
好比並發在/lock/lock1下面建立4個有序子節點,以下:
臥槽,頭條4面:這個問題砍了我10萬!!!
客戶端能夠判斷建立的節點序號是否是最小的,若是編號是子節點中最小的,則獲取鎖成功。

第2個特色:臨時節點

客戶端操做zookeeper,須要和zookeeper之間創建鏈接,若是客戶端請求在zookeeper上建立的節點類型是臨時節點,那麼當客戶端和zookeeper之間鏈接斷開的時候,建立的臨時節點自動會被zookeeper刪除。
這個能夠防止死鎖多功能,好比客戶端獲取鎖以後掛了,那麼節點會自動被刪除,此時鎖的其餘獲取者纔有機會獲取鎖。

第3個特色:監聽器

客戶端能夠對某個節點添加監聽器,當節點信息發生變化的時候,zookeeper會通知客戶端,好比節點數據被修改、節點被刪除等等,都會通知客戶端;
這個特性特別牛逼:這個特別爽,後面的節點只須要監聽他前面的一個節點,當前面的一個節點被刪除時,zookeeper會通知監聽者,監聽者能夠判斷本身建立的節點編號是否是最小的,若是是最小的,即獲取鎖成功,這個是否是比上面數據庫和redis的方式好一些,db和redis的方式須要自旋(獲取失敗了,休眠稍許,繼續循環嘗試),而zookeeper不須要自旋,鎖被釋放的時候,zookeeper會通知等待者。

5.二、代碼

重點理解原理,代碼你們能夠在網上找找,比較多,這裏就不貼出來了。

六、方式4:etcd

etcd 和 zookeeper功能差很少,也能夠做爲高可用配置中心,不過etcd基於Go語言實現,也能夠用來實現分佈式鎖,實現原理上和zookeeper差很少,這裏就不細說了。

七、總結

本文主要介紹了4種方式實現分佈式鎖,你們重點要理解每種方式的原理。
db和redis的方式原理差很少,內部在獲取失敗的狀況下,都須要採用自旋的方式從新嘗試獲取鎖,而zookeeper採用監聽的方式。
redis和zookeeper這2種方式用的比較多,性能上面redis更好一些,併發量比較大的能夠採用redis的方式。
設計中還有一點:獲取鎖的時候分2步走,先獲取jvm中的鎖,而後在嘗試獲取分佈式鎖。

八、更多面試題

  1. B站上有哪些值得推薦學習的視頻?
  2. 經典面試題:重寫equals方法時,爲何必須重寫hashCode方法?
  3. 經典面試題:HashMap的默認容量爲何是16 ?
  4. 經典面試題:Arraylist和Linkedlist到底有什麼區別???
  5. 經典面試題:NoClassDefFoundError 和 ClassNotFoundException 有什麼區別?
  6. 經典面試題:Throwable、Exception、Error、RuntimeException到底有什麼區別?
  7. 經典面試題:try、finally中都有return時,代碼如何執行????
  8. 面對億級數據,MySQL到底行不行,一塊兒來看看!!
  9. 經典面試題:ThreadLocal連環炮!!
  10. 經典面試題:強引用、軟引用、弱引用、虛引用有什麼區別?
  11. 面試官:線程有幾種狀態?他們是如何相互轉換的?
    臥槽,頭條4面:這個問題砍了我10萬!!!

·END·
臥槽,頭條4面:這個問題砍了我10萬!!!掃描二維碼 | 關注咱們

相關文章
相關標籤/搜索