分佈式緩存,能解決單臺服務器內存不能無限擴張的瓶頸。在分佈式緩存的應用中,會遇到多個客戶端同時爭用的問題。這個時候,須要用到分佈式鎖,獲得鎖的客戶端纔有操做權限。java
Memcached 和 Redis 是經常使用的分佈式緩存構建方案,下面列舉下基於Memcached 和 Redis 分佈式鎖的實現方法。緩存
Memcached 分佈式鎖服務器
Memcached 能夠使用 add 命令,該命令只有KEY不存在時,才進行添加,或者不會處理。Memcached 全部命令都是原子性的,併發下add 同一個KEY ,只會一個會成功。併發
利用這個原理,能夠先定義一個 鎖 LockKEY ,add 成功的認爲是獲得鎖。而且設置[過時超時] 時間,保證宕機後,也不會死鎖。分佈式
在具體操做完後,判斷是否這次操做已超時。若是超時則不刪除鎖,若是不超時則刪除鎖。code
僞代碼:內存
1 if (mc.Add("LockKey", "Value", expiredtime)) 2 { 3 //獲得鎖 4 try 5 { 6 //do business function 7 8 //檢查超時 9 if (!CheckedTimeOut()) 10 { 11 mc.Delete("LockKey"); 12 } 13 } 14 catch (Exception e) 15 { 16 mc.Delete("LockKey"); 17 } 18 19 }
Redis 分佈式鎖get
Redis 沒有add 命令,但有SETNX(SET if Not eXists)若給定的 key 已經存在,則 SETNX不作任何動做。設置成功,返回 1 。設置失敗,返回 0 。string
SETNX 命令不能設置過時時間,須要再使用 EXPIRE 命令設置過時時間。io
僞代碼:
int lockResult = rd.SETNX("LockKey", "Value"); if (lockResult == 1) { //[1]獲得鎖 //[2]設置超時過時時間 rd.EXPIRE("LockKey", expiredtime); try { //do business function //檢查超時 if (!CheckedTimeOut()) { rd.DEL("LockKey"); } } catch (Exception e) { rd.DEL("LockKey"); } }
這種作法,有一個很大的潛在風險。[1]獲得鎖後,再執行[2] 設置過時時間。若是在這期間出現宕機,則會致使沒有設置過時時間。按Redis 的默認緩存過時策略,這個鎖將不會釋放,產生死鎖。
因此不推薦用這種作法,應該用其它方式來實現鎖的超時過時策略:
1:SETNX value 值=當前時間+過時超時時間,返回1 則得到鎖,返回0則沒有得到鎖。轉2。
2:GET 獲取 value 的值 。判斷鎖是否過時超時。若是超時,轉3。
3:GETSET(將給定 key 的值設爲 value ,並返回 key 的舊值),GETSET value 值=當前時間+過時超時時間, 判斷獲得的value 若是仍然是超時的,那就說明獲得鎖,不然沒有獲得鎖。
從2併發進到3 的操做,會屢次改寫超時時間,但這個不會有什麼影響。
僞代碼:
string expiredtime = DateTime.Now.AddMinutes(LockTimeoutMinutes).ToString(); int lockResult = rd.SETNX("LockKey", expiredtime); bool getLock = false; if (lockResult == 1) { //獲得鎖 getLock = true; } else { string curExpiredtime = rd.GET("LockKey"); //檢查鎖超時 if (CheckedLockTimeOut(expiredtime)) { expiredtime = DateTime.Now.AddMinutes(LockTimeoutMinutes).ToString(); string newExpiredTime = GETSET(expiredtime); if (CheckedLockTimeOut(newExpiredTime)) { //獲得鎖 getLock = true; } } } if (getLock) { try { //do business function //檢查超時 if (!CheckedTimeOut()) { rd.DEL("LockKey"); } } catch (Exception e) { rd.DEL("LockKey"); } }