文章已收錄到Github精選,歡迎Star:
https://github.com/yehongzhi/learningSummary
通過《ZooKeeper入門》後,咱們學會了ZooKeeper的基本用法。java
實際上ZooKeeper的應用是很是普遍的,實現分佈式鎖只是其中一種。接下來咱們就ZooKeeper實現分佈式鎖解決秒殺超賣問題進行展開。mysql
秒殺活動應該都不陌生,不用過多解釋。git
不難想象,在這種"秒殺"的場景中,實際上會出現多個用戶爭搶"資源"的狀況,也就是多個線程同時併發,這種狀況是很容易出現數據不許確,也就是超賣問題。github
下面使用程序演示,我使用了SpringBoot2.0、Mybatis、Mybatis-Plus、SpringMVC搭建了一個簡單的項目,github地址:redis
https://github.com/yehongzhi/...
建立一個商品信息表:sql
CREATE TABLE `tb_commodity_info` ( `id` varchar(32) NOT NULL, `commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名稱', `commodity_price` varchar(36) DEFAULT '0' COMMENT '商品價格', `number` int(10) DEFAULT '0' COMMENT '商品數量', `description` varchar(2048) DEFAULT '' COMMENT '商品描述', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';
添加一個商品[叉燒包]進去:
數據庫
核心的代碼邏輯是這樣的:apache
@Override public boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception { //1.先查詢數據庫中商品的數量 TbCommodityInfo commodityInfo = commodityInfoMapper.selectById(commodityId); //2.判斷商品數量是否大於0,或者購買的數量大於庫存 Integer count = commodityInfo.getNumber(); if (count <= 0 || number > count) { //商品數量小於或者等於0,或者購買的數量大於庫存,則返回false return false; } //3.若是庫存數量大於0,而且購買的數量小於或者等於庫存。則更新商品數量 count -= number; commodityInfo.setNumber(count); boolean bool = commodityInfoMapper.updateById(commodityInfo) == 1; if (bool) { //若是更新成功,則打印購買商品成功 System.out.println("購買商品[ " + commodityInfo.getCommodityName() + " ]成功,數量爲:" + number); } return bool; }
邏輯示意圖以下:服務器
上面這個邏輯,若是單線程請求的話是沒有問題的。多線程
可是多線程的話就出現問題了。如今我就建立多個線程,經過HttpClient進行請求,看會發生什麼:
public static void main(String[] args) throws Exception { //請求地址 String url = "http://localhost:8080/mall/commodity/purchase"; //請求參數,商品ID,數量 Map<String, String> map = new HashMap<>(); map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1"); map.put("number", "1"); //建立10個線程經過HttpClient進行發送請求,測試 for (int i = 0; i < 10; i++) { //這個線程的邏輯僅僅是發送請求 CommodityThread commodityThread = new CommodityThread(url, map); commodityThread.start(); } }
說明一下,叉燒包的數量是100,這裏有10個線程同時去購買,假設都購買成功的話,庫存數量應該是90。
實際上,10個線程的確都購買成功了:
可是數據庫的商品庫存,卻不許確:
上面的場景,大概流程以下所示:
能夠看出問題的關鍵在於兩個線程"同時"去查詢剩餘的庫存,而後更新庫存致使的。要解決這個問題,其實只要保證多個線程在這段邏輯是順序執行便可,也就是加鎖。
本地鎖JDK提供有兩種:synchronized和Lock鎖。
兩種方式均可以,我這裏爲了簡便,使用synchronized:
//使用synchronized修飾方法 @Override public synchronized boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception { //省略... }
而後再測試剛剛多線程併發搶購的狀況,看看結果:
問題獲得解決!!!
你覺得事情就這樣結束了嗎,看了看進度條,發現事情並不簡單。
咱們知道在實際項目中,每每不會只部署一臺服務器,因此不妨咱們啓動兩臺服務器,端口號分別是8080、8081,模擬實際項目的場景:
寫一個交替請求的測試腳本,模擬多臺服務器分別處理請求,用戶秒殺搶購的場景:
public static void main(String[] args) throws Exception { //請求地址 String url = "http://localhost:%s/mall/commodity/purchase"; //請求參數,商品ID,數量 Map<String, String> map = new HashMap<>(); map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1"); map.put("number", "1"); //建立10個線程經過HttpClient進行發送請求,測試 for (int i = 0; i < 10; i++) { //8080、8081交替請求,每一個服務器處理5個請求 String port = "808" + (i % 2); CommodityThread commodityThread = new CommodityThread(String.format(url, port), map); commodityThread.start(); } }
首先看購買的狀況,確定都是購買成功的:
關鍵是庫存數量是否正確:
有10個請求購買成功,庫存應該是90纔對,這裏庫存是95。事實證實本地鎖是不能解決多臺服務器秒殺搶購出現超賣的問題。
爲何會這樣呢,請看示意圖:
其實和多線程問題是差很少的緣由,多個服務器去查詢數據庫,獲取到相同的庫存,而後更新庫存,致使數據不正確。要保證庫存的數量正確,關鍵在於多臺服務器要保證只能一臺服務器在執行這段邏輯,也就是要加分佈式鎖。
這也體現出分佈式鎖的做用,就是要保證多臺服務器只能有一臺服務器執行。
分佈式鎖有三種實現方式,分別是redis、ZooKeeper、數據庫(好比mysql)。
其實是利用ZooKeeper的臨時順序節點的特性實現分佈式鎖。怎麼實現呢?
假設如今有一個客戶端A,須要加鎖,那麼就在"/Lock"路徑下建立一個臨時順序節點。而後獲取"/Lock"下的節點列表,判斷本身的序號是不是最小的,若是是最小的序號,則加鎖成功!
如今又有另外一個客戶端,客戶端B須要加鎖,那麼也是在"/Lock"路徑下建立臨時順序節點。依然獲取"/Lock"下的節點列表,判斷本身的節點序號是否最小的。發現不是最小的,加鎖失敗,接着對本身的上一個節點進行監聽。
怎麼釋放鎖呢,其實就是把臨時節點刪除。假設客戶端A釋放鎖,把節點01刪除了。那就會觸發節點02的監聽事件,客戶端就再次獲取節點列表,而後判斷本身是不是最小的序號,若是是最小序號則加鎖。
若是多個客戶端其實也是同樣,一上來就會建立一個臨時節點,而後開始判斷本身是不是最小的序號,若是不是就監聽上一個節點,造成一種排隊的機制。也就造成了鎖的效果,保證了多臺服務器只有一臺執行。
假設其中有一個客戶端宕機了,根據臨時節點的特色,ZooKeeper會自動刪除對應的臨時節點,至關於自動釋放了鎖。
首先加入Maven依賴
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.4</version> </dependency>
接着按照上面分析的思路敲代碼,建立ZkLock類:
public class ZkLock implements Lock { //計數器,用於加鎖失敗時,阻塞 private static CountDownLatch cdl = new CountDownLatch(1); //ZooKeeper服務器的IP端口 private static final String IP_PORT = "127.0.0.1:2181"; //鎖的根路徑 private static final String ROOT_NODE = "/Lock"; //上一個節點的路徑 private volatile String beforePath; //當前上鎖的節點路徑 private volatile String currPath; //建立ZooKeeper客戶端 private ZkClient zkClient = new ZkClient(IP_PORT); public ZkLock() { //判斷是否存在根節點 if (!zkClient.exists(ROOT_NODE)) { //不存在則建立 zkClient.createPersistent(ROOT_NODE); } } //加鎖 public void lock() { if (tryLock()) { System.out.println("加鎖成功!!"); } else { // 嘗試加鎖失敗,進入等待 監聽 waitForLock(); // 再次嘗試加鎖 lock(); } } //嘗試加鎖 public synchronized boolean tryLock() { // 第一次就進來建立本身的臨時節點 if (StringUtils.isBlank(currPath)) { currPath = zkClient.createEphemeralSequential(ROOT_NODE + "/", "lock"); } // 對節點排序 List<String> children = zkClient.getChildren(ROOT_NODE); Collections.sort(children); // 當前的是最小節點就返回加鎖成功 if (currPath.equals(ROOT_NODE + "/" + children.get(0))) { return true; } else { // 不是最小節點 就找到本身的前一個 依次類推 釋放也是同樣 int beforePathIndex = Collections.binarySearch(children, currPath.substring(ROOT_NODE.length() + 1)) - 1; beforePath = ROOT_NODE + "/" + children.get(beforePathIndex); //返回加鎖失敗 return false; } } //解鎖 public void unlock() { //刪除節點並關閉客戶端 zkClient.delete(currPath); zkClient.close(); } //等待上鎖,加鎖失敗進入阻塞,監聽上一個節點 private void waitForLock() { IZkDataListener listener = new IZkDataListener() { //監聽節點更新事件 public void handleDataChange(String s, Object o) throws Exception { } //監聽節點被刪除事件 public void handleDataDeleted(String s) throws Exception { //解除阻塞 cdl.countDown(); } }; // 監聽上一個節點 this.zkClient.subscribeDataChanges(beforePath, listener); //判斷上一個節點是否存在 if (zkClient.exists(beforePath)) { //上一個節點存在 try { System.out.println("加鎖失敗 等待"); //加鎖失敗,阻塞等待 cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 釋放監聽 zkClient.unsubscribeDataChanges(beforePath, listener); } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } public void lockInterruptibly() throws InterruptedException { } public Condition newCondition() { return null; } }
在Controller層加上鎖:
@PostMapping("/purchase") public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId, @RequestParam(name = "number") Integer number) throws Exception { boolean bool; //獲取ZooKeeper分佈式鎖 ZkLock zkLock = new ZkLock(); try { //上鎖 zkLock.lock(); //調用秒殺搶購的service方法 bool = commodityInfoService.purchaseCommodityInfo(commodityId, number); } catch (Exception e) { e.printStackTrace(); bool = false; } finally { //解鎖 zkLock.unlock(); } return bool; }
測試,依然起兩臺服務器,8080、8081。而後跑測試腳本:
public static void main(String[] args) throws Exception { //請求地址 String url = "http://localhost:%s/mall/commodity/purchase"; //請求參數,商品ID,數量 Map<String, String> map = new HashMap<>(); map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1"); map.put("number", "1"); //建立10個線程經過HttpClient進行發送請求,測試 for (int i = 0; i < 10; i++) { //8080、8081交替請求 String port = "808" + (i % 2); CommodityThread commodityThread = new CommodityThread(String.format(url, port), map); commodityThread.start(); } }
結果正確:
Curator是Apache開源的一個操做ZooKeeper的框架。其中就有實現ZooKeeper分佈式鎖的功能。
固然分佈式鎖的實現只是這個框架的其中一個很小的部分,除此以外還有不少用途,你們能夠到官網去學習。
首先添加Maven依賴:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency>
仍是同樣在須要加鎖的地方進行加鎖:
@PostMapping("/purchase") public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId, @RequestParam(name = "number") Integer number) throws Exception { boolean bool = false; //設置重試策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy); // 啓動客戶端 client.start(); InterProcessMutex mutex = new InterProcessMutex(client, "/locks"); try { //加鎖 if (mutex.acquire(3, TimeUnit.SECONDS)) { //調用搶購秒殺service方法 bool = commodityInfoService.purchaseCommodityInfo(commodityId, number); } } catch (Exception e) { e.printStackTrace(); } finally { //解鎖 mutex.release(); client.close(); } return bool; }
我嘗試用原生的ZooKeeper寫分佈式鎖,有點炸裂。遇到很多坑,最終放棄了,用zkclient的API。可能我太菜了不太會用。
下面我分享我遇到的一些問題,但願大家在遇到同類型的異常時能迅速定位問題。
這個錯誤是使用原生ZooKeeper的API出現的錯誤。主要是我在進入debug模式進行調試出現的。
由於原生的ZooKeeper須要設定一個會話超時時間,通常debug模式咱們都會卡在一個地方去調試,確定就超出了設置的會話時間~
這個也是原生ZooKeeper的API的錯誤,怎麼出現的呢?
主要是建立的ZooKeeper客戶端鏈接服務器時是異步的,因爲鏈接須要時間,還沒鏈接成功,代碼已經開始執行create()或者exists(),而後就報這個錯誤。
解決方法:使用CountDownLatch計數器阻塞,鏈接成功後再中止阻塞,而後執行create()或者exists()等操做。
這個錯誤真的太炸裂了~
一開始我是把分佈式鎖加在service層,而後覺得搞定了。接着啓動8080、8081進行併發測試。10個線程都是購買成功,結果竟然是不正確!
第一反應以爲本身實現的代碼有問題,因而換成curator框架實現的分佈式鎖,開源框架應該沒問題了吧。沒想到仍是不行~
既然不是鎖自己的問題,是否是事務問題。上一個事務更新庫存的操做還沒提交,而後下一個請求就進來查詢。因而我就把加鎖的範圍放大一點,放在Controller層。竟然成功了!
你可能已經注意到,我在上面的例子就是把分佈式鎖加在Controller層,其實我不太喜歡在Controller層寫太多代碼。
也許有更加優雅的方式,惋惜本人能力不足,若是你有更好的實現方式,能夠分享一下~
補充:下面評論有位大佬說,在原來的方法外再包裹一層,親測是能夠的。這應該是事務的問題。
上面放在Controller層能夠成功是否是由於Controller層沒有事務,原來寫在service我是寫了一個@Transactional註解在類上,因此整個類裏面的都有事務,因此解鎖後還沒提交事務去更新數據庫,而後下一個請求進來就查到了沒更新的數據。
爲了優雅一點,就把@Transactional註解放在搶購的service方法上
而後再包裹一個沒有事務的方法,用於上鎖。
最後,咱們回顧總結一下吧:
但願這篇文章對你有用,以爲有用就點個贊吧~