幾種分佈式鎖的實現方式

在分佈式系統中,各系統同步訪問共同的資源是很常見的。所以咱們經常須要協調他們的動做。 若是不一樣的系統或是同一個系統的不一樣主機之間共享了一個或一組資源,那麼訪問這些資源的時候,每每須要互斥來防止彼此干擾來保證一致性,在這種狀況下,便須要使用到分佈式鎖。java

一個好的分佈式鎖經常須要如下特性:redis

  • 可重入
  • 同一時間點,只有一個線程持有鎖
  • 容錯性, 當鎖節點宕機時, 能及時釋放鎖
  • 高性能
  • 無單點問題

一. 基於數據庫的分佈式鎖

分佈式鎖流程圖-基於數據庫分佈式鎖.jpg
基於數據庫的分佈式鎖, 經常使用的一種方式是使用表的惟一約束特性。當往數據庫中成功插入一條數據時, 表明只獲取到鎖。將這條數據從數據庫中刪除,則釋放送。

所以須要建立一張鎖表sql

CREATE TABLE `methodLock` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '鎖定的方法名',
  `cust_id` varchar(1024) NOT NULL DEFAULT '客戶端惟一編碼',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存數據時間,自動生成',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
)
 ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='鎖定中的方法';
複製代碼

添加鎖數據庫

insert into methodLock(method_name,cust_id) values (‘method_name’,‘cust_id’)
複製代碼

這裏cust_id 能夠是機器的mac地址+線程編號, 確保一個線程只有惟一的一個編號。經過這個編號, 能夠有效的判斷是否爲鎖的建立者,從而進行鎖的釋放以及重入鎖判斷緩存

釋放鎖分佈式

delete from methodLock where method_name ='method_name' and cust_id = 'cust_id'
複製代碼

重入鎖判斷memcached

select 1 from methodLock where method_name ='method_name' and cust_id = 'cust_id'
複製代碼

加鎖以及釋放鎖的代碼示例性能

/** * 獲取鎖 */
public boolean lock(String methodName){
    boolean success = false;
    //獲取客戶惟一識別碼,例如:mac+線程信息
    String custId = getCustId();
    try{
        //添加鎖
       success = insertLock(methodName, custId);
    } catch(Exception e) {
        //如添加失敗
    }
    return success;
}

/** * 釋放鎖 */
public boolean unlock(String methodName) {
    boolean success = false;
    //獲取客戶惟一識別碼,例如:mac+線程信息
    String custId = getCustId();
    try{
        //添加鎖
       success = deleteLock(methodName, custId);
    } catch(Exception e) {
        //如添加失敗
    }
    return success;
}
複製代碼

完整流程ui

public void test() {
    String methodName = "methodName";
    //判斷是否重入鎖
    if (!checkReentrantLock(methodName)) {
        //非重入鎖
        while (!lock(methodName)) {
            //獲取鎖失敗, 則阻塞至獲取鎖
            try{
                Thread.sleep(100)
            } catch(Exception e) {
            }
        }
    }
    //TODO 業務處理
    
    //釋放鎖
    unlock(methodName);
}
複製代碼

以上代碼還存在一些問題:編碼

  • 沒有失效時間。 解決方案:設置一個定時處理, 按期清理過時鎖
  • 單點問題。 解決方案: 弄幾個備份數據庫,數據庫以前雙向同步,一旦掛掉快速切換到備庫上

二. 基於redis的分佈式鎖

分佈式鎖流程圖-基於redis分佈式鎖.jpg
使用redis 的set(String key, String value, String nxxx, String expx, int time)命令

  • 第一個爲key,咱們使用key來當鎖,由於key是惟一的。
  • 第二個爲value,咱們傳的是custId,這裏cust_id 能夠是機器的mac地址+線程編號, 確保一個線程只有惟一的一個編號。經過這個編號, 能夠有效的判斷是否爲鎖的建立者,從而進行鎖的釋放以及重入鎖判斷
  • 第三個爲nxxx,這個參數咱們填的是NX,意思是SET IF NOT EXIST,即當key不存在時,咱們進行set操做;若key已經存在,則不作任何操做
  • 第四個爲expx,這個參數咱們傳的是PX,意思是咱們要給這個key加一個過時的設置,具體時間由第五個參數決定。
  • 第五個爲time,與第四個參數相呼應,表明key的過時時間。

代碼示例

private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;

// Redis客戶端
private Jedis jedis;

/** * 嘗試獲取分佈式鎖 * @param lockKey 鎖 * @param expireTime 超期時間 * @return 是否獲取成功 */
public boolean lock(String lockKey, int expireTime) {
    //獲取客戶惟一識別碼,例如:mac+線程信息
    String custId = getCustId();
    String result = jedis.set(lockKey, custId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

    if (LOCK_SUCCESS.equals(result)) {
        return true;
    }
    
    return false;
}

/** * 釋放分佈式鎖 * @param lockKey 鎖 * @param requestId 請求標識 * @return 是否釋放成功 */
public boolean unlock(String lockKey,) {
    //獲取客戶惟一識別碼,例如:mac+線程信息
    String custId = getCustId();
    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(custId));

    if (RELEASE_SUCCESS.equals(result)) {
        return true;
    }
    return false;
}

/** * 獲取鎖信息 * @param lockKey 鎖 * @return 是否重入鎖 */
public boolean checkReentrantLock(String lockKey){
    //獲取客戶惟一識別碼,例如:mac+線程信息
    String custId = getCustId();
    
    //獲取當前鎖的客戶惟一表示碼
    String currentCustId = redis.get(lockKey);
    if (custId.equals(currentCustId)) {
        return true;
    }
    return false;
}
複製代碼

完整流程

public void test() {
    String lockKey = "lockKey";
    //判斷是否重入鎖
    if (!checkReentrantLock(lockKey)) {
        //非重入鎖
        while (!lock(lockKey)) {
            //獲取鎖失敗, 則阻塞至獲取鎖
            try{
                Thread.sleep(100)
            } catch(Exception e) {
            }
        }
    }
    //TODO 業務處理
    
    //釋放鎖
    unlock(lockKey);
}
複製代碼

三. 基於memcached的分佈式鎖

分佈式鎖流程圖-基於memcached分佈式鎖.jpg

memcached的實現方式和redis相似, 使用的是命令add(key, value, expireDate),注:僅當緩存中不存在鍵時,纔會添加成功

  • 第一個爲key,咱們使用key來當鎖,由於key是惟一的。
  • 第二個爲value,咱們傳的是custId,這裏cust_id
  • 第三個爲expireDate, 設置一個過時時間,好比: new Date(1000*10),則表示十秒以後從Memcached內存緩存中刪除)。

代碼示例

// Redis客戶端
private MemCachedClient memCachedClient;

/** * 嘗試獲取分佈式鎖 * @param lockKey 鎖 * @param expireTime 超期時間 * @return 是否獲取成功 */
public boolean lock(String lockKey, Date expireDate) {
    //獲取客戶惟一識別碼,例如:mac+線程信息
    String custId = getCustId();
    Boolean result = false;
    try {
        result = memCachedClient.add(lockKey, custId,expireDate);
    } catch(Excetion e) {
    }
    return result;
}

/** * 釋放分佈式鎖 * @param lockKey 鎖 * @param requestId 請求標識 * @return 是否釋放成功 */
public boolean unlock(String lockKey,) {
    //獲取客戶惟一識別碼,例如:mac+線程信息
    //獲取客戶惟一識別碼,例如:mac+線程信息
    String custId = getCustId();
    Boolean result = false;
    try {
        String currentCustId = memCachedClient.get(lockKey);
        if (custId.equals(currentCustId)) {
            result = memCachedClient.delete(lockKey, custId,expireDate);
        }
    } catch(Excetion e) {
    }
    return result;
}

/** * 獲取鎖信息 * @param lockKey 鎖 * @return 是否重入鎖 */
public boolean checkReentrantLock(String lockKey){
    //獲取客戶惟一識別碼,例如:mac+線程信息
    String custId = getCustId();
    //獲取當前鎖的客戶惟一表示碼
    try {
         String currentCustId = memCachedClient.get(lockKey);
        if (custId.equals(currentCustId)) {
            return true;
        }
    } catch(Excetion e) {
    }
   
    return false;
}
複製代碼

完整流程

public void test() {
    String lockKey = "lockKey";
    //判斷是否重入鎖
    if (!checkReentrantLock(lockKey)) {
        //非重入鎖
        while (!lock(lockKey)) {
            //獲取鎖失敗, 則阻塞至獲取鎖
            try{
                Thread.sleep(100)
            } catch(Exception e) {
            }
        }
    }
    //TODO 業務處理
    
    //釋放鎖
    unlock(lockKey);
}
複製代碼

四. 基於zookeeper的分佈式鎖

分佈式鎖流程圖-基於zookeeper分佈式鎖.jpg

基於zookeeper臨時有序節點能夠實現的分佈式鎖。 大體思想即爲:每一個客戶端對某個方法加鎖時,在zookeeper上的與該方法對應的指定節點的目錄下,生成一個惟一的瞬時有序節點。 判斷是否獲取鎖的方式很簡單,只須要判斷有序節點中序號最小的一個。 當釋放鎖的時候,只需將這個瞬時節點刪除便可。同時,其能夠避免服務宕機致使的鎖沒法釋放,而產生的死鎖問題。

能夠直接使用zookeeper第三方庫Curator客戶端,這個客戶端中封裝了一個可重入的鎖服務。

完整流程

public void test() {
    //Curator提供的InterProcessMutex是分佈式鎖的實現。經過acquire得到鎖,並提供超時機制,release方法用於釋放鎖。
    InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
    try {
        //獲取鎖
        if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
            //TODO 業務處理
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            //釋放鎖
            lock.release();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
複製代碼

歡迎長按下圖關注公衆號: 終身幼稚園

相關文章
相關標籤/搜索