使用Redis keyspace notifications來實現延遲任務(NodeJS版)

1.使用場景

在不少系統中,特別是電商系統經常存在須要執行延遲任務。例如一個待支付訂單,須要在30分鐘後自動關閉。雖然有不少方式能夠實現,好比說Job等,這裏主要介紹利用Redis的新特性 keyspace notifications來實現。html

2.基礎知識

重點!!!   Redis 2.8.0版本開始支持 keyspace notifications。若是你的Redis版本過低,能夠洗洗睡了……redis

若是你還不瞭解Redis的Pub/Sub,強烈建議你先閱讀該篇文章: Redis發佈與訂閱bash

接下來講說咱們的主角:keyspace notificationsui

keyspace notifications默認是關閉狀態,開啓則須要修改redis.conf文件或經過CONFIG SET來開啓或關閉該功能。這裏咱們使用CONFIG SET來開啓:spa

$ redis-cli config set notify-keyspace-events Ex複製代碼

這裏有人會問了, Ex 是什麼意思呢?這是notify-keyspace-events的參數,完整的參數列表看看下面的表格:code

字符 發送的通知
K 鍵空間通知,全部通知以 __keyspace@<db>__ 爲前綴
E 鍵事件通知,全部通知以 __keyevent@<db>__ 爲前綴
g DELEXPIRERENAME 等類型無關的通用命令的通知
$ 字符串命令的通知
l 列表命令的通知
s 集合命令的通知
h 哈希命令的通知
z 有序集合命令的通知
x 過時事件:每當有過時鍵被刪除時發送
e 驅逐(evict)事件:每當有鍵由於 maxmemory 政策而被刪除時發送
A 參數 g$lshzxe 的別名

能夠看出,咱們只開啓了鍵事件通知和過時事件。由於咱們實現延時任務只須要這兩個就足夠了。話很少說,直接看代碼。cdn

3. 實現方案

一個延遲任務應該具有哪些屬性? 我以爲至少有如下屬性:htm

  • 任務類型。(例如:關閉訂單)
  • 任務ID。(例如:訂單ID)
  • 任務延遲時間。(例如:30分鐘)
  • 任務額外數據。(例如:訂單其餘相關數據)

肯定好後,咱們能夠繼續往下走。blog

3.1 註冊事件處理器

首先在工程啓動後,咱們須要根據不一樣的事件註冊不一樣的處理器:seo

const _ = require('lodash')
// 任務處理器map
const handlers = {}
// 事件類型map
const events = {}
const registerEventHandler = (type, handler) => {  
  if (!type) {    
      throw new Error('type不能爲空')  
  }  
  if (!_.isFunction(handler)) {    
      throw new Error('handler類型非function')  
  }  
  handlers[type] = handler  
  events[type] = true
}複製代碼

3.2 建立延遲任務

const redis = require('redis')
const client = redis.createClient()
const eventKeyPrefix = 'custom_event_'// 任務列表
const jobs = {}
const addDelayEvent = (type, id, body = {}, delay = 10 * 60) => {  
  const key = `${eventKeyPrefix}${type}_${id}`  
  const jobKey = `${type}_${id}`  
  client.setex(key, delay, 'delay event', (err) => {    
    if (err) {      
      return console.log('添加延遲事件失敗:', err);    
    }    
    console.log('添加延遲事件成功');    
    jobs[jobKey] = body  
  })
}
複製代碼

這裏比較關鍵的點就是client.setex(key, expired, value)這個方法,咱們須要給key添加一個過時時間,那麼當key過時後redis纔會發出一個過時事件。

3.3 訂閱過時事件

實現了前兩個步驟後,咱們已經能夠往redis裏寫入帶有過時時間的key了。接下來關鍵的就是訂閱過時事件並處理。

const redis = require('redis')
const sub = redis.createClient()

sub.on('pmessage', (pattern, channel, message) => {  
  // match key  
  const keyMatcher = new RegExp(`^${eventKeyPrefix}(${_.keys(events).join('|')})_(\\S+)$`)  
  const result = message.match(keyMatcher)  
  if (result) {    
    const type = result[1];    
    const id = result[2];    
    const handler = handlers[type]    
    console.log('訂閱消息:type=%s, id=%s', type, id);    
    if (_.isFunction(handler)) {      
      const jobKey = `${type}_${id}`      
      if (jobs[jobKey]) {        
        handler(id, jobs[jobKey])      
      } else {        
        console.log('未找到延遲事件,type=%s,id=%s', type, id);      
      }    
    } else {      
      console.log('未找到事件處理器。type=%s', type)    
    }  
  }
})
// 訂閱頻道
sub.psubscribe('__key*__:expired')
複製代碼

3.4 編寫Demo

最後咱們寫一個Demo來驗證下咱們的功能。

const eventManager = require('./utils/eventManager')

eventManager.registerEventHandler('closeorder', (id, body) => {  
    console.log('關閉訂單 id=%s, body=%o', id, body);
})

eventManager.addDelayEvent('closeorder', 1111, {name: 'test'}, 5)複製代碼


Done! 

相關文章
相關標籤/搜索