spring boot RedisMQ——使用RedisTemplate實現生產者消費者模式

1.配置redisredis

在application.properties文件中加入redis的配置信息spring

#redis gmall.redis.host=172.16.19.259 gmall.redis.port=6379 gmall.redis.pass=Gworld2017 gmall.redis.database=7 gmall.redis.timeout=5000

配置spring-redis.xml文件app

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:task="http://www.springframework.org/schema/task" xmlns:redis="http://www.springframework.org/schema/redis" xsi:schemaLocation="             http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
            http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
            http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
            http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis-1.0.xsd"
    default-lazy-init="false">
    <bean id="redisConnectionFactory"
        class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" primary="true">
        <property name="hostName" value="${gmall.redis.host}" />
        <property name="port" value="${gmall.redis.port}" />
        <property name="password" value="${gmall.redis.pass}" />
        <property name="timeout" value="${gmall.redis.timeout}" />
        <property name="database" value="${gmall.redis.database}" />
    </bean>
    <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate" primary="true">
        <property name="connectionFactory" ref="redisConnectionFactory" />
        <property name="exposeConnection" value="true" />
        <property name="keySerializer">
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
        </property>
        <property name="valueSerializer">
            <bean class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
        </property>
        <property name="hashKeySerializer">
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
        </property>
        <property name="hashValueSerializer">
            <bean class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
        </property>
    </bean>
</beans>

二、編寫RedisUtil類dom

public class RedisUtil { private static Logger logger = LoggerFactory.getLogger(RedisUtil.class); @SuppressWarnings("rawtypes") private static RedisTemplate getRedisTemplate() { return (RedisTemplate) SpringBeanFactoryUtils.getBean("redisTemplate"); } @SuppressWarnings("unchecked") public static Long addRedisSet(String redisKey, Object value) { Long result = getRedisTemplate().opsForSet().add(redisKey, value); if (logger.isDebugEnabled()) { logger.debug("result=" + result); } return result; } @SuppressWarnings("unchecked") public static void leftPush(String key, String value) { getRedisTemplate().opsForList().leftPush(key, value); //getRedisTemplate().opsForList().leftPop(key);
 } @SuppressWarnings("unchecked") public static String rightPop(String key,long timeout,TimeUnit unit) { Object obj = getRedisTemplate().opsForList().rightPop(key, timeout, unit); String str = (String) obj; return str; } @SuppressWarnings("unchecked") public static Object rightPopAndLeftPush(String sourceKey, String destinationKey) { Object value = getRedisTemplate().opsForList().rightPopAndLeftPush(sourceKey, destinationKey); return value; } }
SpringBeanFactoryUtils類

package com.gcard.queue.utils; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; public class SpringBeanFactoryUtils   implements ApplicationContextAware { private static ApplicationContext appCtx; /** * 此方法能夠把ApplicationContext對象inject到當前類中做爲一個靜態成員變量。 * @param applicationContext ApplicationContext 對象. * @throws BeansException * @author wangdf */    
     
    public void setApplicationContext( ApplicationContext applicationContext ) throws BeansException { appCtx = applicationContext; } /** * 獲取ApplicationContext * @return * @author wangdf */  
    public static ApplicationContext getApplicationContext(){ return appCtx; } /** * 這是一個便利的方法,幫助咱們快速獲得一個BEAN * @param beanName bean的名字 * @return 返回一個bean對象 * @author wangdf */    
    public static Object getBean( String beanName ) { return appCtx.getBean( beanName ); } @SuppressWarnings("unchecked") public static Object getBean( Class requiredType ) { return appCtx.getBean(requiredType); } }

 

三、模擬生產者ide

public class TaskProducer implements Runnable { Logger logger = LoggerFactory.getLogger(getClass()); @Override public void run() { try {
            for(int i=0;i<5;i++){ RedisUtil.leftPush("task-queue", "value_" + i); logger.info("插入一個新的任務:" + "value_" + i); } } catch (Exception e) { logger.error(e.getMessage(), e); } } }

四、模擬消費者ui

public class TaskConsumer implements Runnable { Logger logger = LoggerFactory.getLogger(getClass()); @Override public void run() {
        while (true) { try { String taskid = (String) RedisUtil.rightPopAndLeftPush("task-queue", "tmp-queue");//取出消息放到臨時隊列 // Thread.sleep(1000); // RedisUtil.rightPop("tmp-queue");//非阻塞 // 阻塞式brpop,List中無數據時阻塞,參數0表示一直阻塞下去,直到List出現數據
                String str = RedisUtil.rightPop("tmp-queue", 0, TimeUnit.MINUTES);//阻塞,取出臨時隊列 logger.info("線程取數據:{}", str); logger.info(str + "處理成功,被清除"); } catch (Exception e) { logger.error(e.getMessage(), e); } } /* * if(random.nextInt(13) % 7 == 0){ * RedisUtil.rightPopAndLeftPush("tmp-queue","task-queue");//彈回任務隊列 * logger.info(taskid+"處理失敗,被彈回任務隊列"); }else{ * RedisUtil.rightPop("tmp-queue"); logger.info(taskid+"處理成功,被清除"); } */ } }

五、獨立消費者做爲一個項目(監聽器)spa

在application-context.xml文件配置bean,啓動項目後就會執行這個監聽器線程

<bean class="com.gcard.longcode.manager.impl.MessageQueueServiceImpl" init-method="messageListener"/>
相關文章
相關標籤/搜索