redis分佈式鎖&隊列應用

分佈式鎖

  1. setnx(set if not exists)

若是設值成功則證實上鎖成功,而後再調用del指令釋放。java

// 這裏的冒號:就是一個普通的字符,沒特別含義,它能夠是任意其它字符,不要誤解
> setnx lock:codehole true
OK
... do something critical ...
> del lock:codehole
(integer) 1

可是有個問題,若是邏輯執行到中間出現異常了,可能會致使 del 指令沒有被調用,這樣就會陷入死鎖,鎖永遠得不到釋放。redis

  1. setnx(set if not exists) 加上過時時間
> setnx lock:codehole true
OK
> expire lock:codehole 5
... do something critical ...
> del lock:codehole
(integer) 1

若是在 setnx 和 expire 之間服務器進程忽然掛掉了,多是由於機器掉電或者是被人爲殺掉的,就會致使 expire 得不到執行,也會形成死鎖。json

  1. 使用ex nx命令一塊兒執行
> set lock:codehole true ex 5 nx
OK
... do something critical ...
> del lock:codehole
  1. 刪除鎖的線程必須是上鎖的線程

爲 set 指令的 value 參數設置爲一個隨機數,釋放鎖時先匹配隨機數是否一致,而後再刪除 key,這是爲了確保當前線程佔有的鎖不會被其它線程釋放,除非這個鎖是過時了被服務器自動釋放的。
可是匹配 value 和刪除 key 不是一個原子操做,Redis 也沒有提供相似於delifequals這樣的指令,這就須要使用 Lua 腳原本處理了,由於 Lua 腳本能夠保證連續多個指令的原子性執行。服務器

上鎖
tag = random.nextint()  # 隨機數
if redis.set(key, tag, nx=True, ex=5):
    do_something()
    redis.delifequals(key, tag)  # 假想的 delifequals 指令
    

# delifequals 解鎖
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

延時隊列

消息隊列

注意:
Redis 的消息隊列不是專業的消息隊列,它沒有很是多的高級特性,沒有 ack 保證,若是對消息的可靠性有着極致的追求,那麼它就不適合使用。數據結構

Redis 的 list(列表) 數據結構經常使用來做爲異步消息隊列使用,使用rpush/lpush操做入隊列,使用lpop 和 rpop來出隊列。app

> rpush notify-queue apple banana pear
(integer) 3
> llen notify-queue
(integer) 3
> lpop notify-queue
"apple"
> llen notify-queue
(integer) 2
> lpop notify-queue
"banana"
> llen notify-queue
(integer) 1
> lpop notify-queue
"pear"
> llen notify-queue
(integer) 0
> lpop notify-queue
(nil)

阻塞隊列

若是隊列空了,客戶端就會陷入 pop 的死循環,不停地 pop,沒有數據,接着再 pop,又沒有數據。這就是浪費生命的空輪詢。
一般咱們使用 sleep 來解決這個問題,讓線程睡一會,睡個 1s 鍾就能夠了。可是有個小問題,那就是睡眠會致使消息的延遲增大。
咱們可使用 blpop/brpop,阻塞讀。
阻塞讀在隊列沒有數據的時候,會當即進入休眠狀態,一旦數據到來,則馬上醒過來。消息的延遲幾乎爲零。用blpop/brpop替代前面的lpop/rpop,就完美解決了上面的問題。dom

鎖衝突處理

上面咱們講了分佈式鎖的問題,可是加鎖失敗沒有講。通常咱們有3種策略來處理加鎖失敗:異步

  1. 直接拋出異常,通知用戶稍後重試
    這種方式比較適合由用戶直接發起的請求,用戶看到錯誤對話框後,會先閱讀對話框的內容,再點擊重試,這樣就能夠起到人工延時的效果。
  2. sleep 一會再重試
    sleep 會阻塞當前的消息處理線程,會致使隊列的後續消息處理出現延遲。若是碰撞的比較頻繁或者隊列裏消息比較多,sleep 可能並不合適。
  3. 將請求轉移至延時隊列,過一會再試
    這種方式比較適合異步消息處理,將當前衝突的請求扔到另外一個隊列延後處理以避開衝突。

延時隊列

延時隊列能夠經過 Redis 的 zset(有序列表) 來實現。咱們將消息序列化成一個字符串做爲 zset 的value,這個消息的到期處理時間做爲score,而後用多個線程輪詢 zset 獲取到期的任務進行處理,多個線程是爲了保障可用性,萬一掛了一個線程還有其它線程能夠繼續處理。分佈式

import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import redis.clients.jedis.Jedis;

public class RedisDelayingQueue<T> {

  static class TaskItem<T> {
    public String id;
    public T msg;
  }

  // fastjson 序列化對象中存在 generic 類型時,須要使用 TypeReference
  private Type TaskType = new TypeReference<TaskItem<T>>() {
  }.getType();

  private Jedis jedis;
  private String queueKey;

  public RedisDelayingQueue(Jedis jedis, String queueKey) {
    this.jedis = jedis;
    this.queueKey = queueKey;
  }

  public void delay(T msg) {
    TaskItem<T> task = new TaskItem<T>();
    task.id = UUID.randomUUID().toString(); // 分配惟一的 uuid
    task.msg = msg;
    String s = JSON.toJSONString(task); // fastjson 序列化
    jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延時隊列 ,5s 後再試
  }

  public void loop() {
    while (!Thread.interrupted()) {
      // 只取一條
      Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
      if (values.isEmpty()) {
        try {
          Thread.sleep(500); // 歇會繼續
        } catch (InterruptedException e) {
          break;
        }
        continue;
      }
      String s = values.iterator().next();
      if (jedis.zrem(queueKey, s) > 0) { // 搶到了
        TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
        this.handleMsg(task.msg);
      }
    }
  }

  public void handleMsg(T msg) {
    System.out.println(msg);
  }

  public static void main(String[] args) {
    Jedis jedis = new Jedis();
    RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
    Thread producer = new Thread() {

      public void run() {
        for (int i = 0; i < 10; i++) {
          queue.delay("codehole" + i);
        }
      }

    };
    Thread consumer = new Thread() {

      public void run() {
        queue.loop();
      }

    };
    producer.start();
    consumer.start();
    try {
      producer.join();
      Thread.sleep(6000);
      consumer.interrupt();
      consumer.join();
    } catch (InterruptedException e) {
    }
  }
}
相關文章
相關標籤/搜索