在使用Redis的過程當中,咱們對Redis作的每個操做,下發的每個命令, 均可以認爲是事件的存在。所謂事件監聽,就是Redis Server會對客戶端下發命令進行一個監控, 一但有人對Redis Server作操做, Redis Server都能知道,並經過某種方式將監聽到的事件轉發到對應的訂閱者。 html
一個電商商家後臺,商家能夠設置多個商品的價格並指訂價格的生效時間。後臺服務須要生效時間到時對全部已經上架的商品進行價格修改。並在價格修改爲功後通知全部關注該商品的買家客戶。java
注意: 假設該平臺擁有1w商家,平均每一個商家設置了100個商品,也就是你要保證200w件商品價格修改的實時通知性。nginx
解決方案一: 每一個商品都有一份表去記錄全部的新價格和生效時間,由定時任務job去輪詢表中的數據,若是符合當前時間則取出並執行接下來的業務邏輯。web
解決方案二: 每一個商品都有一份表去記錄全部的新價格和生效時間,由多個分佈式job去輪詢表中的數據,爲了減輕job服務實例的壓力,設置每2秒執行一次(定時任務不建議設置每秒)。在這基礎上其實還有優化的空間,能夠在設置分佈式job分片處理邏輯。對於每個job實例,還能夠在其內部開啓異步線程並行處理。redis
從上述的描述中咱們能夠發現,用戶量仍是比較大,其實實時性要求比較高,因此若是咱們把數據落庫,而後每次定時的時候從數據庫裏面去取而後作邏輯的判斷,這樣確定是沒法達到實時性的要求的,因此有一種方案是採用redis來管理這批數據。可是也有兩個個問題spring
要解決這個功能就須要使用到redis的一個高級的功能:redis 鍵空間通知(供Keyspace Notifications功能)其容許客戶Publish / Subscribe ,以便以某種方式接收影響Redis數據集的事件。數據庫
一樣是電商平臺,商家能夠設置商品的預售時間, 當預售時間到達時,修改商品狀態,並上架商品。該需求和需求一相似,都是以時間或者秒做爲計算依據,每一個商品都是獨立的,它們的時間屬性都不會同樣,因此是沒有規律性的。編程
訂單超時30分鐘自動關閉。(無論多少訂單,都是固定的時間間隔30分鐘,有規律)
這個問題解決的方案就有多種了,咱們能夠經過MQ來進行,如今大多的MQ都帶有死信隊列的機制,咱們能夠經過這個機制來完成,其次也能夠經過quartz的輪詢方式的完成,選擇合適解決方案應對當前的需求便可。固然本次主要是解決第一個需求,因此只談如何使用redis來解決。ruby
因爲Keyspace Notifications是在Redis 2.8.0以後的版本才提供的功能,因此咱們的Redis版本須要再2.8.0之上,不然沒法使用Redis時間監聽,在筆者寫這篇文章之時,Redis的最新正式版本已經爲5.0了 服務器
修改Redis配置,開啓Keyspace Notifications的兩種方式
CONFIG set notify-keyspace-events AKEx
- 配置文件修改
修改配置文件redis.conf,notify-keyspace-events AKEx
,從新啓動Redis
參數說明
1)notify-keyspace-events選項的參數爲空字符串時,表示功能關閉,當參數不是空字符串時,表示功能開啓
2)notify-keyspace-events默功能是關閉的
3)若是要使用此功能,必須字符串包含 K 或者 E,不然收不到任何事件消息
4)若是參數爲「AKE」,意味着接收全部事件消息
notify-keyspace-events 的參數能夠是如下字符的任意組合, 它指定了服務器該發送哪些類型的通知:
字符 發送的通知 K 鍵空間通知,全部通知以 keyspace@ 爲前綴 E 鍵事件通知,全部通知以 keyevent@ 爲前綴 g DEL 、 EXPIRE 、 RENAME 等類型無關的通用命令的通知 字符串命令的通知 l 列表命令的通知 s 集合命令的通知 h 哈希命令的通知 z 有序集合命令的通知 x 過時事件:每當有過時鍵被刪除時發送 e 驅逐(evict)事件:每當有鍵由於 maxmemory 政策而被刪除時發送 A 參數 g lshzxe 的別名
同時監聽 set、get、del 、 expire 操做
注意:get 操做監聽不到消息,set
,del
,expire
若是操做成功能夠監聽到消息,若是操做失敗也監聽不到消息.
更多命令參考
# 以keyevent訂閱庫0上的set、get、del、expire多個事件
subscribe __keyevent@0__:set __keyevent@0__:get __keyevent@0__:del __keyevent@0__:expire
# 以keyspace訂閱庫0上關於key爲mykey的全部事件
subscribe __keyspace@0__:mykey
複製代碼
模式匹配則使用psubscribe
# 以keyspace訂閱庫0上關於key爲mykey:*的全部事件
psubscribe __keyspace@0__:mykey:*
# 以keyevent、keyspace訂閱全部庫上的全部事件
psubscribe __key*@*__:*
複製代碼
使用技術Spring Boot + RedisTemplate
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
@Component
public class RedisExpiredListener implements MessageListener {
public final static String LISTENER_PATTERN = "__key*@*__:*";
@Override
public void onMessage(Message message, byte[] bytes) {
// 建議使用: valueSerializer
String body = new String(message.getBody());
String channel = new String(message.getChannel());
System.out.println("onMessage >> " + String.format("channel: %s, body: %s, bytes: %s"
, channel, body, new String(bytes)));
if (body.startsWith("product:")) {
final String productId = body.replace("product:", "");
System.out.println("獲得產品id:" + productId);
}
}
}
複製代碼
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@SpringBootApplication
public class RedisExpiredApplication implements CommandLineRunner {
@Autowired
private RedisTemplate redisTemplate;
public static void main(String[] args) {
SpringApplication.run(RedisExpiredApplication.class, args);
}
@Bean
@Primary
public RedisTemplate redisTemplate() {
RedisSerializer<String> stringSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringSerializer);
redisTemplate.setValueSerializer(stringSerializer);
redisTemplate.setHashKeySerializer(stringSerializer);
redisTemplate.setHashValueSerializer(stringSerializer);
return redisTemplate;
}
@Bean
public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnection, Executor executor) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 設置Redis的鏈接工廠
container.setConnectionFactory(redisConnection);
// 設置監聽使用的線程池
container.setTaskExecutor(executor);
// 設置監聽的Topic: PatternTopic/ChannelTopic
Topic topic = new PatternTopic(RedisExpiredListener.LISTENER_PATTERN);
// 設置監聽器
container.addMessageListener(new RedisExpiredListener(), topic);
return container;
}
@Bean
public Executor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("V-Thread");
// rejection-policy:當pool已經達到max size的時候,如何處理新任務
// CALLER_RUNS:不在新線程中執行任務,而是由調用者所在的線程來執行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Override
public void run(String... strings) throws Exception {
redisTemplate.opsForValue().set("orderId:123", "過時了是取不到的", 5, TimeUnit.SECONDS);
System.out.println("初始化設置 key 過時時間 5s");
System.out.println("main 線程休眠10秒");
Thread.sleep(10 * 1000);
System.out.println("main 線程休眠結束:獲取key orderId結果爲:" + redisTemplate.opsForValue().get("orderId:123"));
}
複製代碼
spring.redis.database=0
spring.redis.host=192.168.104.102
spring.redis.port=6378
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0
spring.redis.pool.max-active=8
spring.redis.pool.max-wait=-1
複製代碼
效果展現:
由於redis key 過時以後,其中的value是沒法獲取到的, 因此在設計key的時候就包含了業務主鍵id在其中,以此來解決value消失沒法處理業務邏輯的狀況。到這裏,就能夠根據具體到期時間執行具體邏輯了。
Redis過時命令設置
# Redis Expire 命令用於設置 key 的過時時間。key 過時後將再也不可用。
Expire KEY_NAME TIME_IN_SECONDS
# Redis Expireat 命令用於以 UNIX 時間戳(unix timestamp)格式設置 key 的過時時間。key 過時後將再也不可用。
Expireat KEY_NAME TIME_IN_UNIX_TIMESTAMP
# Redis PEXPIREAT 命令用於設置 key 的過時時間,已毫秒計。key 過時後將再也不可用。
PEXPIREAT KEY_NAME TIME_IN_MILLISECONDS_IN_UNIX_TIMESTAMP
複製代碼
由於 Redis 目前的訂閱與發佈功能採起的是 發送即忘(fire and forget) 策略, 因此若是你的程序須要可靠事件通知(reliable notification of events), 那麼目前的鍵空間通知可能並不適合你:當訂閱事件的客戶端(服務實例)斷線時, 它會丟失全部在斷線期間分發給它的事件。並不能確保消息送達。將來有計劃容許更可靠的事件傳遞,但可能這將在更基礎的層面上解決,或者爲Pub / Sub自己帶來可靠性,或者容許Lua腳本攔截Pub / Sub消息來執行諸如推送將事件列入清單。
文章每週持續更新,能夠微信搜索「 十分鐘學編程 」第一時間閱讀和催更,若是這個文章寫得還不錯,以爲有點東西的話 ~求點贊👍 求關注❤️ 求分享❤️
各位的支持和承認,就是我創做的最大動力,咱們下篇文章見!