背景:有一服務提供者Leader,有多個消息訂閱者Workers。Leader是一個排隊程序,維護了一個用戶隊列,當某個資源空閒下來並被分配至隊列中的用戶時,Leader會向訂閱者推送消息(消息帶有惟一標識ID),訂閱者在接收到消息後會進行特殊處理並再次推往前端。前端
問題:前端只須要接收到一條由Worker推送的消息便可,可是若是Workers不作消息重複推送判斷的話,會致使前端收到多條消息推送,從而影響正常業務邏輯。java
在Worker接收到消息時,嘗試先從redis緩存中根據消息的ID獲取值,有如下兩種狀況:redis
若是值不存在,則表示當前這條消息是第一次被推送,能夠執行繼續執行推送程序,固然,不要忘了將當前消息ID做爲鍵插入緩存中,並設置一個過時時間,標記這條消息已經被推送過了。shell
若是值存在,則表示當前這條消息是被推送過的,跳過推送程序。緩存
代碼能夠這麼寫:bash
public void waitingForMsg() {
// Message Received.
String value = redisTemplate.opsForValue().get("msg_pushed_" + msgId);
if (!StringUtils.hasText(value)) {
// 當不能從緩存中讀取到數據時,表示消息是第一次被推送
// 趕忙往緩存中插入一個標識,表示當前消息已經被推送過了
redisTemplate.opsForValue().set("msg_pushed_" + msgId, "1");
// 再設置一個過時時間,防止數據無限制保留
redisTemplate.expire("msg_pushed_" + msgId, 20, TimeUnit.SECONDS);
// 接下來就能夠執行推送操做啦
this.pushMsgToFrontEnd();
}
}
複製代碼
看起來彷佛是沒啥問題,可是咱們從redis的角度分析一下請求,看看是否是真的沒問題。ide
> get msg_pushed_1 # 此時Worker1嘗試獲取值
> get msg_pushed_1 # Worker2也沒閒着,執行了這句話,而且時間找得剛恰好,就在Worker1準備插入值以前
> set msg_pushed_1 "1" # Worker1以爲消息沒有被推送,插入了一個值
> set msg_pushed_1 "1" # Worker2也這麼以爲,作了一樣的一件事複製代碼
你看,仍是有可能會往前端推送屢次消息,因此這個方案不經過。ui
再仔細想想,出現這個問題的緣由是啥?———— 就是在執行get和set命令時,沒有保持原子性操做,致使其餘命令有機可趁,那是否是能夠把get和set命令當成一整個部分執行,不讓其餘命令插入執行呢?this
有不少方案能夠實現,例如給鍵加鎖或者添加事務可能能夠完成這個操做。可是咱們今天討論一下另一種方案,在Redis中執行Lua腳本。atom
咱們能夠看一下Redis官方文檔對Lua腳本原子性的解釋。
Atomicity of scripts
Redis uses the same Lua interpreter to run all the commands. Also Redis guarantees that a script is executed in an atomic way: no other script or Redis command will be executed while a script is being executed. This semantic is similar to the one of MULTI / EXEC. From the point of view of all the other clients the effects of a script are either still not visible or already completed.
大體意思是說:咱們Redis採用相同的Lua解釋器去運行全部命令,咱們能夠保證,腳本的執行是原子性的。做用就相似於加了MULTI/EXEC。
好,原子性有保證了,那麼咱們再看看編寫語法。
> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second1) "key1"2) "key2"3) "first"4) "second"複製代碼
由前至後的命令解釋(Arg 表示參數的意思 argument):
eval: Redis執行Lua腳本的命令,後接腳本內容及各參數。這個命令是從2.6.0版本纔開始支持的。
1st. Arg : Lua腳本,其中的KEYS[]和ARGV[]是傳入script的參數 。
2nd. Arg: 後面跟着的KEY個數n,從第三個參數開始的總共n個參數會被做爲KEYS傳入script中,在script中能夠經過KEYS[1], KEYS[2]…格式讀取,下標從1開始 。
Remain Arg: 剩餘的參數能夠在腳本中經過ARGV[1], ARGV[2]…格式讀取 ,下標從1開始 。
咱們執行腳本內容是re
turn {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}
表示返回傳入的參數,因此咱們能夠看到參數被原封不動的返回了。
接着,咱們再來實戰一下,在Lua腳本中調用Redis方法吧。
咱們能夠在Lua腳本中經過如下兩個命令調用redis的命令程序
redis.call()
redis.pcall()
二者的做用是同樣的,可是程序出錯時的返回結果略有不一樣。
使用方法,命令和在Redis中執行如出一轍:
> eval "return redis.call('set', KEYS[1], ARGV[1])" 1 foo bar
OK
> eval "return redis.call('get', KEYS[1])" 1 foo
"bar"複製代碼
是否是很簡單,說了這麼多,咱們趕忙來現學現賣,寫一個腳本應用在咱們的場景中吧。
> eval "if redis.call('get', KEYS[1]) == false then redis.call('set', KEYS[1], ARGV[1]) redis.call('expire', KEYS[1], ARGV[2]) return 0 else return 1 end" 1 msg_push_1 "1" 10複製代碼
腳本的意思和咱們以前在方案一中寫的程序邏輯同樣,先判斷緩存中是否存在鍵,若是不存在則存入鍵和其值,而且設置失效時間,最後返回0;若是存在則返回1。PS: 若是對if redis.call('get', KEYS[1]) == false
這裏爲何獲得的結果要與false比較的話,能夠看最後的Tip。
執行第一次:咱們發現返回值0,而且咱們看到緩存中插入了一條數據,鍵爲msg_push_1
、值爲"1"
在失效前,執行屢次:咱們發現返回值一直爲1。而且在執行第一次後的10秒,該鍵被自動刪除。
將以上邏輯遷入咱們java代碼後,就是下面這個樣子啦
public boolean isMessagePushed(String messageId) {
Assert.hasText(messageId, "消息ID不能爲空");
// 使用lua腳本檢測值是否存在
String script = "if redis.call('get', KEYS[1]) == false then redis.call('set', KEYS[1], ARGV[1]) redis.call('expire', KEYS[1], ARGV[2]) return 0 else return 1 end";
// 這裏使用Long類型,查看源碼可知腳本返回值類型只支持Long, Boolean, List, or deserialized value type.
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
redisScript.setResultType(Long.class);
// 設置key
List<String> keyList = new ArrayList<>();
// key爲消息ID
keyList.add(messageId);
// 每一個鍵的失效時間爲20秒
Long result = redisTemplate.execute(redisScript, keyList, 1, 20);
// 返回true: 已讀、false: 未讀
return result != null && result != 0L;
}
public void waitingForMsg() {
// Message Received.
if (!this.isMessagePushed(msgId)) {
// 返回false表示未讀,接下來就能夠執行推送操做啦
this.pushMsgToFrontEnd();
}
}複製代碼
這裏只是簡單的Redis中使用Lua腳本介紹,詳細的使用方法能夠參考官方文檔,並且還有其餘不少用法介紹。
對了,上面還有一個坑須要注意一下,就是關於Redis和Lua中變量的相互轉換,由於提及來囉哩囉嗦的,因此沒放在上文中,最後能夠簡單說一下。
Redis to Lua conversion table.
Redis integer reply -> Lua number
Redis bulk reply -> Lua string
Redis multi bulk reply -> Lua table (may have other Redis data types nested)
Redis status reply -> Lua table with a single ok field containing the status
Redis error reply -> Lua table with a single err field containing the error
Redis Nil bulk reply and Nil multi bulk reply -> Lua false boolean type // 這裏就是上面咱們在腳本中作是否爲空判斷的時候
if redis.call('get', KEYS[1]) == false
,採用與false比較的緣由。Redis的nil(相似null)會被轉換爲Lua的falseLua to Redis conversion table.
Lua number -> Redis integer reply (the number is converted into an integer)
Lua string -> Redis bulk reply
Lua table (array) -> Redis multi bulk reply (truncated to the first nil inside the Lua array if any)
Lua table with a single ok field -> Redis status reply
Lua table with a single err field -> Redis error reply
Lua boolean false -> Redis Nil bulk reply.
注意點:
Lua的Number類型會被轉爲Redis的Integer類型,所以若是但願獲得小數時,須要由Lua返回String類型的數字。