從秒殺聊到ZooKeeper分佈式鎖

思惟導圖

文章已收錄到Github精選,歡迎Star
https://github.com/yehongzhi/learningSummary

前言

通過《ZooKeeper入門》後,咱們學會了ZooKeeper的基本用法。java

實際上ZooKeeper的應用是很是普遍的,實現分佈式鎖只是其中一種。接下來咱們就ZooKeeper實現分佈式鎖解決秒殺超賣問題進行展開。mysql

1、什麼是秒殺超賣問題

秒殺活動應該都不陌生,不用過多解釋。git

不難想象,在這種"秒殺"的場景中,實際上會出現多個用戶爭搶"資源"的狀況,也就是多個線程同時併發,這種狀況是很容易出現數據不許確,也就是超賣問題github

1.1 項目演示

下面使用程序演示,我使用了SpringBoot2.0、Mybatis、Mybatis-Plus、SpringMVC搭建了一個簡單的項目,github地址:redis

https://github.com/yehongzhi/...

建立一個商品信息表:sql

CREATE TABLE `tb_commodity_info` (
  `id` varchar(32) NOT NULL,
  `commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名稱',
  `commodity_price` varchar(36) DEFAULT '0' COMMENT '商品價格',
  `number` int(10) DEFAULT '0' COMMENT '商品數量',
  `description` varchar(2048) DEFAULT '' COMMENT '商品描述',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';

添加一個商品[叉燒包]進去:
數據庫

核心的代碼邏輯是這樣的:apache

@Override
    public boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {
        //1.先查詢數據庫中商品的數量
        TbCommodityInfo commodityInfo = commodityInfoMapper.selectById(commodityId);
        //2.判斷商品數量是否大於0,或者購買的數量大於庫存
        Integer count = commodityInfo.getNumber();
        if (count <= 0 || number > count) {
            //商品數量小於或者等於0,或者購買的數量大於庫存,則返回false
            return false;
        }
        //3.若是庫存數量大於0,而且購買的數量小於或者等於庫存。則更新商品數量
        count -= number;
        commodityInfo.setNumber(count);
        boolean bool = commodityInfoMapper.updateById(commodityInfo) == 1;
        if (bool) {
            //若是更新成功,則打印購買商品成功
            System.out.println("購買商品[ " + commodityInfo.getCommodityName() + " ]成功,數量爲:" + number);
        }
        return bool;
    }

邏輯示意圖以下:服務器

上面這個邏輯,若是單線程請求的話是沒有問題的。多線程

可是多線程的話就出現問題了。如今我就建立多個線程,經過HttpClient進行請求,看會發生什麼:

public static void main(String[] args) throws Exception {
        //請求地址
        String url = "http://localhost:8080/mall/commodity/purchase";
        //請求參數,商品ID,數量
        Map<String, String> map = new HashMap<>();
        map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
        map.put("number", "1");
        //建立10個線程經過HttpClient進行發送請求,測試
        for (int i = 0; i < 10; i++) {
            //這個線程的邏輯僅僅是發送請求
            CommodityThread commodityThread = new CommodityThread(url, map);
            commodityThread.start();
        }
    }

說明一下,叉燒包的數量是100,這裏有10個線程同時去購買,假設都購買成功的話,庫存數量應該是90。

實際上,10個線程的確都購買成功了:

可是數據庫的商品庫存,卻不許確:

2、嘗試使用本地鎖

上面的場景,大概流程以下所示:

能夠看出問題的關鍵在於兩個線程"同時"去查詢剩餘的庫存,而後更新庫存致使的。要解決這個問題,其實只要保證多個線程在這段邏輯是順序執行便可,也就是加鎖

本地鎖JDK提供有兩種:synchronized和Lock鎖。

兩種方式均可以,我這裏爲了簡便,使用synchronized:

//使用synchronized修飾方法
    @Override
    public synchronized boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {
        //省略...
    }

而後再測試剛剛多線程併發搶購的狀況,看看結果:

問題獲得解決!!!

你覺得事情就這樣結束了嗎,看了看進度條,發現事情並不簡單。

咱們知道在實際項目中,每每不會只部署一臺服務器,因此不妨咱們啓動兩臺服務器,端口號分別是8080、8081,模擬實際項目的場景:

寫一個交替請求的測試腳本,模擬多臺服務器分別處理請求,用戶秒殺搶購的場景:

public static void main(String[] args) throws Exception {
        //請求地址
        String url = "http://localhost:%s/mall/commodity/purchase";
        //請求參數,商品ID,數量
        Map<String, String> map = new HashMap<>();
        map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
        map.put("number", "1");
        //建立10個線程經過HttpClient進行發送請求,測試
        for (int i = 0; i < 10; i++) {
            //8080、8081交替請求,每一個服務器處理5個請求
            String port = "808" + (i % 2);
            CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);
            commodityThread.start();
        }
    }

首先看購買的狀況,確定都是購買成功的:

關鍵是庫存數量是否正確:

有10個請求購買成功,庫存應該是90纔對,這裏庫存是95。事實證實本地鎖是不能解決多臺服務器秒殺搶購出現超賣的問題

爲何會這樣呢,請看示意圖:

其實和多線程問題是差很少的緣由,多個服務器去查詢數據庫,獲取到相同的庫存,而後更新庫存,致使數據不正確。要保證庫存的數量正確,關鍵在於多臺服務器要保證只能一臺服務器在執行這段邏輯,也就是要加分佈式鎖。

這也體現出分佈式鎖的做用,就是要保證多臺服務器只能有一臺服務器執行。

分佈式鎖有三種實現方式,分別是redis、ZooKeeper、數據庫(好比mysql)。

3、使用ZooKeeper實現分佈式鎖

3.1 原理

其實是利用ZooKeeper的臨時順序節點的特性實現分佈式鎖。怎麼實現呢?

假設如今有一個客戶端A,須要加鎖,那麼就在"/Lock"路徑下建立一個臨時順序節點。而後獲取"/Lock"下的節點列表,判斷本身的序號是不是最小的,若是是最小的序號,則加鎖成功!

如今又有另外一個客戶端,客戶端B須要加鎖,那麼也是在"/Lock"路徑下建立臨時順序節點。依然獲取"/Lock"下的節點列表,判斷本身的節點序號是否最小的。發現不是最小的,加鎖失敗,接着對本身的上一個節點進行監聽。

怎麼釋放鎖呢,其實就是把臨時節點刪除。假設客戶端A釋放鎖,把節點01刪除了。那就會觸發節點02的監聽事件,客戶端就再次獲取節點列表,而後判斷本身是不是最小的序號,若是是最小序號則加鎖。

若是多個客戶端其實也是同樣,一上來就會建立一個臨時節點,而後開始判斷本身是不是最小的序號,若是不是就監聽上一個節點,造成一種排隊的機制。也就造成了鎖的效果,保證了多臺服務器只有一臺執行。

假設其中有一個客戶端宕機了,根據臨時節點的特色,ZooKeeper會自動刪除對應的臨時節點,至關於自動釋放了鎖。

3.2 手寫代碼實現分佈式鎖

首先加入Maven依賴

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.4</version>
</dependency>

接着按照上面分析的思路敲代碼,建立ZkLock類:

public class ZkLock implements Lock {
    //計數器,用於加鎖失敗時,阻塞
    private static CountDownLatch cdl = new CountDownLatch(1);
    //ZooKeeper服務器的IP端口
    private static final String IP_PORT = "127.0.0.1:2181";
    //鎖的根路徑
    private static final String ROOT_NODE = "/Lock";
    //上一個節點的路徑
    private volatile String beforePath;
    //當前上鎖的節點路徑
    private volatile String currPath;
    //建立ZooKeeper客戶端
    private ZkClient zkClient = new ZkClient(IP_PORT);

    public ZkLock() {
        //判斷是否存在根節點
        if (!zkClient.exists(ROOT_NODE)) {
            //不存在則建立
            zkClient.createPersistent(ROOT_NODE);
        }
    }
    
    //加鎖
    public void lock() {
        if (tryLock()) {
            System.out.println("加鎖成功!!");
        } else {
            // 嘗試加鎖失敗,進入等待 監聽
            waitForLock();
            // 再次嘗試加鎖
            lock();
        }

    }
    
    //嘗試加鎖
    public synchronized boolean tryLock() {
        // 第一次就進來建立本身的臨時節點
        if (StringUtils.isBlank(currPath)) {
            currPath = zkClient.createEphemeralSequential(ROOT_NODE + "/", "lock");
        }
        // 對節點排序
        List<String> children = zkClient.getChildren(ROOT_NODE);
        Collections.sort(children);

        // 當前的是最小節點就返回加鎖成功
        if (currPath.equals(ROOT_NODE + "/" + children.get(0))) {
            return true;
        } else {
            // 不是最小節點 就找到本身的前一個 依次類推 釋放也是同樣
            int beforePathIndex = Collections.binarySearch(children, currPath.substring(ROOT_NODE.length() + 1)) - 1;
            beforePath = ROOT_NODE + "/" + children.get(beforePathIndex);
            //返回加鎖失敗
            return false;
        }
    }
    
    //解鎖
    public void unlock() {
        //刪除節點並關閉客戶端
        zkClient.delete(currPath);
        zkClient.close();
    }
    
    //等待上鎖,加鎖失敗進入阻塞,監聽上一個節點
    private void waitForLock() {
        IZkDataListener listener = new IZkDataListener() {
            //監聽節點更新事件
            public void handleDataChange(String s, Object o) throws Exception {
            }

            //監聽節點被刪除事件
            public void handleDataDeleted(String s) throws Exception {
                //解除阻塞
                cdl.countDown();
            }
        };
        // 監聽上一個節點
        this.zkClient.subscribeDataChanges(beforePath, listener);
        //判斷上一個節點是否存在
        if (zkClient.exists(beforePath)) {
            //上一個節點存在
            try {
                System.out.println("加鎖失敗 等待");
                //加鎖失敗,阻塞等待
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 釋放監聽
        zkClient.unsubscribeDataChanges(beforePath, listener);
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    public void lockInterruptibly() throws InterruptedException {
    }

    public Condition newCondition() {
        return null;
    }
}

在Controller層加上鎖:

@PostMapping("/purchase")
    public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId, @RequestParam(name = "number") Integer number) throws Exception {
        boolean bool;
        //獲取ZooKeeper分佈式鎖
        ZkLock zkLock = new ZkLock();
        try {
            //上鎖
            zkLock.lock();
            //調用秒殺搶購的service方法
            bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
        } catch (Exception e) {
            e.printStackTrace();
            bool = false;
        } finally {
            //解鎖
            zkLock.unlock();
        }
        return bool;
    }

測試,依然起兩臺服務器,8080、8081。而後跑測試腳本:

public static void main(String[] args) throws Exception {
        //請求地址
        String url = "http://localhost:%s/mall/commodity/purchase";
        //請求參數,商品ID,數量
        Map<String, String> map = new HashMap<>();
        map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
        map.put("number", "1");
        //建立10個線程經過HttpClient進行發送請求,測試
        for (int i = 0; i < 10; i++) {
            //8080、8081交替請求
            String port = "808" + (i % 2);
            CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);
            commodityThread.start();
        }
    }

結果正確:

3.3 造好的輪子

Curator是Apache開源的一個操做ZooKeeper的框架。其中就有實現ZooKeeper分佈式鎖的功能。

固然分佈式鎖的實現只是這個框架的其中一個很小的部分,除此以外還有不少用途,你們能夠到官網去學習。

首先添加Maven依賴:

<dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.3.0</version>
    </dependency>

仍是同樣在須要加鎖的地方進行加鎖:

@PostMapping("/purchase")
    public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId,
                                         @RequestParam(name = "number") Integer number) throws Exception {
        boolean bool = false;
        //設置重試策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
        // 啓動客戶端
        client.start();
        InterProcessMutex mutex = new InterProcessMutex(client, "/locks");
        try {
            //加鎖
            if (mutex.acquire(3, TimeUnit.SECONDS)) {
                //調用搶購秒殺service方法
                bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //解鎖
            mutex.release();
            client.close();
        }
        return bool;
    }

4、遇到的坑

我嘗試用原生的ZooKeeper寫分佈式鎖,有點炸裂。遇到很多坑,最終放棄了,用zkclient的API。可能我太菜了不太會用。

下面我分享我遇到的一些問題,但願大家在遇到同類型的異常時能迅速定位問題。

4.1 Session expired

這個錯誤是使用原生ZooKeeper的API出現的錯誤。主要是我在進入debug模式進行調試出現的。

由於原生的ZooKeeper須要設定一個會話超時時間,通常debug模式咱們都會卡在一個地方去調試,確定就超出了設置的會話時間~

4.2 KeeperErrorCode = ConnectionLoss

這個也是原生ZooKeeper的API的錯誤,怎麼出現的呢?

主要是建立的ZooKeeper客戶端鏈接服務器時是異步的,因爲鏈接須要時間,還沒鏈接成功,代碼已經開始執行create()或者exists(),而後就報這個錯誤。

解決方法:使用CountDownLatch計數器阻塞,鏈接成功後再中止阻塞,而後執行create()或者exists()等操做。

4.3 併發查詢更新出現數據不一致

這個錯誤真的太炸裂了~

一開始我是把分佈式鎖加在service層,而後覺得搞定了。接着啓動8080、8081進行併發測試。10個線程都是購買成功,結果竟然是不正確!

第一反應以爲本身實現的代碼有問題,因而換成curator框架實現的分佈式鎖,開源框架應該沒問題了吧。沒想到仍是不行~

既然不是鎖自己的問題,是否是事務問題。上一個事務更新庫存的操做還沒提交,而後下一個請求就進來查詢。因而我就把加鎖的範圍放大一點,放在Controller層。竟然成功了!

你可能已經注意到,我在上面的例子就是把分佈式鎖加在Controller層,其實我不太喜歡在Controller層寫太多代碼。

也許有更加優雅的方式,惋惜本人能力不足,若是你有更好的實現方式,能夠分享一下~

補充:下面評論有位大佬說,在原來的方法外再包裹一層,親測是能夠的。這應該是事務的問題。

上面放在Controller層能夠成功是否是由於Controller層沒有事務,原來寫在service我是寫了一個@Transactional註解在類上,因此整個類裏面的都有事務,因此解鎖後還沒提交事務去更新數據庫,而後下一個請求進來就查到了沒更新的數據。

爲了優雅一點,就把@Transactional註解放在搶購的service方法上

而後再包裹一個沒有事務的方法,用於上鎖。

5、總結

最後,咱們回顧總結一下吧:

  • 首先咱們模擬單機多線程的秒殺場景,單機的話可使用本地鎖解決問題。
  • 接着模擬多服務器多線程的場景,思路是使用ZooKeeper實現分佈式鎖解決。
  • 圖解ZooKeeper實現分佈式鎖的原理。
  • 而後動手寫代碼,實現分佈式鎖。
  • 最後總結遇到的坑。

但願這篇文章對你有用,以爲有用就點個贊吧~

相關文章
相關標籤/搜索