在分佈式系統中,分佈式鎖是一個很常見的技術。即有不少個進程同時訪問同一個共享資源沒有同步訪問,資源的載體多是傳統關係型數據庫或者NoSQL。html
若是是在單機環境中,能夠使用ReentrantLock或者synchronized代碼塊來實現,然而這些在分佈式環境下卻不能知足要求。java
例若有這樣的一個場景:git
多臺服務器同時從MQ消息隊列接消息,接到消息後的處理邏輯是:根據消息中的業務id去查詢數據庫,若是數據庫中不存在,則插入,反之則更新。github
這樣就會出現這樣一個問題:服務器A和服務器B中接收到的消息中業務id是同樣的,2臺服務器同時到數據庫中查詢,都不存在,則都會進行插入操做,最後的結果是表中有2條同樣的記錄(或者有主鍵約束的話報主建不惟一異常插入失敗),顯然這不是我能所指望的!redis
分佈式環境中多臺服務器至關於多個獨立的jvm環境,有多個進程(注意不是線程)對同一資源進行操做,jdk中用做同步的Lock或者synchronized就會失效。數據庫
這裏就須要用到分佈式鎖,目前實現分佈式鎖的主要技術大概有:json
基於Redis實現(或者其餘緩存memcached,tair),主要基於redis的setnx(set if not exist)命令;緩存
基於Zookeeper實現;服務器
基於數據庫表version字段實現,樂觀鎖,兩個線程能夠同時讀取到原有的version值,可是最終只有一個能夠完成操做;數據結構
下面是redis實現分佈式鎖的方法,主要用到了Redisson,它是redis官方推薦的分佈式java客戶端,和Jedis相比它實現了分佈式和可擴展的java數據結構,
這裏有一篇對比文章:Jedis與Redisson選型對比
Redisson的介紹能夠到:https://github.com/redisson/redisson/wiki/1.-%E6%A6%82%E8%BF%B0 這裏去了解!
具體實現代碼:
import org.redisson.Config; import org.redisson.Redisson; import org.redisson.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author created by dehong * @date 2018年3月4日 下午4:12:42 * */ public class RedissonManager { private static final Logger logger = LoggerFactory.getLogger(RedissonManager.class); private static RedissonClient redisson; //private static Config config; /** * 經過配置文件初始化 */ static{ logger.info("init redisson ..."); Config config = null; try { config = Config.fromYAML(RedissonManager.class.getClassLoader().getResourceAsStream("redisson.yaml")); //config = Config.fromJSON(RedissonManager.class.getClassLoader().getResourceAsStream("redisson.json")); } catch (Exception e) { logger.error("RedissonClient init failed !", e); } redisson = Redisson.create(config); } /** * 獲取Redisson的實例對象 * @return */ public static Redisson getRedisson(){ if(redisson == null){ logger.info("RedissonClient init failed !"); return null; } return (Redisson) redisson; } /** * 測試Redisson是否正常 * @param args */ public static void main(String[] args) { Redisson redisson = RedissonManager.getRedisson(); System.out.println("redisson = " + redisson); } }
上面的RedissonClient初始化是用配置文件yaml的形式,也能夠用json,能夠去參考官方文檔 https://github.com/redisson/redisson
yaml示例(單機redis節點模式)
--- singleServerConfig: idleConnectionTimeout: 10000 pingTimeout: 1000 connectTimeout: 10000 timeout: 3000 retryAttempts: 3 retryInterval: 1500 reconnectionTimeout: 3000 failedAttempts: 3 password: null subscriptionsPerConnection: 5 clientName: null address: - redis://192.168.43.84:6379 subscriptionConnectionMinimumIdleSize: 1 subscriptionConnectionPoolSize: 50 connectionMinimumIdleSize: 32 connectionPoolSize: 64 database: 0 dnsMonitoring: false dnsMonitoringInterval: 5000
工具類:
import java.util.concurrent.TimeUnit; import org.redisson.Redisson; import org.redisson.core.RLock; /** * * @author created by dehong * @date 2018年3月4日 下午4:30:11 * Redisson分佈式鎖 工具類 */ public class RedissonLockUtil { private static Redisson redisson = RedissonManager.getRedisson(); private static final String LOCK_FLAG = "redissonlock_"; /** * 根據name對進行上鎖操做,redissonLock 阻塞事的,採用的機制發佈/訂閱 * @param key */ public static void lock(String key){ String lockKey = LOCK_FLAG + key; RLock lock = redisson.getLock(lockKey); //lock提供帶timeout參數,timeout結束強制解鎖,防止死鎖 :1分鐘 lock.lock(1, TimeUnit.MINUTES); } /** * 根據name對進行解鎖操做 * @param key */ public static void unlock(String key){ String lockKey = LOCK_FLAG + key; RLock lock = redisson.getLock(lockKey); //若是鎖被當前線程持有,則釋放 if(lock.isHeldByCurrentThread()){ lock.unlock(); } } }
注意:上面的unlock()中不加isHeldByCurrentThread()條件的話,在執行的task的時間超過timeout時,此時若是unlock,其實redisson已經主動unlock了,就會出現IllegalMonitorStateException 異常
調用:
try { //獲取鎖,這裏的key能夠上面場景中的業務id RedissonLockUtil.lock(key); //具體要執行的代碼.... } catch (Exception e) { e.printStackTrace(); }finally { //釋放鎖 RedissonLockUtil.unlock(key); }