通常來講,消息隊列有兩種場景,一種是發佈者訂閱者模式,一種是生產者消費者模式。利用redis這兩種場景的消息隊列都可以實現。redis
定義:
生產者消費者模式:生產者生產消息放到隊列裏,多個消費者同時監聽隊列,誰先搶到消息誰就會從隊列中取走消息;即對於每一個消息只能被最多一個消費者擁有。
發佈者訂閱者模式:發佈者生產消息放到隊列裏,多個監聽隊列的消費者都會收到同一份消息;即正常狀況下每一個消費者收到的消息應該都是同樣的。spring
下面就以Spring Data Redis實現簡單的消息「發佈/訂閱」服務。ide
spring-redis使用RedisMessageListenerContainer進行消息監聽,客戶程序須要本身實現MessageListener,並以指定的topic註冊到RedisMessageListenerContainer,這樣,在指定的topic上若是有消息,RedisMessageListenerContainer便會通知該MessageListener。this
下面是在spring配置文件中配置spring-redis:spa
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd "> <!-- 引入jedis配置文件 --> <context:property-placeholder location="classpath*:resource/redis.properties" /> <context:component-scan base-package="com.ljq.durian" /> <!-- jedis pool配置 --> <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> <property name="maxIdle" value="${redis.maxIdle}" /> <property name="maxTotal" value="${redis.maxActive}" /> <property name="maxWaitMillis" value="${redis.maxWait}" /> <property name="testOnBorrow" value="${redis.testOnBorrow}" /> <property name="testOnReturn" value="${redis.testOnReturn}" /> </bean> <!-- spring data redis --> <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> <property name="hostName" value="${redis.host}"/> <property name="port" value="${redis.port}"/> <property name="poolConfig" ref="jedisPoolConfig"></property> <property name="timeout" value="${redis.timeout}"></property> <property name="usePool" value="true"></property> </bean> <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> <property name="connectionFactory" ref="jedisConnectionFactory"/> <property name="defaultSerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> </property> </bean> <bean id="redisMessageListener" class="com.ljq.durian.common.listener.RedisMessageListener"> <property name="redisTemplate" ref="redisTemplate"/> </bean> <bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer"> <property name="connectionFactory" ref="jedisConnectionFactory" /> <property name="messageListeners"> <map> <entry key-ref="redisMessageListener"> <list> <!-- 普通訂閱,訂閱具體的頻道 --> <bean class="org.springframework.data.redis.listener.ChannelTopic"> <constructor-arg value="topic.channel" /> </bean> <!-- 模式訂閱,支持模式匹配訂閱,*爲模糊匹配符 --> <bean class="org.springframework.data.redis.listener.PatternTopic"> <constructor-arg value="topic.*" /> </bean> <!-- 匹配全部頻道 --> <bean class="org.springframework.data.redis.listener.PatternTopic"> <constructor-arg value="*" /> </bean> </list> </entry> </map> </property> </bean> </beans>
上面例子中,最後三個bean的配置是實現發佈/訂閱服務的關鍵,RedisMessageListener是本身寫的實現了org.springframework.data.redis.connection.MessageListener的業務類,並以「topic.channel」 這個topic註冊到RedisMessageListenerContainer。RedisMessageListenerContainer在消息到達後負責通知MessageListener。
下面是RedisMessageListener的代碼:code
package com.ljq.durian.common.listener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.RedisSerializer; public class RedisMessageListener implements MessageListener { @Autowired private RedisTemplate<String, String> redisTemplate; @Override public void onMessage(Message message, byte[] pattern) { RedisSerializer<?> serializer = redisTemplate.getValueSerializer(); Object channel = serializer.deserialize(message.getChannel()); Object body = serializer.deserialize(message.getBody()); System.out.println("主題: " + channel); System.out.println("消息內容: " + String.valueOf(body)); } public RedisTemplate<String, String> getRedisTemplate() { return redisTemplate; } public void setRedisTemplate(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; } }
這樣,應用啓動時,消息的訂閱方(subscriber)就註冊好了。這時候只要使用一個簡單的程序,模擬publisher,向指定topic發佈消息,RedisMessageListener就能夠接收到消息,spring-redis的寫法是這樣:component
redisTemplate.convertAndSend("topic.channel", "hello world!");