redis事件監聽的應用場景與案例實戰

什麼是事件監聽

在使用Redis的過程當中,咱們對Redis作的每個操做,下發的每個命令, 均可以認爲是事件的存在。所謂事件監聽,就是Redis Server會對客戶端下發命令進行一個監控, 一但有人對Redis Server作操做, Redis Server都能知道,並經過某種方式將監聽到的事件轉發到對應的訂閱者。 html

應用場景

需求一:

一個電商商家後臺,商家能夠設置多個商品的價格並指訂價格的生效時間。後臺服務須要生效時間到時對全部已經上架的商品進行價格修改。並在價格修改爲功後通知全部關注該商品的買家客戶。java

注意: 假設該平臺擁有1w商家,平均每一個商家設置了100個商品,也就是你要保證200w件商品價格修改的實時通知性。nginx

解決方案一: 每一個商品都有一份表去記錄全部的新價格和生效時間,由定時任務job去輪詢表中的數據,若是符合當前時間則取出並執行接下來的業務邏輯。web

解決方案二: 每一個商品都有一份表去記錄全部的新價格和生效時間,由多個分佈式job去輪詢表中的數據,爲了減輕job服務實例的壓力,設置每2秒執行一次(定時任務不建議設置每秒)。在這基礎上其實還有優化的空間,能夠在設置分佈式job分片處理邏輯。對於每個job實例,還能夠在其內部開啓異步線程並行處理。redis

從上述的描述中咱們能夠發現,用戶量仍是比較大,其實實時性要求比較高,因此若是咱們把數據落庫,而後每次定時的時候從數據庫裏面去取而後作邏輯的判斷,這樣確定是沒法達到實時性的要求的,因此有一種方案是採用redis來管理這批數據。可是也有兩個個問題spring

  1. 當這批數據過時的時候,要提醒用戶
  2. 從redis刪除後,要修改數據庫的狀態。

要解決這個功能就須要使用到redis的一個高級的功能:redis 鍵空間通知(供Keyspace Notifications功能)其容許客戶Publish / Subscribe ,以便以某種方式接收影響Redis數據集的事件。數據庫

需求二:

一樣是電商平臺,商家能夠設置商品的預售時間, 當預售時間到達時,修改商品狀態,並上架商品。該需求和需求一相似,都是以時間或者秒做爲計算依據,每一個商品都是獨立的,它們的時間屬性都不會同樣,因此是沒有規律性的。編程

需求三:

訂單超時30分鐘自動關閉。(無論多少訂單,都是固定的時間間隔30分鐘,有規律)
這個問題解決的方案就有多種了,咱們能夠經過MQ來進行,如今大多的MQ都帶有死信隊列的機制,咱們能夠經過這個機制來完成,其次也能夠經過quartz的輪詢方式的完成,選擇合適解決方案應對當前的需求便可。固然本次主要是解決第一個需求,因此只談如何使用redis來解決。ruby

需求四:

  • 監控key的操做(set、del、expire……)
  • 監聽key的過時,自動觸發事件

如何使用Keyspace Notifications

因爲Keyspace Notifications是在Redis 2.8.0以後的版本才提供的功能,因此咱們的Redis版本須要再2.8.0之上,不然沒法使用Redis時間監聽,在筆者寫這篇文章之時,Redis的最新正式版本已經爲5.0服務器

修改Redis配置,開啓Keyspace Notifications的兩種方式

  • 命令修改

CONFIG set notify-keyspace-events AKEx

  • 配置文件修改
    修改配置文件redis.conf, notify-keyspace-events AKEx,從新啓動Redis

參數說明

1)notify-keyspace-events選項的參數爲空字符串時,表示功能關閉,當參數不是空字符串時,表示功能開啓
2)notify-keyspace-events默功能是關閉的
3)若是要使用此功能,必須字符串包含 K 或者 E,不然收不到任何事件消息
4)若是參數爲「AKE」,意味着接收全部事件消息
notify-keyspace-events 的參數能夠是如下字符的任意組合, 它指定了服務器該發送哪些類型的通知:

字符 發送的通知
K 鍵空間通知,全部通知以 keyspace@ 爲前綴
E 鍵事件通知,全部通知以 keyevent@ 爲前綴
g DEL 、 EXPIRE 、 RENAME 等類型無關的通用命令的通知
字符串命令的通知
l 列表命令的通知
s 集合命令的通知
h 哈希命令的通知
z 有序集合命令的通知
x 過時事件:每當有過時鍵被刪除時發送
e 驅逐(evict)事件:每當有鍵由於 maxmemory 政策而被刪除時發送
A 參數 glshzxe 的別名

實例演示

同時監聽 set、get、del 、 expire 操做

注意:get 操做監聽不到消息,set,del ,expire 若是操做成功能夠監聽到消息,若是操做失敗也監聽不到消息.

更多命令參考

# 以keyevent訂閱庫0上的set、get、del、expire多個事件
subscribe __keyevent@0__:set __keyevent@0__:get __keyevent@0__:del __keyevent@0__:expire
# 以keyspace訂閱庫0上關於key爲mykey的全部事件
subscribe __keyspace@0__:mykey
複製代碼

模式匹配則使用psubscribe

# 以keyspace訂閱庫0上關於key爲mykey:*的全部事件
psubscribe __keyspace@0__:mykey:*
# 以keyevent、keyspace訂閱全部庫上的全部事件
psubscribe __key*@*__:*
複製代碼

程序實戰

使用技術Spring Boot + RedisTemplate

  • Redis監聽類 RedisExpiredListener
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

@Component
public class RedisExpiredListener implements MessageListener {
    public final static String LISTENER_PATTERN = "__key*@*__:*";

    @Override
    public void onMessage(Message message, byte[] bytes) {
        // 建議使用: valueSerializer
        String body = new String(message.getBody());
        String channel = new String(message.getChannel());
        System.out.println("onMessage >> " + String.format("channel: %s, body: %s, bytes: %s"
                , channel, body, new String(bytes)));

        if (body.startsWith("product:")) {
            final String productId = body.replace("product:""");
            System.out.println("獲得產品id:" + productId);
        }
    }

}
複製代碼
  • 啓動類 RedisExpiredApplication
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
public class RedisExpiredApplication implements CommandLineRunner {
    @Autowired
    private RedisTemplate redisTemplate;

    public static void main(String[] args) {
        SpringApplication.run(RedisExpiredApplication.class, args);
    }

    @Bean
    @Primary
    public RedisTemplate redisTemplate() {
        RedisSerializer<String> stringSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringSerializer);
        redisTemplate.setValueSerializer(stringSerializer);
        redisTemplate.setHashKeySerializer(stringSerializer);
        redisTemplate.setHashValueSerializer(stringSerializer);
        return redisTemplate;
    }

    @Bean
    public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnection, Executor executor) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // 設置Redis的鏈接工廠
        container.setConnectionFactory(redisConnection);
        // 設置監聽使用的線程池
        container.setTaskExecutor(executor);
        // 設置監聽的Topic: PatternTopic/ChannelTopic
        Topic topic = new PatternTopic(RedisExpiredListener.LISTENER_PATTERN);
        // 設置監聽器
        container.addMessageListener(new RedisExpiredListener(), topic);
        return container;
    }

    @Bean
    public Executor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(100);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("V-Thread");

        // rejection-policy:當pool已經達到max size的時候,如何處理新任務
        // CALLER_RUNS:不在新線程中執行任務,而是由調用者所在的線程來執行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    @Override
    public void run(String... strings) throws Exception {
        redisTemplate.opsForValue().set("orderId:123""過時了是取不到的"5, TimeUnit.SECONDS);
        System.out.println("初始化設置 key 過時時間 5s");
        System.out.println("main 線程休眠10秒");
        Thread.sleep(10 * 1000);
        System.out.println("main 線程休眠結束:獲取key orderId結果爲:" + redisTemplate.opsForValue().get("orderId:123"));
    }
複製代碼
  • 配置文件:application.properties
spring.redis.database=0
spring.redis.host=192.168.104.102
spring.redis.port=6378
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0
spring.redis.pool.max-active=8
spring.redis.pool.max-wait=-1
複製代碼

效果展現:

由於redis key 過時以後,其中的value是沒法獲取到的, 因此在設計key的時候就包含了業務主鍵id在其中,以此來解決value消失沒法處理業務邏輯的狀況。到這裏,就能夠根據具體到期時間執行具體邏輯了。

Redis過時命令設置

# Redis Expire 命令用於設置 key 的過時時間。key 過時後將再也不可用。
Expire KEY_NAME TIME_IN_SECONDS
# Redis Expireat 命令用於以 UNIX 時間戳(unix timestamp)格式設置 key 的過時時間。key 過時後將再也不可用。
Expireat KEY_NAME TIME_IN_UNIX_TIMESTAMP
# Redis PEXPIREAT 命令用於設置 key 的過時時間,已毫秒計。key 過時後將再也不可用。
PEXPIREAT KEY_NAME TIME_IN_MILLISECONDS_IN_UNIX_TIMESTAMP
複製代碼

注意事項

由於 Redis 目前的訂閱與發佈功能採起的是 發送即忘(fire and forget) 策略, 因此若是你的程序須要可靠事件通知(reliable notification of events), 那麼目前的鍵空間通知可能並不適合你:當訂閱事件的客戶端(服務實例)斷線時, 它會丟失全部在斷線期間分發給它的事件。並不能確保消息送達。將來有計劃容許更可靠的事件傳遞,但可能這將在更基礎的層面上解決,或者爲Pub / Sub自己帶來可靠性,或者容許Lua腳本攔截Pub / Sub消息來執行諸如推送將事件列入清單。

點關注,不迷路

文章每週持續更新,能夠微信搜索「 十分鐘學編程 」第一時間閱讀和催更,若是這個文章寫得還不錯,以爲有點東西的話 ~求點贊👍 求關注❤️ 求分享❤️
各位的支持和承認,就是我創做的最大動力,咱們下篇文章見!

相關文章
相關標籤/搜索