這一次總結和分享用Redis實現分佈式鎖 與 實現任務隊列 這兩大強大的功能。先扯點我的觀點,以前我看了一篇博文說博客園的文章大部分都是分享代碼,博文裏強調說分享思路比分享代碼更重要(貌似大概是這個意思,如有誤請諒解),但我以爲,分享思路當然重要,但有了思路,卻沒有實現的代碼,那會讓人以爲很浮誇的,在工做中的程序猿都知道,你去實現一個功能模塊,一段代碼,雖然你有了思路,可是實現的過程也是很耗時的,特別是代碼調試,還有各類測試等等。因此我認爲,思路+代碼,纔是一篇好博文的主要核心。java
雙十一剛過不久,你們都知道在天貓、京東、蘇寧等等電商網站上有不少秒殺活動,例如在某一個時刻搶購一個原價1999如今秒殺價只要999的手機時,會迎來一個用戶請求的高峯期,可能會有幾十萬幾百萬的併發量,來搶這個手機,在高併發的情形下會對數據庫服務器或者是文件服務器應用服務器形成巨大的壓力,嚴重時說不定就宕機了,另外一個問題是,秒殺的東西都是有量的,例如一款手機只有10臺的量秒殺,那麼,在高併發的狀況下,成千上萬條數據更新數據庫(例如10臺的量被人搶一臺就會在數據集某些記錄下 減1),那次這個時候的前後順序是很亂的,很容易出現10臺的量,搶到的人就不止10個這種嚴重的問題。那麼,之後所說的問題咱們該如何去解決呢? 接下來我所分享的技術就能夠拿來處理以上的問題: 分佈式鎖 和 任務隊列。數據庫
以上就是實現 分佈式鎖 和 任務隊列 的簡單思路,若是你看完有點模棱兩可,那請看接下來的代碼實現。分佈式
(2)系統級的鎖當進程不管何種緣由時出現crash時,操做系統會本身回收鎖,因此不會出現資源丟失,但分佈式鎖不用,若一次性設置很長時間,一旦因爲各類緣由出現進程crash 或者其餘異常致使unlock未被調用時,則該鎖在剩下的時間就會變成垃圾鎖,致使其餘進程或者進程重啓後沒法進入加鎖區域。
這裏先取得當前時間,而後再獲取到鎖失敗時的等待超時的時刻(是個時間戳),再獲取到鎖的最大生存時刻是多少。這裏redis的key用這種格式:"Lock:鎖的標識名",這裏就開始進入循環了,先是插入數據到redis裏,使用setnx()函數,這函數的意思是,若是該鍵不存在則插入數據,將最大生存時刻做爲值存儲,假如插入成功,則對該鍵進行失效時間的設置,並將該鍵放在$lockedName數組裏,返回true,也就是上鎖成功;若是該鍵存在,則不會插入操做了,這裏有一步嚴謹的操做,那就是取得當前鍵的剩餘時間,假如這個時間小於0,表示key上沒有設置生存時間(key是不會不存在的,由於前面setnx會自動建立)若是出現這種情況,那就是進程的某個實例setnx成功後 crash 致使緊跟着的expire沒有被調用,這時能夠直接設置expire並把鎖納爲己用。若是沒設置鎖失敗的等待時間 或者 已超過最大等待時間了,那就退出循環,反之則 隔 $waitIntervalUs 後繼續 請求。 這就是加鎖的整一個代碼分析。
1 /** 2 * 加鎖 3 * @param [type] $name 鎖的標識名 4 * @param integer $timeout 循環獲取鎖的等待超時時間,在此時間內會一直嘗試獲取鎖直到超時,爲0表示失敗後直接返回不等待 5 * @param integer $expire 當前鎖的最大生存時間(秒),必須大於0,若是超過生存時間鎖仍未被釋放,則系統會自動強制釋放 6 * @param integer $waitIntervalUs 獲取鎖失敗後掛起再試的時間間隔(微秒) 7 * @return [type] [description] 8 */ 9 public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) { 10 if ($name == null) return false; 11 12 //取得當前時間 13 $now = time(); 14 //獲取鎖失敗時的等待超時時刻 15 $timeoutAt = $now + $timeout; 16 //鎖的最大生存時刻 17 $expireAt = $now + $expire; 18 19 $redisKey = "Lock:{$name}"; 20 while (true) { 21 //將rediskey的最大生存時刻存到redis裏,過了這個時刻該鎖會被自動釋放 22 $result = $this->redisString->setnx($redisKey, $expireAt); 23 24 if ($result != false) { 25 //設置key的失效時間 26 $this->redisString->expire($redisKey, $expireAt); 27 //將鎖標誌放到lockedNames數組裏 28 $this->lockedNames[$name] = $expireAt; 29 return true; 30 } 31 32 //以秒爲單位,返回給定key的剩餘生存時間 33 $ttl = $this->redisString->ttl($redisKey); 34 35 //ttl小於0 表示key上沒有設置生存時間(key是不會不存在的,由於前面setnx會自動建立) 36 //若是出現這種情況,那就是進程的某個實例setnx成功後 crash 致使緊跟着的expire沒有被調用 37 //這時能夠直接設置expire並把鎖納爲己用 38 if ($ttl < 0) { 39 $this->redisString->set($redisKey, $expireAt); 40 $this->lockedNames[$name] = $expireAt; 41 return true; 42 } 43 44 /*****循環請求鎖部分*****/ 45 //若是沒設置鎖失敗的等待時間 或者 已超過最大等待時間了,那就退出 46 if ($timeout <= 0 || $timeoutAt < microtime(true)) break; 47 48 //隔 $waitIntervalUs 後繼續 請求 49 usleep($waitIntervalUs); 50 51 } 52 53 return false; 54 }
1 /** 2 * 解鎖 3 * @param [type] $name [description] 4 * @return [type] [description] 5 */ 6 public function unlock($name) { 7 //先判斷是否存在此鎖 8 if ($this->isLocking($name)) { 9 //刪除鎖 10 if ($this->redisString->deleteKey("Lock:$name")) { 11 //清掉lockedNames裏的鎖標誌 12 unset($this->lockedNames[$name]); 13 return true; 14 } 15 } 16 return false; 17 }
1 /** 2 * 釋放當前全部得到的鎖 3 * @return [type] [description] 4 */ 5 public function unlockAll() { 6 //此標誌是用來標誌是否釋放全部鎖成功 7 $allSuccess = true; 8 foreach ($this->lockedNames as $name => $expireAt) { 9 if (false === $this->unlock($name)) { 10 $allSuccess = false; 11 } 12 } 13 return $allSuccess; 14 }
1 /** 2 *在redis上實現分佈式鎖 3 */ 4 class RedisLock { 5 private $redisString; 6 private $lockedNames = []; 7 8 public function __construct($param = NULL) { 9 $this->redisString = RedisFactory::get($param)->string; 10 } 11 12 /** 13 * 加鎖 14 * @param [type] $name 鎖的標識名 15 * @param integer $timeout 循環獲取鎖的等待超時時間,在此時間內會一直嘗試獲取鎖直到超時,爲0表示失敗後直接返回不等待 16 * @param integer $expire 當前鎖的最大生存時間(秒),必須大於0,若是超過生存時間鎖仍未被釋放,則系統會自動強制釋放 17 * @param integer $waitIntervalUs 獲取鎖失敗後掛起再試的時間間隔(微秒) 18 * @return [type] [description] 19 */ 20 public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) { 21 if ($name == null) return false; 22 23 //取得當前時間 24 $now = time(); 25 //獲取鎖失敗時的等待超時時刻 26 $timeoutAt = $now + $timeout; 27 //鎖的最大生存時刻 28 $expireAt = $now + $expire; 29 30 $redisKey = "Lock:{$name}"; 31 while (true) { 32 //將rediskey的最大生存時刻存到redis裏,過了這個時刻該鎖會被自動釋放 33 $result = $this->redisString->setnx($redisKey, $expireAt); 34 35 if ($result != false) { 36 //設置key的失效時間 37 $this->redisString->expire($redisKey, $expireAt); 38 //將鎖標誌放到lockedNames數組裏 39 $this->lockedNames[$name] = $expireAt; 40 return true; 41 } 42 43 //以秒爲單位,返回給定key的剩餘生存時間 44 $ttl = $this->redisString->ttl($redisKey); 45 46 //ttl小於0 表示key上沒有設置生存時間(key是不會不存在的,由於前面setnx會自動建立) 47 //若是出現這種情況,那就是進程的某個實例setnx成功後 crash 致使緊跟着的expire沒有被調用 48 //這時能夠直接設置expire並把鎖納爲己用 49 if ($ttl < 0) { 50 $this->redisString->set($redisKey, $expireAt); 51 $this->lockedNames[$name] = $expireAt; 52 return true; 53 } 54 55 /*****循環請求鎖部分*****/ 56 //若是沒設置鎖失敗的等待時間 或者 已超過最大等待時間了,那就退出 57 if ($timeout <= 0 || $timeoutAt < microtime(true)) break; 58 59 //隔 $waitIntervalUs 後繼續 請求 60 usleep($waitIntervalUs); 61 62 } 63 64 return false; 65 } 66 67 /** 68 * 解鎖 69 * @param [type] $name [description] 70 * @return [type] [description] 71 */ 72 public function unlock($name) { 73 //先判斷是否存在此鎖 74 if ($this->isLocking($name)) { 75 //刪除鎖 76 if ($this->redisString->deleteKey("Lock:$name")) { 77 //清掉lockedNames裏的鎖標誌 78 unset($this->lockedNames[$name]); 79 return true; 80 } 81 } 82 return false; 83 } 84 85 /** 86 * 釋放當前全部得到的鎖 87 * @return [type] [description] 88 */ 89 public function unlockAll() { 90 //此標誌是用來標誌是否釋放全部鎖成功 91 $allSuccess = true; 92 foreach ($this->lockedNames as $name => $expireAt) { 93 if (false === $this->unlock($name)) { 94 $allSuccess = false; 95 } 96 } 97 return $allSuccess; 98 } 99 100 /** 101 * 給當前所增長指定生存時間,必須大於0 102 * @param [type] $name [description] 103 * @return [type] [description] 104 */ 105 public function expire($name, $expire) { 106 //先判斷是否存在該鎖 107 if ($this->isLocking($name)) { 108 //所指定的生存時間必須大於0 109 $expire = max($expire, 1); 110 //增長鎖生存時間 111 if ($this->redisString->expire("Lock:$name", $expire)) { 112 return true; 113 } 114 } 115 return false; 116 } 117 118 /** 119 * 判斷當前是否擁有指定名字的所 120 * @param [type] $name [description] 121 * @return boolean [description] 122 */ 123 public function isLocking($name) { 124 //先看lonkedName[$name]是否存在該鎖標誌名 125 if (isset($this->lockedNames[$name])) { 126 //從redis返回該鎖的生存時間 127 return (string)$this->lockedNames[$name] = (string)$this->redisString->get("Lock:$name"); 128 } 129 130 return false; 131 } 132 133 }
(3)這個隊列和普通隊列不同,入隊時的id是用來區分重複入隊的,隊列裏面只會有一條記錄,同一個id後入的覆蓋前入的,而不是追加, 若是需求要求重複入隊當作不用的任務,請使用不一樣的id區分
1 /** 2 * 入隊一個 Task 3 * @param [type] $name 隊列名稱 4 * @param [type] $id 任務id(或者其數組) 5 * @param integer $timeout 入隊超時時間(秒) 6 * @param integer $afterInterval [description] 7 * @return [type] [description] 8 */ 9 public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) { 10 //合法性檢測 11 if (empty($name) || empty($id) || $timeout <= 0) return false; 12 13 //加鎖 14 if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) { 15 Logger::get('queue')->error("enqueue faild becouse of lock failure: name = $name, id = $id"); 16 return false; 17 } 18 19 //入隊時以當前時間戳做爲 score 20 $score = microtime(true) + $afterInterval; 21 //入隊 22 foreach ((array)$id as $item) { 23 //先判斷下是否已經存在該id了 24 if (false === $this->_redis->zset->getScore("Queue:$name", $item)) { 25 $this->_redis->zset->add("Queue:$name", $score, $item); 26 } 27 } 28 29 //解鎖 30 $this->_redis->lock->unlock("Queue:$name"); 31 32 return true; 33 34 }
接着來看一下出隊的代碼分析:出隊一個Task,須要指定它的$id 和 $score,若是$score與隊列中的匹配則出隊,不然認爲該Task已被從新入隊過,當前操做按失敗處理。首先和對參數進行合法性檢測,接着又用到加鎖的功能了,而後及時出隊了,先使用getScore()從Redis裏獲取到該id的score,而後將傳入的$score和Redis裏存儲的score進行對比,若是二者相等就進行出隊操做,也就是使用zset裏的delete()方法刪掉該任務id,最後當前就是解鎖了。這就是出隊的代碼分析。
1 /** 2 * 出隊一個Task,須要指定$id 和 $score 3 * 若是$score 與隊列中的匹配則出隊,不然認爲該Task已被從新入隊過,當前操做按失敗處理 4 * 5 * @param [type] $name 隊列名稱 6 * @param [type] $id 任務標識 7 * @param [type] $score 任務對應score,從隊列中獲取任務時會返回一個score,只有$score和隊列中的值匹配時Task纔會被出隊 8 * @param integer $timeout 超時時間(秒) 9 * @return [type] Task是否成功,返回false多是redis操做失敗,也有多是$score與隊列中的值不匹配(這表示該Task自從獲取到本地以後被其餘線程入隊過) 10 */ 11 public function dequeue($name, $id, $score, $timeout = 10) { 12 //合法性檢測 13 if (empty($name) || empty($id) || empty($score)) return false; 14 15 //加鎖 16 if (!$this->_redis->lock->lock("Queue:$name", $timeout)) { 17 Logger:get('queue')->error("dequeue faild becouse of lock lailure:name=$name, id = $id"); 18 return false; 19 } 20 21 //出隊 22 //先取出redis的score 23 $serverScore = $this->_redis->zset->getScore("Queue:$name", $id); 24 $result = false; 25 //先判斷傳進來的score和redis的score是不是同樣 26 if ($serverScore == $score) { 27 //刪掉該$id 28 $result = (float)$this->_redis->zset->delete("Queue:$name", $id); 29 if ($result == false) { 30 Logger::get('queue')->error("dequeue faild because of redis delete failure: name =$name, id = $id"); 31 } 32 } 33 //解鎖 34 $this->_redis->lock->unlock("Queue:$name"); 35 36 return $result; 37 }
學過數據結構這門課的朋友都應該知道,隊列操做還有彈出頂部某個值的方法等等,這裏處理入隊出隊操做,我還實現了 獲取隊列頂部若干個Task 並將其出隊的方法,想了解的朋友能夠看這段代碼,假如看不太明白就留言,這裏我再也不對其進行分析了。
1 /** 2 * 獲取隊列頂部若干個Task 並將其出隊 3 * @param [type] $name 隊列名稱 4 * @param integer $count 數量 5 * @param integer $timeout 超時時間 6 * @return [type] 返回數組[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]] 7 */ 8 public function pop($name, $count = 1, $timeout = 10) { 9 //合法性檢測 10 if (empty($name) || $count <= 0) return []; 11 12 //加鎖 13 if (!$this->_redis->lock->lock("Queue:$name")) { 14 Log::get('queue')->error("pop faild because of pop failure: name = $name, count = $count"); 15 return false; 16 } 17 18 //取出若干的Task 19 $result = []; 20 $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]); 21 22 //將其放在$result數組裏 並 刪除掉redis對應的id 23 foreach ($array as $id => $score) { 24 $result[] = ['id'=>$id, 'score'=>$score]; 25 $this->_redis->zset->delete("Queue:$name", $id); 26 } 27 28 //解鎖 29 $this->_redis->lock->unlock("Queue:$name"); 30 31 return $count == 1 ? (empty($result) ? false : $result[0]) : $result; 32 }
1 /** 2 * 任務隊列 3 * 4 */ 5 class RedisQueue { 6 private $_redis; 7 8 public function __construct($param = null) { 9 $this->_redis = RedisFactory::get($param); 10 } 11 12 /** 13 * 入隊一個 Task 14 * @param [type] $name 隊列名稱 15 * @param [type] $id 任務id(或者其數組) 16 * @param integer $timeout 入隊超時時間(秒) 17 * @param integer $afterInterval [description] 18 * @return [type] [description] 19 */ 20 public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) { 21 //合法性檢測 22 if (empty($name) || empty($id) || $timeout <= 0) return false; 23 24 //加鎖 25 if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) { 26 Logger::get('queue')->error("enqueue faild becouse of lock failure: name = $name, id = $id"); 27 return false; 28 } 29 30 //入隊時以當前時間戳做爲 score 31 $score = microtime(true) + $afterInterval; 32 //入隊 33 foreach ((array)$id as $item) { 34 //先判斷下是否已經存在該id了 35 if (false === $this->_redis->zset->getScore("Queue:$name", $item)) { 36 $this->_redis->zset->add("Queue:$name", $score, $item); 37 } 38 } 39 40 //解鎖 41 $this->_redis->lock->unlock("Queue:$name"); 42 43 return true; 44 45 } 46 47 /** 48 * 出隊一個Task,須要指定$id 和 $score 49 * 若是$score 與隊列中的匹配則出隊,不然認爲該Task已被從新入隊過,當前操做按失敗處理 50 * 51 * @param [type] $name 隊列名稱 52 * @param [type] $id 任務標識 53 * @param [type] $score 任務對應score,從隊列中獲取任務時會返回一個score,只有$score和隊列中的值匹配時Task纔會被出隊 54 * @param integer $timeout 超時時間(秒) 55 * @return [type] Task是否成功,返回false多是redis操做失敗,也有多是$score與隊列中的值不匹配(這表示該Task自從獲取到本地以後被其餘線程入隊過) 56 */ 57 public function dequeue($name, $id, $score, $timeout = 10) { 58 //合法性檢測 59 if (empty($name) || empty($id) || empty($score)) return false; 60 61 //加鎖 62 if (!$this->_redis->lock->lock("Queue:$name", $timeout)) { 63 Logger:get('queue')->error("dequeue faild becouse of lock lailure:name=$name, id = $id"); 64 return false; 65 } 66 67 //出隊 68 //先取出redis的score 69 $serverScore = $this->_redis->zset->getScore("Queue:$name", $id); 70 $result = false; 71 //先判斷傳進來的score和redis的score是不是同樣 72 if ($serverScore == $score) { 73 //刪掉該$id 74 $result = (float)$this->_redis->zset->delete("Queue:$name", $id); 75 if ($result == false) { 76 Logger::get('queue')->error("dequeue faild because of redis delete failure: name =$name, id = $id"); 77 } 78 } 79 //解鎖 80 $this->_redis->lock->unlock("Queue:$name"); 81 82 return $result; 83 } 84 85 /** 86 * 獲取隊列頂部若干個Task 並將其出隊 87 * @param [type] $name 隊列名稱 88 * @param integer $count 數量 89 * @param integer $timeout 超時時間 90 * @return [type] 返回數組[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]] 91 */ 92 public function pop($name, $count = 1, $timeout = 10) { 93 //合法性檢測 94 if (empty($name) || $count <= 0) return []; 95 96 //加鎖 97 if (!$this->_redis->lock->lock("Queue:$name")) { 98 Logger::get('queue')->error("pop faild because of pop failure: name = $name, count = $count"); 99 return false; 100 } 101 102 //取出若干的Task 103 $result = []; 104 $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]); 105 106 //將其放在$result數組裏 並 刪除掉redis對應的id 107 foreach ($array as $id => $score) { 108 $result[] = ['id'=>$id, 'score'=>$score]; 109 $this->_redis->zset->delete("Queue:$name", $id); 110 } 111 112 //解鎖 113 $this->_redis->lock->unlock("Queue:$name"); 114 115 return $count == 1 ? (empty($result) ? false : $result[0]) : $result; 116 } 117 118 /** 119 * 獲取隊列頂部的若干個Task 120 * @param [type] $name 隊列名稱 121 * @param integer $count 數量 122 * @return [type] 返回數組[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]] 123 */ 124 public function top($name, $count = 1) { 125 //合法性檢測 126 if (empty($name) || $count < 1) return []; 127 128 //取錯若干個Task 129 $result = []; 130 $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]); 131 132 //將Task存放在數組裏 133 foreach ($array as $id => $score) { 134 $result[] = ['id'=>$id, 'score'=>$score]; 135 } 136 137 //返回數組 138 return $count == 1 ? (empty($result) ? false : $result[0]) : $result; 139 } 140 }
好了,本次總結和分享到此完畢。最後我附上 分佈式鎖和任務隊列這兩個類:
