若是設值成功則證實上鎖成功,而後再調用del指令釋放。java
// 這裏的冒號:就是一個普通的字符,沒特別含義,它能夠是任意其它字符,不要誤解 > setnx lock:codehole true OK ... do something critical ... > del lock:codehole (integer) 1
可是有個問題,若是邏輯執行到中間出現異常了,可能會致使 del 指令沒有被調用,這樣就會陷入死鎖,鎖永遠得不到釋放。redis
> setnx lock:codehole true OK > expire lock:codehole 5 ... do something critical ... > del lock:codehole (integer) 1
若是在 setnx 和 expire 之間服務器進程忽然掛掉了,多是由於機器掉電或者是被人爲殺掉的,就會致使 expire 得不到執行,也會形成死鎖。json
> set lock:codehole true ex 5 nx OK ... do something critical ... > del lock:codehole
爲 set 指令的 value 參數設置爲一個隨機數,釋放鎖時先匹配隨機數是否一致,而後再刪除 key,這是爲了確保當前線程佔有的鎖不會被其它線程釋放,除非這個鎖是過時了被服務器自動釋放的。
可是匹配 value 和刪除 key 不是一個原子操做,Redis 也沒有提供相似於delifequals這樣的指令,這就須要使用 Lua 腳原本處理了,由於 Lua 腳本能夠保證連續多個指令的原子性執行。服務器
上鎖 tag = random.nextint() # 隨機數 if redis.set(key, tag, nx=True, ex=5): do_something() redis.delifequals(key, tag) # 假想的 delifequals 指令 # delifequals 解鎖 if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
注意:
Redis 的消息隊列不是專業的消息隊列,它沒有很是多的高級特性,沒有 ack 保證,若是對消息的可靠性有着極致的追求,那麼它就不適合使用。數據結構
Redis 的 list(列表) 數據結構經常使用來做爲異步消息隊列使用,使用rpush/lpush操做入隊列,使用lpop 和 rpop來出隊列。app
> rpush notify-queue apple banana pear (integer) 3 > llen notify-queue (integer) 3 > lpop notify-queue "apple" > llen notify-queue (integer) 2 > lpop notify-queue "banana" > llen notify-queue (integer) 1 > lpop notify-queue "pear" > llen notify-queue (integer) 0 > lpop notify-queue (nil)
若是隊列空了,客戶端就會陷入 pop 的死循環,不停地 pop,沒有數據,接着再 pop,又沒有數據。這就是浪費生命的空輪詢。
一般咱們使用 sleep 來解決這個問題,讓線程睡一會,睡個 1s 鍾就能夠了。可是有個小問題,那就是睡眠會致使消息的延遲增大。
咱們可使用 blpop/brpop,阻塞讀。
阻塞讀在隊列沒有數據的時候,會當即進入休眠狀態,一旦數據到來,則馬上醒過來。消息的延遲幾乎爲零。用blpop/brpop替代前面的lpop/rpop,就完美解決了上面的問題。dom
上面咱們講了分佈式鎖的問題,可是加鎖失敗沒有講。通常咱們有3種策略來處理加鎖失敗:異步
延時隊列能夠經過 Redis 的 zset(有序列表) 來實現。咱們將消息序列化成一個字符串做爲 zset 的value,這個消息的到期處理時間做爲score,而後用多個線程輪詢 zset 獲取到期的任務進行處理,多個線程是爲了保障可用性,萬一掛了一個線程還有其它線程能夠繼續處理。分佈式
import java.lang.reflect.Type; import java.util.Set; import java.util.UUID; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; import redis.clients.jedis.Jedis; public class RedisDelayingQueue<T> { static class TaskItem<T> { public String id; public T msg; } // fastjson 序列化對象中存在 generic 類型時,須要使用 TypeReference private Type TaskType = new TypeReference<TaskItem<T>>() { }.getType(); private Jedis jedis; private String queueKey; public RedisDelayingQueue(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } public void delay(T msg) { TaskItem<T> task = new TaskItem<T>(); task.id = UUID.randomUUID().toString(); // 分配惟一的 uuid task.msg = msg; String s = JSON.toJSONString(task); // fastjson 序列化 jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延時隊列 ,5s 後再試 } public void loop() { while (!Thread.interrupted()) { // 只取一條 Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); if (values.isEmpty()) { try { Thread.sleep(500); // 歇會繼續 } catch (InterruptedException e) { break; } continue; } String s = values.iterator().next(); if (jedis.zrem(queueKey, s) > 0) { // 搶到了 TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化 this.handleMsg(task.msg); } } } public void handleMsg(T msg) { System.out.println(msg); } public static void main(String[] args) { Jedis jedis = new Jedis(); RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo"); Thread producer = new Thread() { public void run() { for (int i = 0; i < 10; i++) { queue.delay("codehole" + i); } } }; Thread consumer = new Thread() { public void run() { queue.loop(); } }; producer.start(); consumer.start(); try { producer.join(); Thread.sleep(6000); consumer.interrupt(); consumer.join(); } catch (InterruptedException e) { } } }