redis鎖類型及簡單實現

1. redis加鎖分類

  1. redis能用的的加鎖命令分表是INCRSETNXSET

2. 第一種鎖命令INCR

這種加鎖的思路是, key 不存在,那麼 key 的值會先被初始化爲 0 ,而後再執行 INCR 操做進行加一。 
而後其它用戶在執行 INCR 操做進行加一時,若是返回的數大於 1 ,說明這個鎖正在被使用當中。java

一、 客戶端A請求服務器獲取key的值爲1表示獲取了鎖
二、 客戶端B也去請求服務器獲取key的值爲2表示獲取鎖失敗
三、 客戶端A執行代碼完成,刪除鎖
四、 客戶端B在等待一段時間後在去請求的時候獲取key的值爲1表示獲取鎖成功
五、 客戶端B執行代碼完成,刪除鎖

$redis->incr($key);
$redis->expire($key, $ttl); //設置生成時間爲1秒

3. 第二種鎖SETNX

這種加鎖的思路是,若是 key 不存在,將 key 設置爲 value 
若是 key 已存在,則 SETNX 不作任何動做redis

一、 客戶端A請求服務器設置key的值,若是設置成功就表示加鎖成功
二、 客戶端B也去請求服務器設置key的值,若是返回失敗,那麼就表明加鎖失敗
三、 客戶端A執行代碼完成,刪除鎖
四、 客戶端B在等待一段時間後在去請求設置key的值,設置成功
五、 客戶端B執行代碼完成,刪除鎖

$redis->setNX($key, $value);
$redis->expire($key, $ttl);

4. 第三種鎖SET

上面兩種方法都有一個問題,會發現,都須要設置 key 過時。那麼爲何要設置key過時呢?若是請求執行由於某些緣由意外退出了,致使建立了鎖可是沒有刪除鎖,那麼這個鎖將一直存在,以致於之後緩存再也得不到更新。因而乎咱們須要給鎖加一個過時時間以防不測。 
可是藉助 Expire 來設置就不是原子性操做了。因此還能夠經過事務來確保原子性,可是仍是有些問題,因此官方就引用了另一個,使用 SET 命令自己已經從版本 2.6.12 開始包含了設置過時時間的功能。緩存

一、 客戶端A請求服務器設置key的值,若是設置成功就表示加鎖成功
二、 客戶端B也去請求服務器設置key的值,若是返回失敗,那麼就表明加鎖失敗
三、 客戶端A執行代碼完成,刪除鎖
四、 客戶端B在等待一段時間後在去請求設置key的值,設置成功
五、 客戶端B執行代碼完成,刪除鎖
$redis->set($key, $value, array('nx', 'ex' => $ttl));  //ex表示秒

5. 其它問題

雖然上面一步已經知足了咱們的需求,可是仍是要考慮其它問題? 
一、 redis發現鎖失敗了要怎麼辦?中斷請求仍是循環請求? 
二、 循環請求的話,若是有一個獲取了鎖,其它的在去獲取鎖的時候,是否是容易發生搶鎖的可能? 
三、 鎖提早過時後,客戶端A還沒執行完,而後客戶端B獲取到了鎖,這時候客戶端A執行完了,會不會在刪鎖的時候把B的鎖給刪掉?服務器

6. 解決辦法

針對問題1:使用循環請求,循環請求去獲取鎖 
針對問題2:針對第二個問題,在循環請求獲取鎖的時候,加入睡眠功能,等待幾毫秒在執行循環 
針對問題3:在加鎖的時候存入的key是隨機的。這樣的話,每次在刪除key的時候判斷下存入的key裏的value和本身存的是否同樣測試

setnx的Java簡單實現:spa

獲取鎖:.net

/**
	 * 獲取一次鎖
	 * @param redisClient
	 * @return
	 * 	<p>true : 獲取到鎖</p>
	 * 	<p>false :未獲取到鎖</p>
	 */
	public static boolean getLock(Jedis redisClient) {
		//是否獲取到鎖
	    boolean hasLock = false;
	    try {
	        hasLock = redisClient.setnx(lockKey, "lockObj") == 1;
	        if (hasLock) {
	            redisClient.expire(lockKey, lockTime);//一小時
	        }
	    } catch (Exception e) {
	        redisClient.expire(lockKey, lockTime);//一小時
	    }
	    return hasLock;
	}

此實現會有上面的問題1,只是獲取一次,若是獲取失敗,則再也不獲取。線程

若是要繼續獲取,須要使用方本身實現循環獲取邏輯和超時邏輯。code

修改下:blog

/**
	 * 循環獲取鎖,直到過超時時間
	 * @param redisClient
	 * @param timeout	單位s
	 * @return
	 * 	<p>true : 獲取到鎖</p>
	 * 	<p>false :未獲取到鎖</p>
	 */
	public static boolean getLockTimeOut(Jedis redisClient,int timeout) {
		//當前的毫秒
		long start = System.currentTimeMillis();
		//超時時間轉換爲毫秒單位
		timeout = timeout * 1000;
		//是否獲取到鎖
		boolean hasLock = false;
		
		while(!hasLock){
			try {
				hasLock = redisClient.setnx(lockKey, "lockObj") == 1;
				//獲取到鎖
				if (hasLock) {
					redisClient.expire(lockKey, lockTime);//一小時
				} else {
					//未獲取到鎖,判斷是否超過超時時間
					long now = System.currentTimeMillis();
					//當前時間超過起始時間的時間間隔,當間隔>=超時時間,則中止獲取鎖
					if(now - start >= timeout){
						System.out.println("--------獲取鎖超時,再也不獲取--------");
						break;
					}
					//睡眠,下降搶鎖頻率,緩解redis壓力
					Thread.sleep(500);
				}
			} catch (Exception e) {
				redisClient.expire(lockKey, lockTime);//一小時
			}
		}
		return hasLock;
	}

此種實現,經過Thread.sleep(500)來下降搶鎖頻率,用以處理問題2,同時,經過while來實現循環獲取鎖邏輯,直到超過超時時間。這樣一來,外部調用時就不用再考慮自實現循環和超時問題了。

針對問題3,還未實現,晚點實現再發吧

 

附一個簡單測試類:

package com.paic.elis.elis_smp_cms.redis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisCount2 {

	static JedisPoolConfig config = new JedisPoolConfig();
	static JedisPool newPool = null;
	static Jedis jedisCli = null;
	static int lockTime = 60 * 60;	//單位:S
	static final String lockKey = "lockKey";
	static {
		config.setMaxTotal(20);
		newPool = new JedisPool(config, "10.20.130.34", 4436,20000,"quul5trl");
		jedisCli = newPool.getResource();
		jedisCli.del("mqCount");
		jedisCli.del(lockKey);
	}
	
	public static void main(String[] args) {
//		test1();
//		test2();
		test3();
//		test1();
	}
	
	//多個線程去獲取鎖,僅有獲取到鎖的線程纔會執行,其餘線程被丟棄
	public static void test1(){
		for(int i = 0 ; i < 10; i++){
			new Thread(){
				public void run(){
					Jedis jedisC = newPool.getResource();
					boolean flag = false;
					flag = getLock(jedisC);
					//獲取到鎖
					if(flag){
						System.out.println("獲取到鎖,開始處理");
						//業務邏輯執行
						for(int i = 0 ; i < 100 ; i++){
							jedisC.incr("mqCount");
						}
						System.out.println(jedisC.get("mqCount"));
						releaseLock(jedisC);
						System.out.println("釋放鎖成功");
					}
				}
			}.start();
		}
	}
	

	//多個線程去獲取鎖,按照獲取到鎖的順序執行,一直等到全部線程執行完畢
	public static void test2(){
		for(int i = 0 ; i < 10; i++){
			new Thread(){
				public void run(){
					Jedis jedisC = newPool.getResource();
					boolean flag = false;
					while(!flag){
						flag = getLock(jedisC);
						//獲取到鎖
						if(flag){
							System.out.println("獲取到鎖,開始處理");
							//業務邏輯執行
							for(int i = 0 ; i < 100 ; i++){
								jedisC.incr("mqCount");
							}
							System.out.println(jedisC.get("mqCount"));
							releaseLock(jedisC);
							System.out.println("釋放鎖成功");
						}
					}
				}
			}.start();
		}
	}

	//多個線程去獲取鎖,按照獲取到鎖的順序執行,等到超時時間以後仍未執行的線程被丟棄
	public static void test3(){
		for(int i = 0 ; i < 10; i++){
			new Thread(){
				public void run(){
					Jedis jedisC = newPool.getResource();
					boolean flag = false;
					flag = getLockTimeOut(jedisC,2);
					//獲取到鎖
					if(flag){
						System.out.println("獲取到鎖,開始處理");
						//業務邏輯執行
						for(int i = 0 ; i < 100 ; i++){
							jedisC.incr("mqCount");
						}
						System.out.println(jedisC.get("mqCount"));
						releaseLock(jedisC);
						System.out.println("釋放鎖成功");
					}
				}
			}.start();
		}
	}

	/**
	 * 獲取到鎖則執行,未獲取則不執行(放棄本次執行)
	 * @param redisClient
	 * @return
	 */
	public static void doMethod(Jedis redisClient) {
		//未獲取到鎖--直接返回
		if(!getLock(redisClient)) {
			return;
		}
		//獲取到鎖,開始處理
		try{
			System.out.println("獲取到鎖,開始處理");
			//業務邏輯執行
			for(int i = 0 ; i < 100 ; i++){
				redisClient.incr("mqCount");
			}
			System.out.println(redisClient.get("mqCount"));
		} finally {
			// 只要獲取到鎖,則在業務邏輯結束以後,必須釋放鎖
			releaseLock(redisClient);
		}
	}
	
	/**
	 * 獲取到鎖則執行,未獲取則一直嘗試獲取,直到獲取到鎖爲止
	 * @param redisClient
	 * @return
	 */
	public static void doMethodContinue(Jedis redisClient) {
		
		boolean flag = false;
		while(!flag){
			flag = getLock(redisClient);
			//若是獲取到鎖,則繼續執行,不然循環獲取
			if(flag){
				try{
					System.out.println("獲取到鎖,開始處理");
				} finally {
					// 只要獲取到鎖,則在業務邏輯結束以後,必須釋放鎖
					releaseLock(redisClient);
				}
			}
			//間隔0.5s再次獲取
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
			}
		}
	}
	
	/**
	 * 獲取到鎖則執行,未獲取則一直嘗試獲取,直到到達超時時間
	 * @param redisClient
	 * @param timeout	單位S
	 */
	public static void doMethodContinueTimeout(Jedis redisClient,int timeout) {
		
		//當前的毫秒
		long start = System.currentTimeMillis();
		//超時時間轉換爲毫秒單位
		timeout = timeout * 1000;
		boolean flag = false;
		while(!flag){
			flag = getLock(redisClient);
			//若是獲取到鎖,則繼續執行,不然循環獲取
			if(flag){
				try{
					System.out.println("獲取到鎖,開始處理");
				} finally {
					// 只要獲取到鎖,則在業務邏輯結束以後,必須釋放鎖
					releaseLock(redisClient);
				}
			}
			long now = System.currentTimeMillis();
			//當前時間超過起始時間的時間間隔,當間隔>=超時時間,則中止獲取鎖
			if(now - start >= timeout){
				flag  = true;
			}
			//睡眠,下降搶鎖頻率,緩解redis壓力
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	/**
	 * 獲取一次鎖
	 * @param redisClient
	 * @return
	 * 	<p>true : 獲取到鎖</p>
	 * 	<p>false :未獲取到鎖</p>
	 */
	public static boolean getLock(Jedis redisClient) {
		//是否獲取到鎖
	    boolean hasLock = false;
	    try {
	        hasLock = redisClient.setnx(lockKey, "lockObj") == 1;
	        if (hasLock) {
	            redisClient.expire(lockKey, lockTime);//一小時
	        }
	    } catch (Exception e) {
	        redisClient.expire(lockKey, lockTime);//一小時
	    }
	    return hasLock;
	}
	
	/**
	 * 循環獲取鎖,直到過超時時間
	 * @param redisClient
	 * @param timeout	單位s
	 * @return
	 * 	<p>true : 獲取到鎖</p>
	 * 	<p>false :未獲取到鎖</p>
	 */
	public static boolean getLockTimeOut(Jedis redisClient,int timeout) {
		//當前的毫秒
		long start = System.currentTimeMillis();
		//超時時間轉換爲毫秒單位
		timeout = timeout * 1000;
		//是否獲取到鎖
		boolean hasLock = false;
		
		while(!hasLock){
			try {
				hasLock = redisClient.setnx(lockKey, "lockObj") == 1;
				//獲取到鎖
				if (hasLock) {
					redisClient.expire(lockKey, lockTime);//一小時
				} else {
					//未獲取到鎖,判斷是否超過超時時間
					long now = System.currentTimeMillis();
					//當前時間超過起始時間的時間間隔,當間隔>=超時時間,則中止獲取鎖
					if(now - start >= timeout){
						System.out.println("--------獲取鎖超時,再也不獲取--------");
						break;
					}
					//睡眠,下降搶鎖頻率,緩解redis壓力
					Thread.sleep(500);
				}
			} catch (Exception e) {
				redisClient.expire(lockKey, lockTime);//一小時
			}
		}
		return hasLock;
	}

	/**
	 * 釋放鎖
	 * @param redisClient
	 */
	public static void releaseLock(Jedis redisClient) {
	    redisClient.del(lockKey);
	}

}

 

主要轉自:

http://blog.csdn.net/Dennis_ukagaka/article/details/78072274

相關文章
相關標籤/搜索