前言java
對於分佈式鎖的問題我也查過不少資料,感受不少方式實現的並不完善,或者看着雲裏霧裏的,不知因此然,因而就整理了這篇文章,但願對您有用,有寫的不對的地方,歡迎留言指正。redis
首先我們來聊聊什麼是分佈式鎖,到底解決了什麼問題?直接看代碼數據庫
$stock = $this->getStockFromDb();//查詢剩餘庫存 if ($stock>0){ $this->ReduceStockInDb(); // 在數據庫中進行減庫存操做 echo "successful"; }else{ echo "庫存不足"; }
很簡單的一個場景,用戶下單,我們查詢商品庫存夠不夠,不夠的話直接返回庫存不足相似的錯誤信息,若是庫存夠的話直接在數據庫中庫存-1,而後返回成功,在業務邏輯上這段代碼是沒有什麼問題的。session
可是,這段代碼是存在嚴重的問題的。
併發
若是庫存只剩 1,而且在併發比較高的狀況下,好比兩個請求同時執行了這段代碼,同時查到庫存爲 1,而後順利成章的都去數據庫執行 stock-1 的操做,這樣庫存就會變成-1,而後就會引起超賣的現象,剛纔說的是兩個請求同時執行,若是同時幾千個請求打過來,可見形成的損失是很是大的。因而呢有些聰明人就想了個辦法,辦法以下。分佈式
你們都知道 redis 有個 setnx 命令,不知道的話也不要緊,我已經幫你查過了
高併發
咱們把上面的代碼優化一下優化
version-1this
$lock_key="lock_key"; $res = $redis->setNx($lock_key, 1); if (!$res){ return "error_code"; } $stock = $this->getStockFromDb();//查詢剩餘庫存 if ($stock>0){ $this->ReduceStockInDb(); // 在數據庫中進行減庫存操做 echo "successful"; }else{ echo "庫存不足"; } $redis->delete($lock_key);
看似問題解決了,其實並否則。lua
咱們這裏僞代碼寫的簡單,查詢一下庫存,而後減1操做而已,可是真實的生產環境中的狀況是很是複雜的,在一些極端狀況下,程序極可能會報錯,崩潰,若是第一次執行加鎖了以後程序報錯了,那這個鎖永遠存在,接下來的請求永遠也請求不進來了,因此我們繼續優化
version-2
try{ //新加入try catch處理,這樣程序萬一報錯會把鎖刪除掉 $lock_key="lock_key"; $expire_time = 5;//新加入過時時間,這樣鎖不會一直佔有 $res = $redis->setNx($lock_key, 1, $expire_time); if (!$res){ return "error_code"; } $stock = $this->getStockFromDb();//查詢剩餘庫存 if ($stock>0){ $this->ReduceStockInDb(); // 在數據庫中進行減庫存操做 echo "successful"; }else{ echo "庫存不足"; } }finally { $redis->delete($lock_key); }
此次是把死鎖問題解決了,可是問題仍是存在,你們能夠先想想還存在什麼問題再接着往下看。
存在的問題以下
我們接着優化
version-3
try{ //新加入try catch處理,這樣程序萬一報錯會把鎖刪除掉 $lock_key="lock_key"; $expire_time = 5;//新加入過時時間,這樣鎖不會一直佔有 $client_id = session_create_id(); //對每一個請求生成惟一性的id $res = $redis->setNx($lock_key, $client_id, $expire_time); if (!$res){ return "error_code"; } $stock = $this->getStockFromDb();//查詢剩餘庫存 if ($stock>0){ $this->ReduceStockInDb(); // 在數據庫中進行減庫存操做 echo "successful"; }else{ echo "庫存不足"; } }finally { if ($redis->get($lock_key) == $client_id){ //在這裏加一個判斷,保證每次刪除的鎖是當次請求加的鎖,這樣避免誤刪了別的請求加的鎖 $redis->delete($lock_key); } }
可是上面方案還有問題,咱們看最後 redis是先進行了get操做判斷,而後再刪除,是兩步操做,並無保證其原子性,redis的多步操做能夠用lua腳原本保證原子性,其實看到lua也不須要感受太陌生,他就是一種語言而已,在這裏的做用是把多個redis操做打包成一個命令去執行,保證了原子性而已
version-4
try{ //新加入try catch處理,這樣程序萬一報錯會把鎖刪除掉 $lock_key="lock_key"; $expire_time = 5;//新加入過時時間,這樣鎖不會一直佔有 $client_id = session_create_id(); //對每一個請求生成惟一性的id $res = $redis->setNx($lock_key, $client_id, $expire_time); if (!$res){ return "error_code"; } $stock = $this->getStockFromDb();//查詢剩餘庫存 if ($stock>0){ $this->ReduceStockInDb(); // 在數據庫中進行減庫存操做 echo "successful"; }else{ echo "庫存不足"; } }finally { $script = ' //此處用lua腳本執行是爲了get對比以後再delete的兩步操做的原子性 if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end '; return $instance->eval($script, [$lock_key, $client_id], 1); }
這樣封裝以後,分佈式鎖應該就比較完善了。固然咱們還能夠進一步的優化一下用戶體驗
version-5
$retry_times = 3; //重試次數 $usleep_times = 5000;//重試間隔時間 try{ //新加入try catch處理,這樣程序萬一報錯會把鎖刪除掉 $lock_key="lock_key"; $expire_time = 5;//新加入過時時間,這樣鎖不會一直佔有 while($retry_times > 0){ $client_id = session_create_id(); //對每一個請求生成惟一性的id $res = $redis->setNx($lock_key, $client_id, $expire_time); if ($res){ break; } echo "嘗試從新獲取鎖"; $retry_times--; usleep($usleep_times); } if (!$res){ //重試三次以後都沒有獲取到鎖則給用戶返回錯誤信息 return "error_code"; } $stock = $this->getStockFromDb();//查詢剩餘庫存 if ($stock>0){ $this->ReduceStockInDb(); // 在數據庫中進行減庫存操做 echo "successful"; }else{ echo "庫存不足"; } }finally { $script = ' //此處用lua腳本執行是爲了get對比以後再delete的兩步操做的原子性 if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end '; return $instance->eval($script, [$lock_key, $client_id], 1); }
固然上面的分佈式鎖仍是不夠完善的,好比redis主從同步延遲,就會產生問題,像java中redission實現的思想是很是好的,你們感興趣能夠看看源碼,今天就聊到這裏,感興趣的朋友能夠留言你們一塊兒討論