在分佈式系統的不少場景中,咱們爲了保證數據的最終一致性,須要不少的技術方案來支持,好比分佈式事務、分佈式鎖等。redis
有的時候,咱們須要保證一個方法在同一時間內只能被同一個線程執行。在單機環境中,Java中其實提供了不少併發處理相關的API,可是這些API在分佈式場景中就無能爲力了。也就是說單純的Java Api並不能提供分佈式鎖的能力。數據庫
目前針對分佈式鎖的實現目前有多種方案:api
在分析這幾種實現方案以前咱們先來想一下,咱們須要的分佈式鎖應該是怎麼樣的?(這裏以方法鎖爲例,資源鎖同理)緩存
能夠保證在分佈式部署的應用集羣中,同一個方法在同一時間只能被一臺機器上的一個線程執行。bash
要實現分佈式鎖,最簡單的方式可能就是直接建立一張鎖表,而後經過操做該表中的數據來實現了。服務器
當咱們要鎖住某個方法或資源時,咱們就在該表中增長一條記錄,想要釋放鎖的時候就刪除這條記錄。併發
建立這樣一張數據庫表:分佈式
CREATE TABLE `methodLock` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '鎖定的方法名',
`desc` varchar(1024) NOT NULL DEFAULT '備註信息',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON
UPDATE CURRENT_TIMESTAMP COMMENT '保存數據時間,自動生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='鎖定中的方法';
複製代碼
當咱們想要鎖住某個方法時,執行如下SQL:memcached
insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)
複製代碼
由於咱們對method_name作了惟一性約束,這裏若是有多個請求同時提交到數據庫的話,數據庫會保證只有一個操做能夠成功,那麼咱們就能夠認爲操做成功的那個線程得到了該方法的鎖,能夠執行方法體內容。工具
當方法執行完畢以後,想要釋放鎖的話,須要執行如下Sql:
delete from methodLock where method_name ='method_name'
複製代碼
上面這種簡單的實現有如下幾個問題:
這把鎖強依賴數據庫的可用性,數據庫是一個單點,一旦數據庫掛掉,會致使業務系統不可用。
這把鎖沒有失效時間,一旦解鎖操做失敗,就會致使鎖記錄一直在數據庫中,其餘線程沒法再得到到鎖。
這把鎖只能是非阻塞的,由於數據的insert操做,一旦插入失敗就會直接報錯。沒有得到鎖的線程並不會進入排隊隊列,要想再次得到鎖就要再次觸發得到鎖操做。
這把鎖是非重入的,同一個線程在沒有釋放鎖以前沒法再次得到該鎖。由於數據中數據已經存在了。
固然,咱們也能夠有其餘方式解決上面的問題。
針對 數據庫是單點問題搞兩個數據庫,數據以前雙向同步。一旦掛掉快速切換到備庫上。
針對 沒有失效時間?只要作一個定時任務,每隔必定時間把數據庫中的超時數據清理一遍。
針對 非阻塞的?搞一個while循環,直到insert成功再返回成功。
針對 非重入的?在數據庫表中加個字段,記錄當前得到鎖的機器的主機信息和線程信息,那麼下次再獲取鎖的時候先查詢數據庫,若是當前機器的主機信息和線程信息在數據庫能夠查到的話,直接把鎖分配給他就能夠了。
除了能夠經過增刪操做數據表中的記錄之外,其實還能夠藉助數據中自帶的鎖來實現分佈式的鎖。
咱們還用剛剛建立的那張數據庫表。能夠經過數據庫的排他鎖來實現分佈式鎖。 基於MySql的InnoDB引擎,可使用如下方法來實現加鎖操做:
public boolean lock(){
connection.setAutoCommit(false)
while(true){
try{
result = select * from methodLock where method_name=xxx
for update;
if(result==null){
return true;
}
}catch(Exception e){
}
sleep(1000);
}
return false;
}
複製代碼
在查詢語句後面增長for update,數據庫會在查詢過程當中給數據庫表增長排他鎖。當某條記錄被加上排他鎖以後,其餘線程沒法再在該行記錄上增長排他鎖。
咱們能夠認爲得到排它鎖的線程便可得到分佈式鎖,當獲取到鎖以後,能夠執行方法的業務邏輯,執行完方法以後,再經過如下方法解鎖:
public void unlock(){
connection.commit();
}
複製代碼
經過connection.commit()操做來釋放鎖。
這種方法能夠有效的解決上面提到的沒法釋放鎖和阻塞鎖的問題。
阻塞鎖? for update語句會在執行成功後當即返回,在執行失敗時一直處於阻塞狀態,直到成功。
鎖定以後 服務宕機,沒法釋放?使用這種方式,服務宕機以後數據庫會本身把鎖釋放掉。 可是仍是沒法直接解決數據庫單點和可重入問題。
總結一下使用數據庫來實現分佈式鎖的方式,這兩種方式都是依賴數據庫的一張表,一種是經過表中的記錄的存在狀況肯定當前是否有鎖存在,另一種是經過數據庫的排他鎖來實現分佈式鎖。
數據庫實現分佈式鎖的 優勢: 直接藉助數據庫,容易理解。
數據庫實現分佈式鎖的 缺點: 會有各類各樣的問題,在解決問題的過程當中會使整個方案變得愈來愈複雜。
操做數據庫須要必定的開銷,性能問題須要考慮。
相比較於基於數據庫實現分佈式鎖的方案來講,基於緩存來實如今性能方面會表現的更好一點。並且不少緩存是能夠集羣部署的,能夠解決單點問題。
目前有不少成熟的緩存產品,包括Redis,memcached等。
在實現的時候要注意的幾個關鍵點:
鎖信息必須是會過時超時的,不能讓一個線程長期佔有一個鎖而致使死鎖;
同一時刻只能有一個線程獲取到鎖。
幾個要用到的redis命令:
setnx(key, value):「set if not exits」,若該key-value不存在,則成功加入緩存而且返回1,不然返回0。
get(key):得到key對應的value值,若不存在則返回nil。
getset(key, value):先獲取key對應的value值,若不存在則返回nil,而後將舊的value更新爲新的value。
expire(key, seconds):設置key-value的有效期爲seconds秒。
看一下流程圖:
在這個流程下,不會致使死鎖。
我採用Jedis做爲Redis客戶端的api,下面來看一下具體實現的代碼。
public class RedisPool {
private static JedisPool pool;//jedis鏈接池
private static int maxTotal = 20;//最大鏈接數
private static int maxIdle = 10;//最大空閒鏈接數
private static int minIdle = 5;//最小空閒鏈接數
private static boolean testOnBorrow = true;//在取鏈接時測試鏈接的可用性
private static boolean testOnReturn = false;//再還鏈接時不測試鏈接的可用性
static {
initPool();//初始化鏈接池
}
public static Jedis getJedis(){
return pool.getResource();
}
public static void close(Jedis jedis){
jedis.close();
}
private static void initPool(){
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(maxTotal);
config.setMaxIdle(maxIdle);
config.setMinIdle(minIdle);
config.setTestOnBorrow(testOnBorrow);
config.setTestOnReturn(testOnReturn);
config.setBlockWhenExhausted(true);
pool = new JedisPool(config, "127.0.0.1", 6379, 5000, "liqiyao");
}
}
複製代碼
public class RedisPoolUtil {
private RedisPoolUtil(){}
private static RedisPool redisPool;
public static String get(String key){
Jedis jedis = null;
String result = null;
try {
jedis = RedisPool.getJedis();
result = jedis.get(key);
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
return result;
}
}
public static Long setnx(String key, String value){
Jedis jedis = null;
Long result = null;
try {
jedis = RedisPool.getJedis();
result = jedis.setnx(key, value);
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
return result;
}
}
public static String getSet(String key, String value){
Jedis jedis = null;
String result = null;
try {
jedis = RedisPool.getJedis();
result = jedis.getSet(key, value);
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
return result;
}
}
public static Long expire(String key, int seconds){
Jedis jedis = null;
Long result = null;
try {
jedis = RedisPool.getJedis();
result = jedis.expire(key, seconds);
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
return result;
}
}
public static Long del(String key){
Jedis jedis = null;
Long result = null;
try {
jedis = RedisPool.getJedis();
result = jedis.del(key);
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
return result;
}
}
}
複製代碼
public class DistributedLockUtil {
private DistributedLockUtil(){
}
public static boolean lock(String lockName){//lockName能夠爲共享變量
名,也能夠爲方法名,主要是用於模擬鎖信息
System.out.println(Thread.currentThread() + "開始嘗試加鎖!");
Long result = RedisPoolUtil.setnx
(lockName, String.valueOf(System.currentTimeMillis() + 5000));
if (result != null && result.intValue() == 1){
System.out.println(Thread.currentThread() + "加鎖成功!");
RedisPoolUtil.expire(lockName, 5);
System.out.println(Thread.currentThread() + "執行業務邏輯!");
RedisPoolUtil.del(lockName);
return true;
} else {
String lockValueA = RedisPoolUtil.get(lockName);
if (lockValueA != null && Long.parseLong(lockValueA) >=
System.currentTimeMillis()){
String lockValueB = RedisPoolUtil.getSet(lockName,
String.valueOf(System.currentTimeMillis() + 5000));
if (lockValueB == null || lockValueB.equals(lockValueA)){
System.out.println(Thread.currentThread() + "加鎖成功!");
RedisPoolUtil.expire(lockName, 5);
System.out.println(Thread.currentThread() + "執行業務邏輯!");
RedisPoolUtil.del(lockName);
return true;
} else {
return false;
}
} else {
return false;
}
}
}
}
複製代碼
基於zookeeper臨時有序節點能夠實現的分佈式鎖。大體思想即爲:每一個客戶端對某個方法加鎖時,在zookeeper上的與該方法對應的指定節點的目錄下,生成一個惟一的
瞬時有序節點。 判斷是否獲取鎖的方式很簡單,只須要判斷有序節點中序號最小的一個。 當釋放鎖的時候,只需將這個瞬時節點刪除便可。同時,其能夠避免服務宕機致使的鎖沒法釋放,而產生的死鎖問題。
鎖沒法釋放? 使用Zookeeper能夠有效的解決鎖沒法釋放的問題,由於在建立鎖的時候,客戶端會在ZK中建立一個臨時節點,一旦客戶端獲取到鎖以後忽然掛掉(Session鏈接斷開),那麼這個臨時節點就會自動刪除掉。其餘客戶端就能夠再次得到鎖。
非阻塞鎖? 使用Zookeeper能夠實現阻塞的鎖,客戶端能夠經過在ZK中建立順序節點,而且在節點上綁定監聽器,一旦節點有變化,Zookeeper會通知客戶端,客戶端能夠檢查本身建立的節點是否是當前全部節點中序號最小的,若是是,那麼本身就獲取到鎖,即可以執行業務邏輯了。
不可重入? 使用Zookeeper也能夠有效的解決不可重入的問題,客戶端在建立節點的時候,把當前客戶端的主機信息和線程信息直接寫入到節點中,下次想要獲取鎖的時候和當前最小的節點中的數據比對一下就能夠了。若是和本身的信息同樣,那麼本身直接獲取到鎖,若是不同就再建立一個臨時的順序節點,參與排隊。
單點問題? 使用Zookeeper能夠有效的解決單點問題,ZK是集羣部署的,只要集羣中有半數以上的機器存活,就能夠對外提供服務。 能夠直接使用zookeeper第三方庫Curator客戶端,這個客戶端中封裝了一個可重入的鎖服務。
public boolean tryLock(long timeout, TimeUnit unit) throws
InterruptedException {
try {
return interProcessMutex.acquire(timeout, unit);
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
public boolean unlock() {
try {
interProcessMutex.release();
} catch (Throwable e) {
log.error(e.getMessage(), e);
} finally {
executorService.schedule(new Cleaner(client, path),
delayTimeForClean, TimeUnit.MILLISECONDS);
}
return true;
}
複製代碼
Curator提供的InterProcessMutex是分佈式鎖的實現。acquire方法用戶獲取鎖,release方法用於釋放鎖。
使用ZK實現的分佈式鎖好像徹底符合了本文開頭咱們對一個分佈式鎖的全部指望。可是,其實並非,Zookeeper實現的分佈式鎖其實存在一個缺點,那就是性能上可能並無緩存服務 那麼高。由於每次在建立鎖和釋放鎖的過程當中,都要動態建立、銷燬瞬時節點來實現鎖功能。ZK中建立和刪除節點只能經過Leader服務器來執行,而後將數據同不到全部的Follower機器上。
使用Zookeeper實現分佈式鎖的優勢: 有效的解決單點問題,不可重入問題,非阻塞問題以及鎖沒法釋放的問題。實現起來較爲簡單。
使用Zookeeper實現分佈式鎖的缺點 : 性能上不如使用緩存實現分佈式鎖。 須要對ZK的原理有所瞭解。
從理解的難易程度角度(從低到高): 數據庫 > 緩存 > Zookeeper
從實現的複雜性角度(從低到高): Zookeeper >= 緩存 > 數據庫
從性能角度(從高到低): 緩存 > Zookeeper >= 數據庫
從可靠性角度(從高到低): Zookeeper > 緩存 > 數據庫
所以我我的更加傾向於使用緩存來實現,後續的文章中會基於Redis封裝一個咱們本身的分佈式鎖實現。