新姿式!Redis中調用Lua腳本以實現原子性操做

背景:有一服務提供者Leader,有多個消息訂閱者Workers。Leader是一個排隊程序,維護了一個用戶隊列,當某個資源空閒下來並被分配至隊列中的用戶時,Leader會向訂閱者推送消息(消息帶有惟一標識ID),訂閱者在接收到消息後會進行特殊處理並再次推往前端。前端

問題:前端只須要接收到一條由Worker推送的消息便可,可是若是Workers不作消息重複推送判斷的話,會致使前端收到多條消息推送,從而影響正常業務邏輯。java


方案一(未經過)

在Worker接收到消息時,嘗試先從redis緩存中根據消息的ID獲取值,有如下兩種狀況:redis

  • 若是值不存在,則表示當前這條消息是第一次被推送,能夠執行繼續執行推送程序,固然,不要忘了將當前消息ID做爲鍵插入緩存中,並設置一個過時時間,標記這條消息已經被推送過了。shell

  • 若是值存在,則表示當前這條消息是被推送過的,跳過推送程序。緩存

代碼能夠這麼寫:bash

public void waitingForMsg() {
    // Message Received.
    String value = redisTemplate.opsForValue().get("msg_pushed_" + msgId);
    if (!StringUtils.hasText(value)) {
        // 當不能從緩存中讀取到數據時,表示消息是第一次被推送
        // 趕忙往緩存中插入一個標識,表示當前消息已經被推送過了
        redisTemplate.opsForValue().set("msg_pushed_" + msgId, "1");
        // 再設置一個過時時間,防止數據無限制保留
        redisTemplate.expire("msg_pushed_" + msgId, 20, TimeUnit.SECONDS);
        // 接下來就能夠執行推送操做啦
        this.pushMsgToFrontEnd();
    }
}
複製代碼

看起來彷佛是沒啥問題,可是咱們從redis的角度分析一下請求,看看是否是真的沒問題。ide

> get msg_pushed_1 # 此時Worker1嘗試獲取值
> get msg_pushed_1 # Worker2也沒閒着,執行了這句話,而且時間找得剛恰好,就在Worker1準備插入值以前
> set msg_pushed_1 "1" # Worker1以爲消息沒有被推送,插入了一個值
> set msg_pushed_1 "1" # Worker2也這麼以爲,作了一樣的一件事複製代碼

你看,仍是有可能會往前端推送屢次消息,因此這個方案不經過。ui

再仔細想想,出現這個問題的緣由是啥?———— 就是在執行get和set命令時,沒有保持原子性操做,致使其餘命令有機可趁,那是否是能夠把get和set命令當成一整個部分執行,不讓其餘命令插入執行呢?this

有不少方案能夠實現,例如給鍵加鎖或者添加事務可能能夠完成這個操做。可是咱們今天討論一下另一種方案,在Redis中執行Lua腳本。atom


方案二

咱們能夠看一下Redis官方文檔對Lua腳本原子性的解釋

Atomicity of scripts

Redis uses the same Lua interpreter to run all the commands. Also Redis guarantees that a script is executed in an atomic way: no other script or Redis command will be executed while a script is being executed. This semantic is similar to the one of MULTI / EXEC. From the point of view of all the other clients the effects of a script are either still not visible or already completed.

大體意思是說:咱們Redis採用相同的Lua解釋器去運行全部命令,咱們能夠保證,腳本的執行是原子性的。做用就相似於加了MULTI/EXEC。


好,原子性有保證了,那麼咱們再看看編寫語法。

> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second1) "key1"2) "key2"3) "first"4) "second"複製代碼

由前至後的命令解釋(Arg 表示參數的意思 argument):

    eval: Redis執行Lua腳本的命令,後接腳本內容及各參數。這個命令是從2.6.0版本纔開始支持的。

    1st. Arg : Lua腳本,其中的KEYS[]和ARGV[]是傳入script的參數 。

    2nd. Arg: 後面跟着的KEY個數n,從第三個參數開始的總共n個參數會被做爲KEYS傳入script中,在script中能夠經過KEYS[1], KEYS[2]…格式讀取,下標從1開始 。

    Remain Arg: 剩餘的參數能夠在腳本中經過ARGV[1], ARGV[2]…格式讀取 ,下標從1開始 。

咱們執行腳本內容是return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}表示返回傳入的參數,因此咱們能夠看到參數被原封不動的返回了。


接着,咱們再來實戰一下,在Lua腳本中調用Redis方法吧。

咱們能夠在Lua腳本中經過如下兩個命令調用redis的命令程序

  • redis.call()

  • redis.pcall()

二者的做用是同樣的,可是程序出錯時的返回結果略有不一樣。

使用方法,命令和在Redis中執行如出一轍:

> eval "return redis.call('set', KEYS[1], ARGV[1])" 1 foo bar
OK
> eval "return redis.call('get', KEYS[1])" 1 foo
"bar"複製代碼


是否是很簡單,說了這麼多,咱們趕忙來現學現賣,寫一個腳本應用在咱們的場景中吧。

> eval "if redis.call('get', KEYS[1]) == false then redis.call('set', KEYS[1], ARGV[1]) redis.call('expire', KEYS[1], ARGV[2]) return 0 else return 1 end" 1 msg_push_1 "1" 10複製代碼

腳本的意思和咱們以前在方案一中寫的程序邏輯同樣,先判斷緩存中是否存在鍵,若是不存在則存入鍵和其值,而且設置失效時間,最後返回0;若是存在則返回1。PS: 若是對if redis.call('get', KEYS[1]) == false這裏爲何獲得的結果要與false比較的話,能夠看最後的Tip。

  • 執行第一次:咱們發現返回值0,而且咱們看到緩存中插入了一條數據,鍵爲msg_push_1、值爲"1"

  • 在失效前,執行屢次:咱們發現返回值一直爲1。而且在執行第一次後的10秒,該鍵被自動刪除。


將以上邏輯遷入咱們java代碼後,就是下面這個樣子啦

public boolean isMessagePushed(String messageId) {
    Assert.hasText(messageId, "消息ID不能爲空");

    // 使用lua腳本檢測值是否存在
    String script = "if redis.call('get', KEYS[1]) == false then redis.call('set', KEYS[1], ARGV[1]) redis.call('expire', KEYS[1], ARGV[2]) return 0 else return 1 end";

    // 這裏使用Long類型,查看源碼可知腳本返回值類型只支持Long, Boolean, List, or deserialized value type.
    DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
    redisScript.setScriptText(script);
    redisScript.setResultType(Long.class);

    // 設置key
    List<String> keyList = new ArrayList<>();
    // key爲消息ID
    keyList.add(messageId);

    // 每一個鍵的失效時間爲20秒
    Long result = redisTemplate.execute(redisScript, keyList, 1, 20);

    // 返回true: 已讀、false: 未讀
    return result != null && result != 0L;
}

public void waitingForMsg() {
    // Message Received.
    if (!this.isMessagePushed(msgId)) {
        // 返回false表示未讀,接下來就能夠執行推送操做啦
        this.pushMsgToFrontEnd();
    }
}複製代碼

Tip

這裏只是簡單的Redis中使用Lua腳本介紹,詳細的使用方法能夠參考官方文檔,並且還有其餘不少用法介紹。

對了,上面還有一個須要注意一下,就是關於Redis和Lua中變量的相互轉換,由於提及來囉哩囉嗦的,因此沒放在上文中,最後能夠簡單說一下。

Redis to Lua conversion table.

  • Redis integer reply -> Lua number

  • Redis bulk reply -> Lua string

  • Redis multi bulk reply -> Lua table (may have other Redis data types nested)

  • Redis status reply -> Lua table with a single ok field containing the status

  • Redis error reply -> Lua table with a single err field containing the error

  • Redis Nil bulk reply and Nil multi bulk reply -> Lua false boolean type // 這裏就是上面咱們在腳本中作是否爲空判斷的時候if redis.call('get', KEYS[1]) == false,採用與false比較的緣由。Redis的nil(相似null)會被轉換爲Lua的false

Lua to Redis conversion table.

  • Lua number -> Redis integer reply (the number is converted into an integer)

  • Lua string -> Redis bulk reply

  • Lua table (array) -> Redis multi bulk reply (truncated to the first nil inside the Lua array if any)

  • Lua table with a single ok field -> Redis status reply

  • Lua table with a single err field -> Redis error reply

  • Lua boolean false -> Redis Nil bulk reply.

注意點:

Lua的Number類型會被轉爲Redis的Integer類型,所以若是但願獲得小數時,須要由Lua返回String類型的數字。 

相關文章
相關標籤/搜索