redis 實現發佈訂閱的功能

redis 除了做爲緩存的功能外還能夠用做消息中間件的功能,這片博客主要是介紹一下 redis 整合spring 實現消息的發佈和訂閱功能;redis

1:redis依賴,依賴兩個包,redis 包, spring-redis 包用於整合redis,這裏就不介紹了,具體能夠參考上一篇博客 :redis 緩存 中的介紹;spring

2:redis和spring的整合:緩存

<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <property name="maxIdle" value="1" />
        <property name="maxTotal" value="5" />
        <property name="blockWhenExhausted" value="true" />
        <property name="maxWaitMillis" value="30000" />
        <property name="testOnBorrow" value="true" />
    </bean>


    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
        <property name="hostName" value="localhost" />
        <property name="port" value="6379"/>
        <property name="poolConfig" ref="jedisPoolConfig" />
        <property name="usePool" value="true"/>
    </bean>

    <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
        <property name="connectionFactory"   ref="jedisConnectionFactory" />
        <property name="keySerializer">
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
        </property>
        <property name="valueSerializer">
            <bean class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
        </property>
        <property name="hashKeySerializer">
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
        </property>
        <property name="hashValueSerializer">
            <bean class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer"/>
        </property>
    </bean>

    <!-- redis 消息發佈訂閱 -->
    <bean id="myRedisListener" class="bz.beppe.redis.MyRedisListener" scope="prototype"/>
  <!-- 這裏配置線程池任務 -->
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="4"/>
        <property name="maxPoolSize" value="4"/>
        <property name="queueCapacity" value="100000"/>
    </bean>
  <!-- 配置redis container 將監聽類注入到redis容器中,實現監聽容器中指定 主題 隊列的功能 -->
    <bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer"
          destroy-method="destroy">
        <property name="connectionFactory" ref="jedisConnectionFactory"/>
        <property name="taskExecutor">
            <ref bean="taskExecutor" />
        </property>
        <property name="messageListeners">
            <map>
                <entry key-ref="myRedisListener">
                    <bean class="org.springframework.data.redis.listener.ChannelTopic">
                        <constructor-arg value="push:myredis"/>
                    </bean>
                </entry>
            </map>
        </property>
    </bean>

3:監聽類  該類只要實現  MessageListener 接口便可,而且在重寫的方法中進行須要的業務處理:ide

public class MyRedisListener implements MessageListener{


    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] body = message.getBody();
        byte[] channel = message.getChannel();
//        redisTemplate.convertAndSend("push:myredis","this is the redis subscribe!!");
        String channelStr = new String(channel);
        String bodyStr = new String(body);
        System.out.println("渠道爲:"+channelStr+"消息爲:"+bodyStr);   //這裏的業務處理只作簡單的打印輸出
    }
}

4:消息發佈類: this

在消息發佈類中須要使用到 redisTemplate 來進行消息的發佈,其中,消息發佈的方法爲  redisTemplate.convertAndSend(String channel,String mess);spa

須要指定消息發佈的 通道名稱,這裏的通道和監聽中配置的渠道名稱一致 ;mess 就是你須要發佈到該通道上的內容;prototype

5:消息的發佈:線程

    @Test
    public void redisSubTest() throws InterruptedException {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("spring-redis.xml");
        RedisTemplate redisTemplate =  (RedisTemplate) ctx.getBean("redisTemplate");
        redisTemplate.convertAndSend("push:myredis","this is the redis subscribe!!");
        Thread.sleep(5000);
    }

到這裏爲止,一個簡單的基於redis的訂閱發佈就實現了,在項目中你能夠根據你的具體業務來實現功能code

相關文章
相關標籤/搜索