目錄java
Redis 經過 PUBLISH 、 SUBSCRIBE 等命令實現了發佈訂閱模式。該功能提供兩種信息機制, 分別是「發佈訂閱到頻道」和「發佈訂閱到模式」。git
PUBLISH channel message
複製代碼
Redis 的 PUBLISH
命令可讓客戶端把指定的消息發送到指定的頻道中。github
SUBSCRIBE channel [channel …]
複製代碼
Redis 的 SUBSCRIBE
命令可讓客戶端訂閱任意數量的頻道, 每當有新信息發送到被訂閱的頻道時,信息就會被髮送給全部訂閱指定頻道的客戶端。redis
下面咱們就演示一下 PUBLISH命令和SUBSCRIBE命令的用法:spring
首先是訂閱單個頻道: 設計模式
而後是訂閱多個頻道:springboot
Redis 的發佈與訂閱實現支持模式匹配(pattern matching)。bash
客戶端能夠訂閱一個帶 * 號的模式,若是某個/某些頻道的名字和這個模式匹配,那麼當有信息發送給這個/這些頻道的時候,客戶端也會收到這個/這些頻道的信息。服務器
客戶端訂閱的模式裏面能夠包含多個 glob 風格的通配符, 好比 * 、 ? 和 [...] 等。數據結構
好比執行命令:
PSUBSCRIBE t.*
複製代碼
客戶端將收到來自 t.java、 t.db 等頻道的信息。
每一個 Redis 服務器進程都維持着一個表示服務器狀態的 redis.h/redisServer
結構, 結構的 pubsub_channels
屬性是一個字典, 這個字典就用於保存訂閱頻道的信息:
struct redisServer {
// ...
dict *pubsub_channels;
// ...
}
複製代碼
其中,字典的鍵爲正在被訂閱的頻道, 而字典的值則是一個鏈表, 鏈表中保存了全部訂閱這個頻道的客戶端。
當調用 PUBLISH channel message
命令的時候,程序首先根據 channel 定位到字典的鍵,而後將信息發送給字典值鏈表中的全部客戶端。
Redis發佈/訂閱存儲結構以下圖所示:
下面帶你一步步經過 Spring Data Redis 來實現發佈與訂閱。
示例項目基於SpringBoot搭建,你能夠在這裏找到Spring Data Redis 實現發佈/訂閱模式的源碼。
因爲篇幅緣由下面就再也不演示項目搭建和集成Redis的過程了,實現細節請參考 springboot redis demo project 。
首先定義一個發佈者接口,接口只有一個void publish(String message)
方法,用於發佈消息。
public interface MessagePublisher {
/** * publish message * @param message */
void publish(String message);
}
複製代碼
而後提供一個基於Redis的MessagePublisher
實現。
其中最核心的是這個方法:redisTemplate.convertAndSend(topic.getTopic(), message)
,用於把消息發送到指定topic的channel之中。
import lombok.Setter;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
/** * Redis message publisher * * @author ijiangtao * @create 2019-05-01 19:36 **/
@Setter
public class RedisMessagePublisher implements MessagePublisher {
private RedisTemplate<String, String> redisTemplate;
private ChannelTopic topic;
private RedisMessagePublisher() { }
public RedisMessagePublisher(RedisTemplate<String, String> redisTemplate, ChannelTopic topic) {
this.redisTemplate = redisTemplate;
this.topic = topic;
}
public void publish(String message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
複製代碼
RedisMessageSubscriber
是一個訂閱者,它實現了MessageListener
接口,並經過一個messageList
來存/取監聽到的消息。
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.util.List;
/** * Redis Message Subscriber * <p> * RedisMessageSubscriber implements the Spring Data Redis-provided MessageListener interface * * @author ijiangtao * @create 2019-05-01 19:39 **/
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Data
@Component
public class RedisMessageSubscriber implements MessageListener {
private List<String> messageList;
public void onMessage(Message message, byte[] pattern) {
messageList.add("[pattern:" + new String(pattern) + ",message:" + message.toString() + "]");
}
}
複製代碼
下面定義了兩個「topic」,而且經過兩個「publisher`將「message」發佈到「channel」指定的「topic」上。
而後咱們定義了兩個「subscriber」,「subscriber1」訂閱了「topic1」和「topic2」,「subscriber2」只訂閱了「topic2」。
最後咱們將這些發佈者和訂閱者都註冊到了 Spring Data Redis 提供的容器(RedisMessageListenerContainer
)中。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
import java.util.ArrayList;
/** * config * * @author ijiangtao * @create 2019-05-01 19:57 **/
@Configuration
@ComponentScan("net.ijiangtao.tech.framework.spring.ispringboot.redis")
@EnableRedisRepositories(basePackages = "net.ijiangtao.tech.framework.spring.ispringboot")
@PropertySource("classpath:application.properties")
public class RedisPubSubConfig {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Bean
RedisMessageListenerContainer redisContainer() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisTemplate.getConnectionFactory());
container.addMessageListener(messageListenerAdapter1() , topic1());
container.addMessageListener(messageListenerAdapter1() , topic2());
container.addMessageListener(messageListenerAdapter2(), topic2());
return container;
}
@Bean
MessageListenerAdapter messageListenerAdapter1() {
return new MessageListenerAdapter(messageListener1());
}
@Bean
public RedisMessageSubscriber messageListener1() {
return new RedisMessageSubscriber(new ArrayList<>());
}
@Bean
MessageListenerAdapter messageListenerAdapter2() {
return new MessageListenerAdapter(messageListener2());
}
@Bean
public RedisMessageSubscriber messageListener2() {
return new RedisMessageSubscriber(new ArrayList<>());
}
@Bean
MessagePublisher redisPublisherForTopic1() {
return new RedisMessagePublisher(redisTemplate, topic1());
}
@Bean
MessagePublisher redisPublisherForTopic2() {
return new RedisMessagePublisher(redisTemplate, topic2());
}
@Bean
ChannelTopic topic1() {
return new ChannelTopic("topic1");
}
@Bean
ChannelTopic topic2() {
return new ChannelTopic("topic2");
}
}
複製代碼
下面咱們經過單元測試,往「topic1」和「topic2」分別發佈了十條消息,而後遍歷「subscriber1」和「subscriber2」監聽到的消息內容。
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.List;
import java.util.UUID;
/** * Redis Pub/Sub tests * * @author ijiangtao * @create 2019-05-01 19:12 **/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class RedisPubSub {
@Autowired
@Qualifier("redisPublisherForTopic1")
private MessagePublisher redisPublisher1;
@Autowired
@Qualifier("redisPublisherForTopic2")
private MessagePublisher redisPublisher2;
@Autowired
@Qualifier("messageListener1")
private RedisMessageSubscriber subscriber1;
@Autowired
@Qualifier("messageListener2")
private RedisMessageSubscriber subscriber2;
@Test
public void test1() {
// 循環發佈10次消息, 主要方法 redisTemplate.convertAndSend
for (int i = 0; i < 10; i++) {
String message = "Topic1 Message : " + UUID.randomUUID();
redisPublisher1.publish(message);
}
// 循環發佈10次消息, 主要方法 redisTemplate.convertAndSend
for (int i = 0; i < 10; i++) {
String message = "Topic2 Message : " + UUID.randomUUID();
redisPublisher2.publish(message);
}
// 獲取存儲的訂閱消息
List<String> messageList1 = subscriber1.getMessageList();
for (int i = 0; i < messageList1.size(); i++) {
log.info(messageList1.get(i));
}
// 獲取存儲的訂閱消息
List<String> messageList2 = subscriber2.getMessageList();
for (int i = 0; i < messageList2.size(); i++) {
log.info(messageList2.get(i));
}
}
}
複製代碼
「subscriber1」監聽到了「redisPublisher1」和「redisPublisher2」發佈的共20條消息:
[pattern:topic1,message:Topic1 Message : 2239af04-8e91-4adf-8e1e-98261a44ff77]
[pattern:topic1,message:Topic1 Message : 85107f06-2cae-4d6c-8123-9e8dc6e7a608]
[pattern:topic1,message:Topic1 Message : 0b80b9b8-8eee-476e-8462-bb6cbbbcf863]
[pattern:topic1,message:Topic1 Message : 0983f28d-d220-4538-b15e-dc66c0d3e491]
[pattern:topic1,message:Topic1 Message : 0f2d863c-00b9-4406-8e49-020c78a3632d]
[pattern:topic1,message:Topic1 Message : b8a0bb35-6cc2-4393-9136-2390de80f709]
[pattern:topic1,message:Topic1 Message : 027f1ca5-39cc-42c6-a4d8-87dc138260b1]
[pattern:topic1,message:Topic1 Message : ff85595e-2864-4dec-96c1-9dd29c69f670]
[pattern:topic1,message:Topic1 Message : 77471855-f04b-437d-bd1b-afb801a33cf9]
[pattern:topic1,message:Topic1 Message : feba4b0f-70c1-4c14-8ecb-bf4c6956f374]
[pattern:topic2,message:Topic2 Message : dd5e97a7-5ed3-4d0c-be04-0ec2076b14fc]
[pattern:topic2,message:Topic2 Message : c415c846-8597-49e5-a5e4-04c738dcdb77]
[pattern:topic2,message:Topic2 Message : 07981d33-c894-43e7-9305-702f967c0a12]
[pattern:topic2,message:Topic2 Message : 31a34aab-a363-4f5c-9409-3af12b0c3e61]
[pattern:topic2,message:Topic2 Message : 73a4e995-399a-4291-a777-67fc2e4eb3ad]
[pattern:topic2,message:Topic2 Message : 09ce7d6b-3257-49ca-a7b4-8ef900944596]
[pattern:topic2,message:Topic2 Message : 42018099-0d56-4137-98c4-57716d2fbdcd]
[pattern:topic2,message:Topic2 Message : 0b362883-ab76-413e-9c31-60650f3ed223]
[pattern:topic2,message:Topic2 Message : 7097d949-8184-40b4-b9af-582ab44e342e]
[pattern:topic2,message:Topic2 Message : 6967291d-aad4-4aa3-b44a-b988a9589700]
複製代碼
「subscriber2」監聽到了「redisPublisher2」發佈的共10條消息:
[pattern:topic2,message:Topic2 Message : dd5e97a7-5ed3-4d0c-be04-0ec2076b14fc]
[pattern:topic2,message:Topic2 Message : c415c846-8597-49e5-a5e4-04c738dcdb77]
[pattern:topic2,message:Topic2 Message : 07981d33-c894-43e7-9305-702f967c0a12]
[pattern:topic2,message:Topic2 Message : 31a34aab-a363-4f5c-9409-3af12b0c3e61]
[pattern:topic2,message:Topic2 Message : 73a4e995-399a-4291-a777-67fc2e4eb3ad]
[pattern:topic2,message:Topic2 Message : 09ce7d6b-3257-49ca-a7b4-8ef900944596]
[pattern:topic2,message:Topic2 Message : 42018099-0d56-4137-98c4-57716d2fbdcd]
[pattern:topic2,message:Topic2 Message : 0b362883-ab76-413e-9c31-60650f3ed223]
[pattern:topic2,message:Topic2 Message : 7097d949-8184-40b4-b9af-582ab44e342e]
[pattern:topic2,message:Topic2 Message : 6967291d-aad4-4aa3-b44a-b988a9589700]
複製代碼
以前介紹了 發佈/訂閱模式的基本概念和設計原理,本文從 Redis 發佈和訂閱相關的命令開始,逐步講解了 Redis 發佈訂閱的存儲結構,以及如何經過 Spring Data Redis 實現發佈訂閱模式。
本文是 精通Redis系列 和 精通設計模式系列 教程的一部分,歡迎關注個人公衆號,和做者一塊兒迭代成長。