INCR
、SETNX
、SET
INCR
這種加鎖的思路是, key 不存在,那麼 key 的值會先被初始化爲 0 ,而後再執行 INCR 操做進行加一。
而後其它用戶在執行 INCR 操做進行加一時,若是返回的數大於 1 ,說明這個鎖正在被使用當中。java
一、 客戶端A請求服務器獲取key的值爲1表示獲取了鎖 二、 客戶端B也去請求服務器獲取key的值爲2表示獲取鎖失敗 三、 客戶端A執行代碼完成,刪除鎖 四、 客戶端B在等待一段時間後在去請求的時候獲取key的值爲1表示獲取鎖成功 五、 客戶端B執行代碼完成,刪除鎖 $redis->incr($key); $redis->expire($key, $ttl); //設置生成時間爲1秒
SETNX
這種加鎖的思路是,若是 key 不存在,將 key 設置爲 value
若是 key 已存在,則 SETNX
不作任何動做redis
一、 客戶端A請求服務器設置key的值,若是設置成功就表示加鎖成功 二、 客戶端B也去請求服務器設置key的值,若是返回失敗,那麼就表明加鎖失敗 三、 客戶端A執行代碼完成,刪除鎖 四、 客戶端B在等待一段時間後在去請求設置key的值,設置成功 五、 客戶端B執行代碼完成,刪除鎖 $redis->setNX($key, $value); $redis->expire($key, $ttl);
SET
上面兩種方法都有一個問題,會發現,都須要設置 key 過時。那麼爲何要設置key過時呢?若是請求執行由於某些緣由意外退出了,致使建立了鎖可是沒有刪除鎖,那麼這個鎖將一直存在,以致於之後緩存再也得不到更新。因而乎咱們須要給鎖加一個過時時間以防不測。
可是藉助 Expire 來設置就不是原子性操做了。因此還能夠經過事務來確保原子性,可是仍是有些問題,因此官方就引用了另一個,使用 SET
命令自己已經從版本 2.6.12 開始包含了設置過時時間的功能。緩存
一、 客戶端A請求服務器設置key的值,若是設置成功就表示加鎖成功 二、 客戶端B也去請求服務器設置key的值,若是返回失敗,那麼就表明加鎖失敗 三、 客戶端A執行代碼完成,刪除鎖 四、 客戶端B在等待一段時間後在去請求設置key的值,設置成功 五、 客戶端B執行代碼完成,刪除鎖 $redis->set($key, $value, array('nx', 'ex' => $ttl)); //ex表示秒
雖然上面一步已經知足了咱們的需求,可是仍是要考慮其它問題?
一、 redis發現鎖失敗了要怎麼辦?中斷請求仍是循環請求?
二、 循環請求的話,若是有一個獲取了鎖,其它的在去獲取鎖的時候,是否是容易發生搶鎖的可能?
三、 鎖提早過時後,客戶端A還沒執行完,而後客戶端B獲取到了鎖,這時候客戶端A執行完了,會不會在刪鎖的時候把B的鎖給刪掉?服務器
針對問題1:使用循環請求,循環請求去獲取鎖
針對問題2:針對第二個問題,在循環請求獲取鎖的時候,加入睡眠功能,等待幾毫秒在執行循環
針對問題3:在加鎖的時候存入的key是隨機的。這樣的話,每次在刪除key的時候判斷下存入的key裏的value和本身存的是否同樣測試
setnx的Java簡單實現:spa
獲取鎖:.net
/** * 獲取一次鎖 * @param redisClient * @return * <p>true : 獲取到鎖</p> * <p>false :未獲取到鎖</p> */ public static boolean getLock(Jedis redisClient) { //是否獲取到鎖 boolean hasLock = false; try { hasLock = redisClient.setnx(lockKey, "lockObj") == 1; if (hasLock) { redisClient.expire(lockKey, lockTime);//一小時 } } catch (Exception e) { redisClient.expire(lockKey, lockTime);//一小時 } return hasLock; }
此實現會有上面的問題1,只是獲取一次,若是獲取失敗,則再也不獲取。線程
若是要繼續獲取,須要使用方本身實現循環獲取邏輯和超時邏輯。code
修改下:blog
/** * 循環獲取鎖,直到過超時時間 * @param redisClient * @param timeout 單位s * @return * <p>true : 獲取到鎖</p> * <p>false :未獲取到鎖</p> */ public static boolean getLockTimeOut(Jedis redisClient,int timeout) { //當前的毫秒 long start = System.currentTimeMillis(); //超時時間轉換爲毫秒單位 timeout = timeout * 1000; //是否獲取到鎖 boolean hasLock = false; while(!hasLock){ try { hasLock = redisClient.setnx(lockKey, "lockObj") == 1; //獲取到鎖 if (hasLock) { redisClient.expire(lockKey, lockTime);//一小時 } else { //未獲取到鎖,判斷是否超過超時時間 long now = System.currentTimeMillis(); //當前時間超過起始時間的時間間隔,當間隔>=超時時間,則中止獲取鎖 if(now - start >= timeout){ System.out.println("--------獲取鎖超時,再也不獲取--------"); break; } //睡眠,下降搶鎖頻率,緩解redis壓力 Thread.sleep(500); } } catch (Exception e) { redisClient.expire(lockKey, lockTime);//一小時 } } return hasLock; }
此種實現,經過Thread.sleep(500)來下降搶鎖頻率,用以處理問題2,同時,經過while來實現循環獲取鎖邏輯,直到超過超時時間。這樣一來,外部調用時就不用再考慮自實現循環和超時問題了。
針對問題3,還未實現,晚點實現再發吧
附一個簡單測試類:
package com.paic.elis.elis_smp_cms.redis; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class RedisCount2 { static JedisPoolConfig config = new JedisPoolConfig(); static JedisPool newPool = null; static Jedis jedisCli = null; static int lockTime = 60 * 60; //單位:S static final String lockKey = "lockKey"; static { config.setMaxTotal(20); newPool = new JedisPool(config, "10.20.130.34", 4436,20000,"quul5trl"); jedisCli = newPool.getResource(); jedisCli.del("mqCount"); jedisCli.del(lockKey); } public static void main(String[] args) { // test1(); // test2(); test3(); // test1(); } //多個線程去獲取鎖,僅有獲取到鎖的線程纔會執行,其餘線程被丟棄 public static void test1(){ for(int i = 0 ; i < 10; i++){ new Thread(){ public void run(){ Jedis jedisC = newPool.getResource(); boolean flag = false; flag = getLock(jedisC); //獲取到鎖 if(flag){ System.out.println("獲取到鎖,開始處理"); //業務邏輯執行 for(int i = 0 ; i < 100 ; i++){ jedisC.incr("mqCount"); } System.out.println(jedisC.get("mqCount")); releaseLock(jedisC); System.out.println("釋放鎖成功"); } } }.start(); } } //多個線程去獲取鎖,按照獲取到鎖的順序執行,一直等到全部線程執行完畢 public static void test2(){ for(int i = 0 ; i < 10; i++){ new Thread(){ public void run(){ Jedis jedisC = newPool.getResource(); boolean flag = false; while(!flag){ flag = getLock(jedisC); //獲取到鎖 if(flag){ System.out.println("獲取到鎖,開始處理"); //業務邏輯執行 for(int i = 0 ; i < 100 ; i++){ jedisC.incr("mqCount"); } System.out.println(jedisC.get("mqCount")); releaseLock(jedisC); System.out.println("釋放鎖成功"); } } } }.start(); } } //多個線程去獲取鎖,按照獲取到鎖的順序執行,等到超時時間以後仍未執行的線程被丟棄 public static void test3(){ for(int i = 0 ; i < 10; i++){ new Thread(){ public void run(){ Jedis jedisC = newPool.getResource(); boolean flag = false; flag = getLockTimeOut(jedisC,2); //獲取到鎖 if(flag){ System.out.println("獲取到鎖,開始處理"); //業務邏輯執行 for(int i = 0 ; i < 100 ; i++){ jedisC.incr("mqCount"); } System.out.println(jedisC.get("mqCount")); releaseLock(jedisC); System.out.println("釋放鎖成功"); } } }.start(); } } /** * 獲取到鎖則執行,未獲取則不執行(放棄本次執行) * @param redisClient * @return */ public static void doMethod(Jedis redisClient) { //未獲取到鎖--直接返回 if(!getLock(redisClient)) { return; } //獲取到鎖,開始處理 try{ System.out.println("獲取到鎖,開始處理"); //業務邏輯執行 for(int i = 0 ; i < 100 ; i++){ redisClient.incr("mqCount"); } System.out.println(redisClient.get("mqCount")); } finally { // 只要獲取到鎖,則在業務邏輯結束以後,必須釋放鎖 releaseLock(redisClient); } } /** * 獲取到鎖則執行,未獲取則一直嘗試獲取,直到獲取到鎖爲止 * @param redisClient * @return */ public static void doMethodContinue(Jedis redisClient) { boolean flag = false; while(!flag){ flag = getLock(redisClient); //若是獲取到鎖,則繼續執行,不然循環獲取 if(flag){ try{ System.out.println("獲取到鎖,開始處理"); } finally { // 只要獲取到鎖,則在業務邏輯結束以後,必須釋放鎖 releaseLock(redisClient); } } //間隔0.5s再次獲取 try { Thread.sleep(500); } catch (InterruptedException e) { } } } /** * 獲取到鎖則執行,未獲取則一直嘗試獲取,直到到達超時時間 * @param redisClient * @param timeout 單位S */ public static void doMethodContinueTimeout(Jedis redisClient,int timeout) { //當前的毫秒 long start = System.currentTimeMillis(); //超時時間轉換爲毫秒單位 timeout = timeout * 1000; boolean flag = false; while(!flag){ flag = getLock(redisClient); //若是獲取到鎖,則繼續執行,不然循環獲取 if(flag){ try{ System.out.println("獲取到鎖,開始處理"); } finally { // 只要獲取到鎖,則在業務邏輯結束以後,必須釋放鎖 releaseLock(redisClient); } } long now = System.currentTimeMillis(); //當前時間超過起始時間的時間間隔,當間隔>=超時時間,則中止獲取鎖 if(now - start >= timeout){ flag = true; } //睡眠,下降搶鎖頻率,緩解redis壓力 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 獲取一次鎖 * @param redisClient * @return * <p>true : 獲取到鎖</p> * <p>false :未獲取到鎖</p> */ public static boolean getLock(Jedis redisClient) { //是否獲取到鎖 boolean hasLock = false; try { hasLock = redisClient.setnx(lockKey, "lockObj") == 1; if (hasLock) { redisClient.expire(lockKey, lockTime);//一小時 } } catch (Exception e) { redisClient.expire(lockKey, lockTime);//一小時 } return hasLock; } /** * 循環獲取鎖,直到過超時時間 * @param redisClient * @param timeout 單位s * @return * <p>true : 獲取到鎖</p> * <p>false :未獲取到鎖</p> */ public static boolean getLockTimeOut(Jedis redisClient,int timeout) { //當前的毫秒 long start = System.currentTimeMillis(); //超時時間轉換爲毫秒單位 timeout = timeout * 1000; //是否獲取到鎖 boolean hasLock = false; while(!hasLock){ try { hasLock = redisClient.setnx(lockKey, "lockObj") == 1; //獲取到鎖 if (hasLock) { redisClient.expire(lockKey, lockTime);//一小時 } else { //未獲取到鎖,判斷是否超過超時時間 long now = System.currentTimeMillis(); //當前時間超過起始時間的時間間隔,當間隔>=超時時間,則中止獲取鎖 if(now - start >= timeout){ System.out.println("--------獲取鎖超時,再也不獲取--------"); break; } //睡眠,下降搶鎖頻率,緩解redis壓力 Thread.sleep(500); } } catch (Exception e) { redisClient.expire(lockKey, lockTime);//一小時 } } return hasLock; } /** * 釋放鎖 * @param redisClient */ public static void releaseLock(Jedis redisClient) { redisClient.del(lockKey); } }
主要轉自:
http://blog.csdn.net/Dennis_ukagaka/article/details/78072274