通過《ZooKeeper入門》後,咱們學會了ZooKeeper的基本用法。java
實際上ZooKeeper的應用是很是普遍的,實現分佈式鎖只是其中一種。接下來咱們就ZooKeeper實現分佈式鎖解決秒殺超賣問題進行展開。mysql
秒殺活動應該都不陌生,不用過多解釋。git
不難想象,在這種"秒殺"的場景中,實際上會出現多個用戶爭搶"資源"的狀況,也就是多個線程同時併發,這種狀況是很容易出現數據不許確,也就是超賣問題。程序員
下面使用程序演示,我使用了SpringBoot2.0、Mybatis、Mybatis-Plus、SpringMVC搭建了一個簡單的項目,github地址:github
建立一個商品信息表:sql
CREATE TABLE `tb_commodity_info` (
`id` varchar(32) NOT NULL,
`commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名稱',
`commodity_price` varchar(36) DEFAULT '0' COMMENT '商品價格',
`number` int(10) DEFAULT '0' COMMENT '商品數量',
`description` varchar(2048) DEFAULT '' COMMENT '商品描述',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';
複製代碼
添加一個商品[叉燒包]進去: 數據庫
核心的代碼邏輯是這樣的:apache
@Override
public boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {
//1.先查詢數據庫中商品的數量
TbCommodityInfo commodityInfo = commodityInfoMapper.selectById(commodityId);
//2.判斷商品數量是否大於0,或者購買的數量大於庫存
Integer count = commodityInfo.getNumber();
if (count <= 0 || number > count) {
//商品數量小於或者等於0,或者購買的數量大於庫存,則返回false
return false;
}
//3.若是庫存數量大於0,而且購買的數量小於或者等於庫存。則更新商品數量
count -= number;
commodityInfo.setNumber(count);
boolean bool = commodityInfoMapper.updateById(commodityInfo) == 1;
if (bool) {
//若是更新成功,則打印購買商品成功
System.out.println("購買商品[ " + commodityInfo.getCommodityName() + " ]成功,數量爲:" + number);
}
return bool;
}
複製代碼
邏輯示意圖以下:服務器
上面這個邏輯,若是單線程請求的話是沒有問題的。
可是多線程的話就出現問題了。如今我就建立多個線程,經過HttpClient進行請求,看會發生什麼:
public static void main(String[] args) throws Exception {
//請求地址
String url = "http://localhost:8080/mall/commodity/purchase";
//請求參數,商品ID,數量
Map<String, String> map = new HashMap<>();
map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
map.put("number", "1");
//建立10個線程經過HttpClient進行發送請求,測試
for (int i = 0; i < 10; i++) {
//這個線程的邏輯僅僅是發送請求
CommodityThread commodityThread = new CommodityThread(url, map);
commodityThread.start();
}
}
複製代碼
說明一下,叉燒包的數量是100,這裏有10個線程同時去購買,假設都購買成功的話,庫存數量應該是90。
實際上,10個線程的確都購買成功了:
可是數據庫的商品庫存,卻不許確:
上面的場景,大概流程以下所示:
能夠看出問題的關鍵在於兩個線程"同時"去查詢剩餘的庫存,而後更新庫存致使的。要解決這個問題,其實只要保證多個線程在這段邏輯是順序執行便可,也就是加鎖。
本地鎖JDK提供有兩種:synchronized和Lock鎖。
兩種方式均可以,我這裏爲了簡便,使用synchronized:
//使用synchronized修飾方法
@Override
public synchronized boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {
//省略...
}
複製代碼
而後再測試剛剛多線程併發搶購的狀況,看看結果:
問題獲得解決!!!
你覺得事情就這樣結束了嗎,看了看進度條,發現事情並不簡單。
咱們知道在實際項目中,每每不會只部署一臺服務器,因此不妨咱們啓動兩臺服務器,端口號分別是8080、8081,模擬實際項目的場景:
寫一個交替請求的測試腳本,模擬多臺服務器分別處理請求,用戶秒殺搶購的場景:
public static void main(String[] args) throws Exception {
//請求地址
String url = "http://localhost:%s/mall/commodity/purchase";
//請求參數,商品ID,數量
Map<String, String> map = new HashMap<>();
map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
map.put("number", "1");
//建立10個線程經過HttpClient進行發送請求,測試
for (int i = 0; i < 10; i++) {
//8080、8081交替請求,每一個服務器處理5個請求
String port = "808" + (i % 2);
CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);
commodityThread.start();
}
}
複製代碼
首先看購買的狀況,確定都是購買成功的:
關鍵是庫存數量是否正確:
有10個請求購買成功,庫存應該是90纔對,這裏庫存是95。事實證實本地鎖是不能解決多臺服務器秒殺搶購出現超賣的問題。
爲何會這樣呢,請看示意圖:
其實和多線程問題是差很少的緣由,多個服務器去查詢數據庫,獲取到相同的庫存,而後更新庫存,致使數據不正確。要保證庫存的數量正確,關鍵在於多臺服務器要保證只能一臺服務器在執行這段邏輯,也就是要加分佈式鎖。
這也體現出分佈式鎖的做用,就是要保證多臺服務器只能有一臺服務器執行。
分佈式鎖有三種實現方式,分別是redis、ZooKeeper、數據庫(好比mysql)。
其實是利用ZooKeeper的臨時順序節點的特性實現分佈式鎖。怎麼實現呢?
假設如今有一個客戶端A,須要加鎖,那麼就在"/Lock"路徑下建立一個臨時順序節點。而後獲取"/Lock"下的節點列表,判斷本身的序號是不是最小的,若是是最小的序號,則加鎖成功!
如今又有另外一個客戶端,客戶端B須要加鎖,那麼也是在"/Lock"路徑下建立臨時順序節點。依然獲取"/Lock"下的節點列表,判斷本身的節點序號是否最小的。發現不是最小的,加鎖失敗,接着對本身的上一個節點進行監聽。
怎麼釋放鎖呢,其實就是把臨時節點刪除。假設客戶端A釋放鎖,把節點01刪除了。那就會觸發節點02的監聽事件,客戶端就再次獲取節點列表,而後判斷本身是不是最小的序號,若是是最小序號則加鎖。
若是多個客戶端其實也是同樣,一上來就會建立一個臨時節點,而後開始判斷本身是不是最小的序號,若是不是就監聽上一個節點,造成一種排隊的機制。也就造成了鎖的效果,保證了多臺服務器只有一臺執行。
假設其中有一個客戶端宕機了,根據臨時節點的特色,ZooKeeper會自動刪除對應的臨時節點,至關於自動釋放了鎖。
首先加入Maven依賴
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.4</version>
</dependency>
複製代碼
接着按照上面分析的思路敲代碼,建立ZkLock類:
public class ZkLock implements Lock {
//計數器,用於加鎖失敗時,阻塞
private static CountDownLatch cdl = new CountDownLatch(1);
//ZooKeeper服務器的IP端口
private static final String IP_PORT = "127.0.0.1:2181";
//鎖的根路徑
private static final String ROOT_NODE = "/Lock";
//上一個節點的路徑
private volatile String beforePath;
//當前上鎖的節點路徑
private volatile String currPath;
//建立ZooKeeper客戶端
private ZkClient zkClient = new ZkClient(IP_PORT);
public ZkLock() {
//判斷是否存在根節點
if (!zkClient.exists(ROOT_NODE)) {
//不存在則建立
zkClient.createPersistent(ROOT_NODE);
}
}
//加鎖
public void lock() {
if (tryLock()) {
System.out.println("加鎖成功!!");
} else {
// 嘗試加鎖失敗,進入等待 監聽
waitForLock();
// 再次嘗試加鎖
lock();
}
}
//嘗試加鎖
public synchronized boolean tryLock() {
// 第一次就進來建立本身的臨時節點
if (StringUtils.isBlank(currPath)) {
currPath = zkClient.createEphemeralSequential(ROOT_NODE + "/", "lock");
}
// 對節點排序
List<String> children = zkClient.getChildren(ROOT_NODE);
Collections.sort(children);
// 當前的是最小節點就返回加鎖成功
if (currPath.equals(ROOT_NODE + "/" + children.get(0))) {
return true;
} else {
// 不是最小節點 就找到本身的前一個 依次類推 釋放也是同樣
int beforePathIndex = Collections.binarySearch(children, currPath.substring(ROOT_NODE.length() + 1)) - 1;
beforePath = ROOT_NODE + "/" + children.get(beforePathIndex);
//返回加鎖失敗
return false;
}
}
//解鎖
public void unlock() {
//刪除節點並關閉客戶端
zkClient.delete(currPath);
zkClient.close();
}
//等待上鎖,加鎖失敗進入阻塞,監聽上一個節點
private void waitForLock() {
IZkDataListener listener = new IZkDataListener() {
//監聽節點更新事件
public void handleDataChange(String s, Object o) throws Exception {
}
//監聽節點被刪除事件
public void handleDataDeleted(String s) throws Exception {
//解除阻塞
cdl.countDown();
}
};
// 監聽上一個節點
this.zkClient.subscribeDataChanges(beforePath, listener);
//判斷上一個節點是否存在
if (zkClient.exists(beforePath)) {
//上一個節點存在
try {
System.out.println("加鎖失敗 等待");
//加鎖失敗,阻塞等待
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 釋放監聽
zkClient.unsubscribeDataChanges(beforePath, listener);
}
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
public void lockInterruptibly() throws InterruptedException {
}
public Condition newCondition() {
return null;
}
}
複製代碼
在Controller層加上鎖:
@PostMapping("/purchase")
public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId, @RequestParam(name = "number") Integer number) throws Exception {
boolean bool;
//獲取ZooKeeper分佈式鎖
ZkLock zkLock = new ZkLock();
try {
//上鎖
zkLock.lock();
//調用秒殺搶購的service方法
bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
} catch (Exception e) {
e.printStackTrace();
bool = false;
} finally {
//解鎖
zkLock.unlock();
}
return bool;
}
複製代碼
測試,依然起兩臺服務器,8080、8081。而後跑測試腳本:
public static void main(String[] args) throws Exception {
//請求地址
String url = "http://localhost:%s/mall/commodity/purchase";
//請求參數,商品ID,數量
Map<String, String> map = new HashMap<>();
map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
map.put("number", "1");
//建立10個線程經過HttpClient進行發送請求,測試
for (int i = 0; i < 10; i++) {
//8080、8081交替請求
String port = "808" + (i % 2);
CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);
commodityThread.start();
}
}
複製代碼
結果正確:
Curator是Apache開源的一個操做ZooKeeper的框架。其中就有實現ZooKeeper分佈式鎖的功能。
固然分佈式鎖的實現只是這個框架的其中一個很小的部分,除此以外還有不少用途,你們能夠到官網去學習。
首先添加Maven依賴:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
複製代碼
仍是同樣在須要加鎖的地方進行加鎖:
@PostMapping("/purchase")
public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId, @RequestParam(name = "number") Integer number) throws Exception {
boolean bool = false;
//設置重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
// 啓動客戶端
client.start();
InterProcessMutex mutex = new InterProcessMutex(client, "/locks");
try {
//加鎖
if (mutex.acquire(3, TimeUnit.SECONDS)) {
//調用搶購秒殺service方法
bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//解鎖
mutex.release();
client.close();
}
return bool;
}
複製代碼
我嘗試用原生的ZooKeeper寫分佈式鎖,有點炸裂。遇到很多坑,最終放棄了,用zkclient的API。可能我太菜了不太會用。
下面我分享我遇到的一些問題,但願大家在遇到同類型的異常時能迅速定位問題。
這個錯誤是使用原生ZooKeeper的API出現的錯誤。主要是我在進入debug模式進行調試出現的。
由於原生的ZooKeeper須要設定一個會話超時時間,通常debug模式咱們都會卡在一個地方去調試,確定就超出了設置的會話時間~
這個也是原生ZooKeeper的API的錯誤,怎麼出現的呢?
主要是建立的ZooKeeper客戶端鏈接服務器時是異步的,因爲鏈接須要時間,還沒鏈接成功,代碼已經開始執行create()或者exists(),而後就報這個錯誤。
解決方法:使用CountDownLatch計數器阻塞,鏈接成功後再中止阻塞,而後執行create()或者exists()等操做。
這個錯誤真的太炸裂了~
一開始我是把分佈式鎖加在service層,而後覺得搞定了。接着啓動8080、8081進行併發測試。10個線程都是購買成功,結果竟然是不正確!
第一反應以爲本身實現的代碼有問題,因而換成curator框架實現的分佈式鎖,開源框架應該沒問題了吧。沒想到仍是不行~
既然不是鎖自己的問題,是否是事務問題。上一個事務更新庫存的操做還沒提交,而後下一個請求就進來查詢。因而我就把加鎖的範圍放大一點,放在Controller層。竟然成功了!
你可能已經注意到,我在上面的例子就是把分佈式鎖加在Controller層,其實我不太喜歡在Controller層寫太多代碼。
也許有更加優雅的方式,惋惜本人能力不足,若是你有更好的實現方式,能夠分享一下~
最後,咱們回顧總結一下吧:
但願這篇文章對你有用
想第一時間看到我更新的文章,能夠微信搜索公衆號「java技術愛好者
」,拒絕作一條鹹魚,我是一個努力讓你們記住的程序員。咱們下期再見!!!
能力有限,若是有什麼錯誤或者不當之處,請你們批評指正,一塊兒學習交流!