Redis消息隊列

1.消息隊列有兩種模式:發佈者訂閱者模式,生產者消費者模式html

發佈者訂閱者模式:發佈者發送消息到隊列,每一個訂閱者都能收到同樣的信息。java

生產者消費者模式:生產者將消息放入隊列,多個消費者共同監聽,誰先搶到資源,誰就從隊列中取走消息去處理,注意,每一個消息最多隻能被一個消費者接收。git

2.Redis消息隊列使用場景github

可使用消息隊列來實現短信的服務化,任何須要發送短信息的模塊,均可以直接調用短信服務來完成短信的發送。好比用戶系統登陸註冊短信,訂單系統的下單成功的短信等。web

3.Spring MVC中實現Redis消息隊列(GitHub地址:Redis_MQ)redis

3.1 引入Redis相應的依賴包,jedis-2.9.0.jar    commons-pool2-2.5.0.jar    spring-data-redis-1.6.0.RELEASE.jar (安利個下jar包的網站:http://www.mvnjar.com/search.htmlspring

3.2 新建redis.properties文件apache

host=10.0.20.251
port=6379

3.3配置application-redis.xml文件(applicationContext-redis.xml)spring-mvc

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:cache="http://www.springframework.org/schema/cache"
    xmlns:redis="http://www.springframework.org/schema/redis"
    xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
                            http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-34.0.xsd     
                            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
                            http://www.springframework.org/schema/aop  http://www.springframework.org/schema/aop/spring-aop.xsd
                            http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.0.xsd
                            http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis-1.0.xsd">
    <description>spring-data-redis配置</description>
    <context:component-scan base-package="com.fpc.Entity"></context:component-scan>
    <!-- 引入配置文件 -->  
    <bean id="propertyConfigurer"  
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  
        <property name="location" value="classpath:redis.properties" />  
    </bean> 

    <bean id="redisConnectionFactory"
        class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
        <property name="hostName" value="${host}"></property>
        <property name="port" value="${port}"></property>
        <property name="usePool" value="true"></property>
    </bean>

    <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
        <property name="connectionFactory" ref="redisConnectionFactory"></property>
    </bean>
    <!-- 序列化:通常咱們想Redis發送一個消息定義的Java對象,這個對象須要序列化。這裏使用JdkSerializationRedisSerializer -->
    <bean id="jdkSerializer"
        class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />

    <!-- 監聽器 -->
    <bean id="smsMessageListener"
        class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="smsMessageDelegateListener" />
        <property name="serializer" ref="jdkSerializer" />
    </bean>

    <!-- 發送者 -->
    <bean id="sendMessage" class="com.fpc.Entity.sendMessage">
        <property name="redisTemplate" ref="redisTemplate"/>
    </bean>

    <!-- redis:listener-container:定義消息監聽,method:監聽消息執行的方法,serializer:序列化,topic:監聽主題(能夠理解爲隊列的名稱) -->
    <redis:listener-container>
        <redis:listener ref="smsMessageListener" method="handleMessage"
            serializer="jdkSerializer" topic="sms_queue_web_online" />
    </redis:listener-container>

    <!-- jedis -->
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <property name="maxIdle" value="300" /> <!-- 最大可以保持idel狀態的對象數  -->
        <property name="maxTotal" value="60000" /> <!-- 最大分配的對象數 -->
        <property name="testOnBorrow" value="true" /> <!-- 當調用borrow Object方法時,是否進行有效性檢查 -->
    </bean>

    <bean id="jedisPool" class="redis.clients.jedis.JedisPool">
        <constructor-arg index="0" ref="jedisPoolConfig" />
        <constructor-arg index="1" value="${host}" type="java.lang.String"/>
        <constructor-arg index="2" value="${port}" type="int" />
    </bean>

</beans>

3.4新建一個消息實體(Message)服務器

package com.fpc.Entity;

import java.io.Serializable;
import java.util.Date;

public class Message implements Serializable {
    private Integer messageId;
    private String mobileNumber;//電話號碼
    private Byte type;//消息類型,1:登陸驗證碼,2:訂單信息
    
    private Date createDate;//消息建立的時間
    
    //消息處理時間
    private Date processTime;
    
    //消息狀態:1:未發送,2:發送成功,3:發送失敗
    private Byte status;
    
    private String content;//消息主體

    public Integer getMessageId() {
        return messageId;
    }

    public void setMessageId(Integer messageId) {
        this.messageId = messageId;
    }

    public String getMobileNumber() {
        return mobileNumber;
    }

    public void setMobileNumber(String mobileNumber) {
        this.mobileNumber = mobileNumber;
    }

    public Byte getType() {
        return type;
    }

    public void setType(Byte type) {
        this.type = type;
    }

    public Date getCreateDate() {
        return createDate;
    }

    public void setCreateDate(Date createDate) {
        this.createDate = createDate;
    }

    public Date getProcessTime() {
        return processTime;
    }

    public void setProcessTime(Date processTime) {
        this.processTime = processTime;
    }

    public Byte getStatus() {
        return status;
    }

    public void setStatus(Byte status) {
        this.status = status;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

3.5定義一個發送消息的對象(sendMessage

package com.fpc.Entity;

import java.io.Serializable;

import org.springframework.data.redis.core.RedisTemplate;

//定義一個消息發送對象
public class sendMessage {
    private RedisTemplate<String, Object> redisTemplate;

    public RedisTemplate<String, Object> getRedisTemplate() {
        return redisTemplate;
    }

    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    
    public void sendMessages( String channel,Serializable message ){
        redisTemplate.convertAndSend(channel, message);
    }
}

3.6定義一個監聽器(smsMessageDelegateListener

package com.fpc.Entity;

import java.io.Serializable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.springframework.stereotype.Component;

import redis.clients.jedis.Jedis;

//監聽消息
@Component("smsMessageDelegateListener")
public class smsMessageDelegateListener {
    //監聽Redis消息
    public void handleMessage( Serializable message ){
        Message mess = (Message) message;
        
        //發送短信
        //手機號: mess.getMobileNumber()
        //短信內容:mess.getContent();
        //send,發送狀態sendStatus
        //若是發送不成功則直接return,離開該方法,或者繼續重試
        //若是發送成功則須要,異步改寫短信的狀態;
        Jedis jedis = new Jedis("10.0.20.251");
        jedis.set("message", mess.getContent());
        Executor executor = Executors.newSingleThreadExecutor();
        executor.execute(new Runnable() {
            
            @Override
            public void run() {
                // TODO 自動生成的方法存根
                //讀寫短信數據表,將短信的發送狀態改成已發送
            }
        });
        
    }
}

注:該監聽器中註釋已經寫得很明白了,你能夠在此處對數據進行持久化即寫庫操做,因爲本demo中沒有寫庫(建一張message表,而後寫dao,service,serviceImpl等,就跟正常操道別的表是同樣的,因此此處作數據持久化了)

而後在該監聽器能夠開啓一個異步線程,異步改寫短信的狀態。

3.7.在controller中寫一個get_message方法(get_message

@RequestMapping("/get_message")
    public ModelAndView get_message( @RequestParam("mobileNumber") String moblieNumber ){
        //取得電話號碼,構造消息對象,而後經過短信服務器生成驗證碼發送到該手機上
        Message message = new Message();
        message.setMobileNumber(moblieNumber);
        message.setType((byte)1); //驗證消息
        message.setContent("23456"); //消息的內容
        message.setStatus((byte)1);//未發送狀態
        message.setCreateDate(new Date());
        //1.能夠把待發送的消息存庫,也能夠不存庫,如今先不存庫
        
        //2.異步發送短信到redis隊列
        //2.1先構造一個消息發送對象
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-redis.xml");
        sendMessage sendm = (sendMessage)context.getBean("sendMessage");
        sendm.sendMessages("sms_queue_web_online", message);
        System.out.println(message.getContent());
        ModelAndView modelAndView = new ModelAndView();
        modelAndView.addObject("code",message.getContent());
        modelAndView.setViewName("user");
        return modelAndView;
}

注:此處沒有真正調用發短信的方法,而是硬編碼爲「23456」

3.8編寫jsp頁面,user.jsp

<%@ page language="java" contentType="text/html; charset=ISO-8859-1"
    pageEncoding="UTF-8"%>
<%@ taglib prefix="shiro" uri="http://shiro.apache.org/tags" %>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">
<title>Insert title here</title>
</head>
<body>
    <shiro:user>
        welcome[<shiro:principal/>]login,<a href="Shiro/logout">logout</a>
    </shiro:user>
    <form action="Shiro/get_message" method="POST">
        phonenumber: <input type="text" name="mobileNumber"/>
        <br>
        code: <input type="text" name="password" value="${code}"/>
        <br>
        <input type="submit" value="get code">
    </form>
</body>
</html>

運行效果:

而後咱們去Redis服務器端看下,數據有沒有被保存在Redis中,而後藉此判斷監聽器有沒有執行:

能夠看到Redis服務器中已經有值存在了,說明消息隊列監聽器是確實在監聽的,你能夠根據實際的業務改變監聽器中的代碼和controller中的代碼。

相關文章
相關標籤/搜索