<!-- more -->html
轉載請務必註明原創地址爲:http://www.54tianzhisheng.cn/2018/04/24/Distributed_lock/java
分佈式的 CAP 理論告訴咱們:node
任何一個分佈式系統都沒法同時知足一致性(Consistency)、可用性(Availability)和分區容錯性(Partition tolerance),最多隻能同時知足兩項。mysql
目前不少大型網站及應用都是分佈式部署的,分佈式場景中的數據一致性問題一直是一個比較重要的話題。基於 CAP理論,不少系統在設計之初就要對這三者作出取捨。在互聯網領域的絕大多數的場景中,都須要犧牲強一致性來換取系統的高可用性,系統每每只須要保證最終一致性。linux
此處主要指集羣模式下,多個相同服務同時開啓.git
在許多的場景中,咱們爲了保證數據的最終一致性,須要不少的技術方案來支持,好比分佈式事務、分佈式鎖等。不少時候咱們須要保證一個方法在同一時間內只能被同一個線程執行。在單機環境中,經過 Java 提供的併發 API 咱們能夠解決,可是在分佈式環境下,就沒有那麼簡單啦。github
基於樂觀鎖redis
利用主鍵惟一的特性,若是有多個請求同時提交到數據庫的話,數據庫會保證只有一個操做能夠成功,那麼咱們就能夠認爲操做成功的那個線程得到了該方法的鎖,當方法執行完畢以後,想要釋放鎖的話,刪除這條數據庫記錄便可。算法
上面這種簡單的實現有如下幾個問題:spring
固然,咱們也能夠有其餘方式解決上面的問題。
這個策略源於 mysql 的 mvcc 機制,使用這個策略其實自己沒有什麼問題,惟一的問題就是對數據表侵入較大,咱們要爲每一個表設計一個版本號字段,而後寫一條判斷 sql 每次進行判斷,增長了數據庫操做的次數,在高併發的要求下,對數據庫鏈接的開銷也是沒法忍受的。
基於悲觀鎖
在查詢語句後面增長for update
,數據庫會在查詢過程當中給數據庫表增長排他鎖 (注意: InnoDB 引擎在加鎖的時候,只有經過索引進行檢索的時候纔會使用行級鎖,不然會使用表級鎖。這裏咱們但願使用行級鎖,就要給要執行的方法字段名添加索引,值得注意的是,這個索引必定要建立成惟一索引,不然會出現多個重載方法之間沒法同時被訪問的問題。重載方法的話建議把參數類型也加上。)。當某條記錄被加上排他鎖以後,其餘線程沒法再在該行記錄上增長排他鎖。
咱們能夠認爲得到排他鎖的線程便可得到分佈式鎖,當獲取到鎖以後,能夠執行方法的業務邏輯,執行完方法以後,經過connection.commit()
操做來釋放鎖。
這種方法能夠有效的解決上面提到的沒法釋放鎖和阻塞鎖的問題。
for update
語句會在執行成功後當即返回,在執行失敗時一直處於阻塞狀態,直到成功。可是仍是沒法直接解決數據庫單點和可重入問題。
這裏還可能存在另一個問題,雖然咱們對方法字段名使用了惟一索引,而且顯示使用 for update 來使用行級鎖。可是,MySQL 會對查詢進行優化,即使在條件中使用了索引字段,可是否使用索引來檢索數據是由 MySQL 經過判斷不一樣執行計劃的代價來決定的,若是 MySQL 認爲全表掃效率更高,好比對一些很小的表,它就不會使用索引,這種狀況下 InnoDB 將使用表鎖,而不是行鎖。若是發生這種狀況就悲劇了。。。
還有一個問題,就是咱們要使用排他鎖來進行分佈式鎖的 lock,那麼一個排他鎖長時間不提交,就會佔用數據庫鏈接。一旦相似的鏈接變得多了,就可能把數據庫鏈接池撐爆。
優勢:簡單,易於理解
缺點:會有各類各樣的問題(操做數據庫須要必定的開銷,使用數據庫的行級鎖並不必定靠譜,性能不靠譜)
setnx 的含義就是 SET if Not Exists,其主要有兩個參數 setnx(key, value)。該方法是原子的,若是 key 不存在,則設置當前 key 成功,返回 1;若是當前 key 已經存在,則設置當前 key 失敗,返回 0。
expire 設置過時時間,要注意的是 setnx 命令不能設置 key 的超時時間,只能經過 expire() 來對 key 設置。
一、setnx(lockkey, 1) 若是返回 0,則說明佔位失敗;若是返回 1,則說明佔位成功
二、expire() 命令對 lockkey 設置超時時間,爲的是避免死鎖問題。
三、執行完業務代碼後,能夠經過 delete 命令刪除 key。
這個方案實際上是能夠解決平常工做中的需求的,但從技術方案的探討上來講,可能還有一些能夠完善的地方。好比,若是在第一步 setnx 執行成功後,在 expire() 命令執行成功前,發生了宕機的現象,那麼就依然會出現死鎖的問題,因此若是要對其進行完善的話,可使用 redis 的 setnx()、get() 和 getset() 方法來實現分佈式鎖。
這個方案的背景主要是在 setnx() 和 expire() 的方案上針對可能存在的死鎖問題,作了一些優化。
這個命令主要有兩個參數 getset(key,newValue)。該方法是原子的,對 key 設置 newValue 這個值,而且返回 key 原來的舊值。假設 key 原來是不存在的,那麼屢次執行這個命令,會出現下邊的效果:
import cn.com.tpig.cache.redis.RedisService; import cn.com.tpig.utils.SpringUtils; //redis分佈式鎖 public final class RedisLockUtil { private static final int defaultExpire = 60; private RedisLockUtil() { // } /** * 加鎖 * @param key redis key * @param expire 過時時間,單位秒 * @return true:加鎖成功,false,加鎖失敗 */ public static boolean lock(String key, int expire) { RedisService redisService = SpringUtils.getBean(RedisService.class); long status = redisService.setnx(key, "1"); if(status == 1) { redisService.expire(key, expire); return true; } return false; } public static boolean lock(String key) { return lock2(key, defaultExpire); } /** * 加鎖 * @param key redis key * @param expire 過時時間,單位秒 * @return true:加鎖成功,false,加鎖失敗 */ public static boolean lock2(String key, int expire) { RedisService redisService = SpringUtils.getBean(RedisService.class); long value = System.currentTimeMillis() + expire; long status = redisService.setnx(key, String.valueOf(value)); if(status == 1) { return true; } long oldExpireTime = Long.parseLong(redisService.get(key, "0")); if(oldExpireTime < System.currentTimeMillis()) { //超時 long newExpireTime = System.currentTimeMillis() + expire; long currentExpireTime = Long.parseLong(redisService.getSet(key, String.valueOf(newExpireTime))); if(currentExpireTime == oldExpireTime) { return true; } } return false; } public static void unLock1(String key) { RedisService redisService = SpringUtils.getBean(RedisService.class); redisService.del(key); } public static void unLock2(String key) { RedisService redisService = SpringUtils.getBean(RedisService.class); long oldExpireTime = Long.parseLong(redisService.get(key, "0")); if(oldExpireTime > System.currentTimeMillis()) { redisService.del(key); } } }
public void drawRedPacket(long userId) { String key = "draw.redpacket.userid:" + userId; boolean lock = RedisLockUtil.lock2(key, 60); if(lock) { try { //領取操做 } finally { //釋放鎖 RedisLockUtil.unLock(key); } } else { new RuntimeException("重複領取獎勵"); } }
Redlock 是 Redis 的做者 antirez 給出的集羣模式的 Redis 分佈式鎖,它基於 N 個徹底獨立的 Redis 節點(一般狀況下 N 能夠設置成 5)。
算法的步驟以下:
可是,有一位分佈式的專家寫了一篇文章《How to do distributed locking》,質疑 Redlock 的正確性。
https://mp.weixin.qq.com/s/1bPLk_VZhZ0QYNZS8LkviA
https://blog.csdn.net/jek123456/article/details/72954106
優勢:
性能高
缺點:
失效時間設置多長時間爲好?如何設置的失效時間過短,方法沒等執行完,鎖就自動釋放了,那麼就會產生併發問題。若是設置的時間太長,其餘獲取鎖的線程就可能要平白的多等一段時間。
redisson 是 redis 官方的分佈式鎖組件。GitHub 地址:https://github.com/redisson/redisson
上面的這個問題 ——> 失效時間設置多長時間爲好?這個問題在 redisson 的作法是:每得到一個鎖時,只設置一個很短的超時時間,同時起一個線程在每次快要到超時時間時去刷新鎖的超時時間。在釋放鎖的同時結束這個線程。
import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class DistributedLock implements Lock, Watcher{ private ZooKeeper zk; private String root = "/locks";//根 private String lockName;//競爭資源的標誌 private String waitNode;//等待前一個鎖 private String myZnode;//當前鎖 private CountDownLatch latch;//計數器 private int sessionTimeout = 30000; private List<Exception> exception = new ArrayList<Exception>(); /** * 建立分佈式鎖,使用前請確認config配置的zookeeper服務可用 * @param config 127.0.0.1:2181 * @param lockName 競爭資源標誌,lockName中不能包含單詞lock */ public DistributedLock(String config, String lockName){ this.lockName = lockName; // 建立一個與服務器的鏈接 try { zk = new ZooKeeper(config, sessionTimeout, this); Stat stat = zk.exists(root, false); if(stat == null){ // 建立根節點 zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (IOException e) { exception.add(e); } catch (KeeperException e) { exception.add(e); } catch (InterruptedException e) { exception.add(e); } } /** * zookeeper節點的監視器 */ public void process(WatchedEvent event) { if(this.latch != null) { this.latch.countDown(); } } public void lock() { if(exception.size() > 0){ throw new LockException(exception.get(0)); } try { if(this.tryLock()){ System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true"); return; } else{ waitForLock(waitNode, sessionTimeout);//等待鎖 } } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } } public boolean tryLock() { try { String splitStr = "_lock_"; if(lockName.contains(splitStr)) throw new LockException("lockName can not contains \\u000B"); //建立臨時子節點 myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(myZnode + " is created "); //取出全部子節點 List<String> subNodes = zk.getChildren(root, false); //取出全部lockName的鎖 List<String> lockObjNodes = new ArrayList<String>(); for (String node : subNodes) { String _node = node.split(splitStr)[0]; if(_node.equals(lockName)){ lockObjNodes.add(node); } } Collections.sort(lockObjNodes); System.out.println(myZnode + "==" + lockObjNodes.get(0)); if(myZnode.equals(root+"/"+lockObjNodes.get(0))){ //若是是最小的節點,則表示取得鎖 return true; } //若是不是最小的節點,找到比本身小1的節點 String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1); waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1); } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } return false; } public boolean tryLock(long time, TimeUnit unit) { try { if(this.tryLock()){ return true; } return waitForLock(waitNode,time); } catch (Exception e) { e.printStackTrace(); } return false; } private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower,true); //判斷比本身小一個數的節點是否存在,若是不存在則無需等待鎖,同時註冊監聽 if(stat != null){ System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); this.latch = new CountDownLatch(1); this.latch.await(waitTime, TimeUnit.MILLISECONDS); this.latch = null; } return true; } public void unlock() { try { System.out.println("unlock " + myZnode); zk.delete(myZnode,-1); myZnode = null; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } public void lockInterruptibly() throws InterruptedException { this.lock(); } public Condition newCondition() { return null; } public class LockException extends RuntimeException { private static final long serialVersionUID = 1L; public LockException(String e){ super(e); } public LockException(Exception e){ super(e); } } }
優勢:
有效的解決單點問題,不可重入問題,非阻塞問題以及鎖沒法釋放的問題。實現起來較爲簡單。
缺點:
性能上可能並無緩存服務那麼高,由於每次在建立鎖和釋放鎖的過程當中,都要動態建立、銷燬臨時節點來實現鎖功能。ZK 中建立和刪除節點只能經過 Leader 服務器來執行,而後將數據同步到全部的 Follower 機器上。還須要對 ZK的原理有所瞭解。
DD 寫過相似文章,其實主要利用 Consul 的 Key / Value 存儲 API 中的 acquire 和 release 操做來實現。
文章地址:http://blog.didispace.com/spring-cloud-consul-lock-and-semphore/
一、注意分佈式鎖的開銷
二、注意加鎖的粒度
三、加鎖的方式
不管你身處一個什麼樣的公司,最開始的工做可能都須要從最簡單的作起。不要提阿里和騰訊的業務場景 qps 如何大,由於在這樣的大場景中你未必能親自參與項目,親自參與項目未必能是核心的設計者,是核心的設計者未必能獨自設計。但願你們能根據本身公司業務場景,選擇適合本身項目的方案。
http://www.hollischuang.com/archives/1716
http://www.spring4all.com/question/158
https://www.cnblogs.com/PurpleDream/p/5559352.html