rabbitmq的延遲消息隊列實現

第一部分:延遲消息的實現原理和知識點java

使用RabbitMQ來實現延遲任務必須先了解RabbitMQ的兩個概念:消息的TTL和死信Exchange,經過這二者的組合來實現上述需求。web

消息的TTL(Time To Live)

消息的TTL就是消息的存活時間。RabbitMQ能夠對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連着的保留時間,也能夠對每個單獨的消息作單獨的設置。超過了這個時間,咱們認爲這個消息就死了,稱之爲死信。若是隊列設置了,消息也設置了,那麼會取小的。因此一個消息若是被路由到不一樣的隊列中,這個消息死亡的時間有可能不同(不一樣的隊列設置)。這裏單講單個消息的TTL,由於它纔是實現延遲任務的關鍵。spring

能夠經過設置消息的expiration字段或者x-message-ttl屬性來設置時間,二者是同樣的效果。只是expiration字段是字符串參數,因此要寫個int類型的字符串:json

當上面的消息扔到隊列中後,過了3分鐘,若是沒有被消費,它就死了。不會被消費者消費到。這個消息後面的,沒有「死掉」的消息對頂上來,被消費者消費。死信在隊列中並不會被刪除和釋放,它會被統計到隊列的消息數中去。單靠死信還不能實現延遲任務,還要靠Dead Letter Exchange。app

Dead Letter Exchanges

Exchage的概念在這裏就不在贅述。一個消息在知足以下條件下,會進死信路由,記住這裏是路由而不是隊列,一個路由能夠對應不少隊列。dom

1. 一個消息被Consumer拒收了,而且reject方法的參數裏requeue是false。也就是說不會被再次放在隊列裏,被其餘消費者使用。測試

2. 上面的消息的TTL到了,消息過時了。ui

3. 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。this

Dead Letter Exchange其實就是一種普通的exchange,和建立其餘exchange沒有兩樣。只是在某一個設置Dead Letter Exchange的隊列中有消息過時了,會自動觸發消息的轉發,發送到Dead Letter Exchange中去。spa

實現延遲隊列

延遲任務經過消息的TTL和Dead Letter Exchange來實現。咱們須要創建2個隊列,一個用於發送消息,一個用於消息過時後的轉發目標隊列。

生產者輸出消息到Queue1,而且這個消息是設置有有效時間的,好比3分鐘。消息會在Queue1中等待3分鐘,若是沒有消費者收掉的話,它就是被轉發到Queue2,Queue2有消費者,收到,處理延遲任務。

完成延遲任務的實現。

 第二部分:具體實現例子

一、新創建消息隊列配置文件rabbitmq.properties

 1 #rabbitmq消息隊列的屬性配置文件properties
 2 rabbitmq.study.host=192.168.56.101
 3 rabbitmq.study.username=duanml
 4 rabbitmq.study.password=1qaz@WSX
 5 rabbitmq.study.port=5672
 6 rabbitmq.study.vhost=studymq
 7 
 8 #Mail 消息隊列的相關變量值
 9 mail.exchange=mailExchange
10 mail.exchange.key=mail_queue_key
11 
12 
13 #Phone 消息隊列的相關變量值
14 phone.topic.key=phone.one
15 phone.topic.key.more=phone.one.more
16 
17 #delay 延遲消息隊列的相關變量值
18 delay.directQueue.key=TradePayNotify_delay_2m
19 delay.directMessage.key=TradePayNotify_delay_3m

二、新創建配置文件,申明延遲隊列相關的配置信息如:spring-rabbigmq-dlx.xml

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 5        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
 6 
 7     <!--利用rabbitmq的TTL和延遲隊列,實現延遲通知任務的例子
 8         一、申明瞭一個訂單通知服務的隊列  queue_Notify
 9         二、申明瞭一個延遲隊列Notify_delay_15s,給整個隊列設置消息過時時間 爲15秒  ——————》 queue ttl  例子
10         三、申明瞭一個延遲隊列Notify_delay_30s  給發送到這個隊列的消息,消息自己設置過時時間 ————————》  message ttl  例子
11         四、當消息發送到二、3隊列的時候,達到了過時時間,即轉發到訂單通知服務工做隊列 一、
12         五、給隊列1 配置消費者服務工做監聽,便可完成延遲任務的結果。
13     -->
14 
15     <!-- ################ 訂單通知服務消費者配置 ################ -->
16     <!--隊列聲明-->
17     <rabbit:queue id="queue_Notify" name="queue_Notify" durable="true" auto-delete="false" exclusive="false"/>
18 
19     <!-- 訂單通知服務消費者 exchange -->
20     <rabbit:direct-exchange name="trade_direct" durable="true" auto-delete="false">
21         <rabbit:bindings>
22             <rabbit:binding queue="queue_Notify" key="TradePayNotify"/>
23         </rabbit:bindings>
24     </rabbit:direct-exchange>
25 
26     <!-- 訂單通知監聽處理器 -->
27     <bean id="notifyConsumerListener" class="org.seckill.rabbitmqListener.notify.NotifyConsumerListener"/>
28     <!--訂單消息隊列確認回調-->
29     <bean id="notifyConfirmCallBackListener" class="org.seckill.rabbitmqListener.notify.NotifyConfirmCallBackListener"></bean>
30     <!--訂單消息隊列消息發送失敗回調-->
31     <bean id="notifyFailedCallBackListener" class="org.seckill.rabbitmqListener.notify.NotifyFailedCallBackListener"></bean>
32 
33     <!-- 監聽器acknowledge=manual表示手工確認消息已處理(異常時能夠不確認消息),auto表示自動確認(只要不拋出異常,消息就會被消費) -->
34     <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
35         <rabbit:listener queues="queue_Notify" ref="notifyConsumerListener"/>
36     </rabbit:listener-container>
37 
38     <!--*****************************************分割線*********************************************************-->
39 
40     <!-- ################ 延遲隊列生產者配置 ################ -->
41     <rabbit:template id="rabbitTemplateDelay" mandatory="true" exchange="trade_direct_delay"
42                      connection-factory="connectionFactory"
43                      confirm-callback="notifyConfirmCallBackListener"
44                      return-callback="notifyFailedCallBackListener"
45                      message-converter="jsonMessageConverter"/>
46 
47     <!--配置生產消息的延遲隊列操做主體類-->
48     <bean id="delayMQProducerImpl" class="org.seckill.utils.rabbitmq.Impl.MQProducerImpl">
49         <property name="rabbitTemplate" ref="rabbitTemplateDelay"></property>
50     </bean>
51 
52     <!--申明一個延遲隊列,給整個隊列的消息設置消息過時時間 x-message-ttl 2分鐘
53         當消息達到過時時間的時候,rabbitmq將會把消息從新定位轉發到其它的隊列中去,本例子轉發到
54         exchange:trade_direct
55         routing-key:TradePayNotify
56         知足如上兩點的隊列中去即爲:queue_Notify
57     -->
58     <rabbit:queue id="Notify_delay_2m" name="Notify_delay_2m" durable="true" auto-delete="false"
59                   exclusive="false">
60         <rabbit:queue-arguments>
61             <entry key="x-message-ttl" value="120000" value-type="java.lang.Long"/>
62             <entry key="x-dead-letter-exchange" value="trade_direct"/>
63             <entry key="x-dead-letter-routing-key" value="TradePayNotify"/>
64         </rabbit:queue-arguments>
65     </rabbit:queue>
66 
67     <!--申明一個延遲隊列,在發送消息的時候給消息設置過時時間 3分鐘
68            當消息達到過時時間的時候,rabbitmq將會把消息從新定位轉發到其它的隊列中去,本例子轉發到
69            exchange:trade_direct
70            routing-key:TradePayNotify
71            知足如上兩點的隊列中去即爲:queue_Notify
72     -->
73     <rabbit:queue id="Notify_delay_3m" name="Notify_delay_3m" durable="true" auto-delete="false"
74                   exclusive="false">
75         <rabbit:queue-arguments>
76             <entry key="x-dead-letter-exchange" value="trade_direct"/>
77             <entry key="x-dead-letter-routing-key" value="TradePayNotify"/>
78         </rabbit:queue-arguments>
79     </rabbit:queue>
80 
81     <!-- 延遲隊列工做的 exchange -->
82     <rabbit:direct-exchange name="trade_direct_delay" durable="true" auto-delete="false">
83         <rabbit:bindings>
84             <rabbit:binding queue="Notify_delay_2m" key="TradePayNotify_delay_2m"/>
85             <rabbit:binding queue="Notify_delay_3m" key="TradePayNotify_delay_3m"/>
86         </rabbit:bindings>
87     </rabbit:direct-exchange>
88 
89 </beans>

三、新創建延遲隊列測試Controller

  1 package org.seckill.web;
  2 
  3 import org.seckill.dto.SeckillResult;
  4 import org.seckill.entity.Seckill;
  5 import org.seckill.utils.rabbitmq.Impl.MQProducerImpl;
  6 import org.seckill.utils.rabbitmq.MQProducer;
  7 import org.slf4j.Logger;
  8 import org.slf4j.LoggerFactory;
  9 import org.springframework.amqp.core.Message;
 10 import org.springframework.beans.factory.annotation.Autowired;
 11 import org.springframework.beans.factory.annotation.Value;
 12 import org.springframework.stereotype.Controller;
 13 import org.springframework.web.bind.annotation.RequestMapping;
 14 import org.springframework.web.bind.annotation.ResponseBody;
 15 
 16 import java.util.Date;
 17 
 18 /**
 19  * <p>Title: org.seckill.web</p>
 20  * <p>Company:東軟集團(neusoft)</p>
 21  * <p>Copyright:Copyright(c)2018</p>
 22  * User: 段美林
 23  * Date: 2018/5/30 17:33
 24  * Description: 消息隊列測試
 25  */
 26 @Controller
 27 @RequestMapping("/rabbitmq")
 28 public class RabbitmqController {
 29 
 30     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 31  40 
 41     @Value("${delay.directQueue.key}")
 42     private String delay_directQueue_key;
 43 
 44     @Value("${delay.directMessage.key}")
 45     private String delay_directMessage_key;
 46  52 
 53     @Autowired
 54     private MQProducerImpl delayMQProducerImpl;111 
112     /**
113      * @Description: 消息隊列
114      * @Author:
115      * @CreateTime:
116      */
117     @ResponseBody
118     @RequestMapping("/sendDelayQueue")
119     public SeckillResult<Long> testDelayQueue() {
120         SeckillResult<Long> result = null;
121         Date now = new Date();
122         try {
123             Seckill seckill = new Seckill();
124        //第一種狀況,給隊列設置消息ttl,詳情見配置文件
125             for (int i = 0; i < 2; i++) {
126                 seckill.setSeckillId(1922339387 + i);
127                 seckill.setName("delay_queue_ttl_" + i);
128                 String msgId = delayMQProducerImpl.getMsgId();
129                 Message message = delayMQProducerImpl.messageBuil(seckill,msgId);
130                 delayMQProducerImpl.sendDataToRabbitMQ(delay_directQueue_key, message);
131             }
132         //第二種狀況,給消息設置ttl
133             for (int i = 0; i < 2; i++) {
134                 seckill.setSeckillId(1922339287 + i);
135                 seckill.setName("delay_message_ttl_" + i);
136                 String msgId = delayMQProducerImpl.getMsgId();
137                 Message message = delayMQProducerImpl.messageBuil(seckill,msgId);
138                 if (message != null) {
139                     //給消息設置過時時間ttl,爲3分鐘
140                     message.getMessageProperties().setExpiration("180000");
141                     delayMQProducerImpl.sendDataToRabbitMQ(delay_directMessage_key, message);
142                 }
143             }
144             result = new SeckillResult<Long>(true, now.getTime());
145         } catch (Exception e) {
146             logger.error(e.getMessage(), e);
147         }
148         return result;
149     }
150 
151 }

四、編寫延遲消息確認類和監聽類:

NotifyConfirmCallBackListener.java
 1 package org.seckill.rabbitmqListener.notify;
 2 
 3 import org.slf4j.Logger;
 4 import org.slf4j.LoggerFactory;
 5 import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
 6 import org.springframework.amqp.rabbit.support.CorrelationData;
 7 
 8 /**
 9  * <p>Title: org.seckill.rabbitmqListener.notify</p>
10  * <p>Company:東軟集團(neusoft)</p>
11  * <p>Copyright:Copyright(c)2018</p>
12  * User: 段美林
13  * Date: 2018/6/3 0:27
14  * Description: 延遲任務測試--->消息確認回調類
15  */
16 public class NotifyConfirmCallBackListener implements ConfirmCallback {
17 
18     private final Logger logger = LoggerFactory.getLogger(this.getClass());
19 
20     /**
21      * Confirmation callback.
22      *
23      * @param correlationData correlation data for the callback.
24      * @param ack             true for ack, false for nack
25      * @param cause           An optional cause, for nack, when available, otherwise null.
26      */
27     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
28         logger.info("延遲測試---確認消息完成-------->confirm--:correlationData:" + correlationData.getId() + ",ack:" + ack + ",cause:" + cause);
29     }
30 }
NotifyConsumerListener.java
 1 package org.seckill.rabbitmqListener.notify;
 2 
 3 import com.alibaba.fastjson.JSONObject;
 4 import com.rabbitmq.client.Channel;
 5 import org.slf4j.Logger;
 6 import org.slf4j.LoggerFactory;
 7 import org.springframework.amqp.core.Message;
 8 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
 9 
10 /**
11  * <p>Title: org.seckill.rabbitmqListener.notify</p>
12  * <p>Company:東軟集團(neusoft)</p>
13  * <p>Copyright:Copyright(c)2018</p>
14  * User: 段美林
15  * Date: 2018/6/3 0:27
16  * Description: 訂單通知隊列監聽服務
17  * 實現延遲任務的功能
18  */
19 public class NotifyConsumerListener implements ChannelAwareMessageListener {
20 
21 
22     private final Logger logger = LoggerFactory.getLogger(this.getClass());
23 
24     /**
25      * Callback for processing a received Rabbit message.
26      * <p>Implementors are supposed to process the given Message,
27      * typically sending reply messages through the given Session.
28      *
29      * @param message the received AMQP message (never <code>null</code>)
30      * @param channel the underlying Rabbit Channel (never <code>null</code>)
31      * @throws Exception Any.
32      */
33     public void onMessage(Message message, Channel channel) throws Exception {
34         try {
35             //將字節流對象轉換成Java對象
36 //            Seckill seckill=(Seckill) new ObjectInputStream(new ByteArrayInputStream(message.getBody())).readObject();
37 
38             String returnStr = new String(message.getBody(),"UTF-8");
39             JSONObject jsStr = JSONObject.parseObject(returnStr);
40 
41             logger.info("延遲測試--消費開始:名稱爲--===>" + jsStr.getString("name") + "----->返回消息:" + returnStr + "||||消息的Properties:--》" + message.getMessageProperties());
42 
43             //TODO 進行相關業務操做
44 
45             //成功處理業務,那麼返回消息確認機制,這個消息成功處理OK
46             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
47 
48         } catch (Exception e) {
49             if (message.getMessageProperties().getRedelivered()) {
50                 //消息已經進行過一次輪詢操做,仍是失敗,將拒絕再次接收本消息
51                 logger.info("消息已重複處理失敗,拒絕再次接收...");
52                 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒絕消息
53 
54                 //TODO 進行相關業務操做
55 
56             } else {
57                 //消息第一次接收處理失敗後,將再此回到隊列中進行  再一次輪詢操做
58                 logger.info("消息即將再次返回隊列處理...");
59                 //處理失敗,那麼返回消息確認機制,這個消息沒有成功處理,返回到隊列中
60                 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
61             }
62         }
63     }
64 }
NotifyFailedCallBackListener.java
 1 package org.seckill.rabbitmqListener.notify;
 2 
 3 import org.slf4j.Logger;
 4 import org.slf4j.LoggerFactory;
 5 import org.springframework.amqp.core.Message;
 6 import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
 7 
 8 /**
 9  * <p>Title: org.seckill.rabbitmqListener.notify</p>
10  * <p>Company:東軟集團(neusoft)</p>
11  * <p>Copyright:Copyright(c)2018</p>
12  * User: 段美林
13  * Date: 2018/6/3 0:28
14  * Description: 延遲任務測試----> 消息發送失敗回調類
15  */
16 public class NotifyFailedCallBackListener implements ReturnCallback {
17 
18     private final Logger logger = LoggerFactory.getLogger(this.getClass());
19 
20     /**
21      * Returned message callback.
22      *
23      * @param message    the returned message.
24      * @param replyCode  the reply code.
25      * @param replyText  the reply text.
26      * @param exchange   the exchange.
27      * @param routingKey the routing key.
28      */
29     public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
30         logger.info("延遲測試------------->return--message:" +
31                 new String(message.getBody()) +
32                 ",replyCode:" + replyCode + ",replyText:" + replyText +
33                 ",exchange:" + exchange + ",routingKey:" + routingKey);
34     }
35 }

五、編寫消息隊列的操做類和接口:

MQProducer.java
  1 package org.seckill.utils.rabbitmq;
  2 
  3 import org.springframework.amqp.core.Message;
  4 import org.springframework.amqp.core.MessagePostProcessor;
  5 import org.springframework.amqp.rabbit.support.CorrelationData;
  6 
  7 /**
  8  * <p>Title: org.seckill.utils.rabbitmq</p>
  9  * <p>Company:東軟集團(neusoft)</p>
 10  * <p>Copyright:Copyright(c)2018</p>
 11  * User: 段美林
 12  * Date: 2018/5/30 11:49
 13  * Description: No Description
 14  */
 15 public interface MQProducer {
 16 
 17     /**
 18      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
 19      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
 20      * @param message
 21      */
 22     void sendDataToRabbitMQ(java.lang.Object message);
 23 
 24     /**
 25      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
 26      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
 27      * @param message
 28      * @param messagePostProcessor
 29      */
 30     void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor);
 31 
 32     /**
 33      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
 34      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
 35      * @param message
 36      * @param messagePostProcessor
 37      * @param correlationData
 38      */
 39     void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
 40 
 41     /**
 42      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
 43      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
 44      * @param routingKey
 45      * @param message
 46      */
 47     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message);
 48 
 49     /**
 50      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
 51      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
 52      * @param routingKey
 53      * @param message
 54      * @param correlationData
 55      */
 56     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData);
 57 
 58     /**
 59      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
 60      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
 61      * @param routingKey
 62      * @param message
 63      * @param messagePostProcessor
 64      */
 65     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor);
 66 
 67     /**
 68      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
 69      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
 70      * @param routingKey
 71      * @param message
 72      * @param messagePostProcessor
 73      * @param correlationData
 74      */
 75     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
 76 
 77     /**
 78      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
 79      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
 80      * @param exchange
 81      * @param routingKey
 82      * @param message
 83      */
 84     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message);
 85 
 86     /**
 87      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
 88      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
 89      * @param exchange
 90      * @param routingKey
 91      * @param message
 92      * @param correlationData
 93      */
 94     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData);
 95 
 96     /**
 97      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
 98      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
 99      * @param exchange
100      * @param routingKey
101      * @param message
102      * @param messagePostProcessor
103      */
104     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor);
105 
106     /**
107      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
108      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
109      * @param exchange
110      * @param routingKey
111      * @param message
112      * @param messagePostProcessor
113      * @param correlationData
114      */
115     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
116 
117     Message messageBuil(Object handleObject, String msgId);
118 
119     String getMsgId();
120 }
MQProducerImpl.java
  1 package org.seckill.utils.rabbitmq.Impl;
  2 
  3 import com.alibaba.fastjson.JSONObject;
  4 import org.seckill.utils.rabbitmq.MQProducer;
  5 import org.slf4j.Logger;
  6 import org.slf4j.LoggerFactory;
  7 import org.springframework.amqp.AmqpException;
  8 import org.springframework.amqp.core.Message;
  9 import org.springframework.amqp.core.MessageBuilder;
 10 import org.springframework.amqp.core.MessagePostProcessor;
 11 import org.springframework.amqp.core.MessageProperties;
 12 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 13 import org.springframework.amqp.rabbit.support.CorrelationData;
 14 import org.springframework.stereotype.Component;
 15 
 16 import java.io.UnsupportedEncodingException;
 17 import java.util.UUID;
 18 
 19 /**
 20  * <p>Title: org.seckill.utils.rabbitmq.Impl</p>
 21  * <p>Company:東軟集團(neusoft)</p>
 22  * <p>Copyright:Copyright(c)2018</p>
 23  * User: 段美林
 24  * Date: 2018/6/2 22:54
 25  * Description: 消息生產者操做主體類
 26  */
 27 @Component
 28 public class MQProducerImpl implements MQProducer{
 29 
 30     private static final Logger logger = LoggerFactory.getLogger(MQProducerImpl.class);
 31 
 32     private RabbitTemplate rabbitTemplate;
 33 
 34     /**
 35      * Sets the rabbitTemplate.
 36      * <p>
 37      * <p>You can use getRabbitTemplate() to get the value of rabbitTemplate</p>
 38      *
 39      * @param rabbitTemplate rabbitTemplate
 40      */
 41     public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
 42         this.rabbitTemplate = rabbitTemplate;
 43     }
 44 
 45     /**
 46      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
 47      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
 48      *
 49      * @param message
 50      */
 51     public void sendDataToRabbitMQ(Object message) {
 52         try {
 53             if (message instanceof Message){
 54                 Message messageSend = (Message) message;
 55                 String msgId = messageSend.getMessageProperties().getCorrelationId();
 56                 CorrelationData correlationData = new CorrelationData(msgId);
 57                 rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,correlationData);
 58             }else {
 59                 rabbitTemplate.convertAndSend(message);
 60             }
 61         } catch (AmqpException e) {
 62             logger.error(e.getMessage(), e);
 63         }
 64     }
 65 
 66     /**
 67      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
 68      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
 69      *
 70      * @param message
 71      * @param messagePostProcessor
 72      */
 73     public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor) {
 74         try {
 75             if (message instanceof Message){
 76                 Message messageSend = (Message) message;
 77                 String msgId = messageSend.getMessageProperties().getCorrelationId();
 78                 CorrelationData correlationData = new CorrelationData(msgId);
 79                 rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,messagePostProcessor,correlationData);
 80             }else {
 81                 rabbitTemplate.convertAndSend(message, messagePostProcessor);
 82             }
 83         } catch (AmqpException e) {
 84             logger.error(e.getMessage(), e);
 85         }
 86     }
 87 
 88     /**
 89      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
 90      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
 91      *
 92      * @param message
 93      * @param messagePostProcessor
 94      * @param correlationData
 95      */
 96     public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
 97         try {
 98             rabbitTemplate.convertAndSend(message, messagePostProcessor, correlationData);
 99         } catch (AmqpException e) {
100             logger.error(e.getMessage(), e);
101         }
102     }
103 
104     /**
105      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
106      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
107      *
108      * @param routingKey
109      * @param message
110      */
111     public void sendDataToRabbitMQ(String routingKey, Object message) {
112         try {
113             if (message instanceof Message){
114                 Message messageSend = (Message) message;
115                 String msgId = messageSend.getMessageProperties().getCorrelationId();
116                 CorrelationData correlationData = new CorrelationData(msgId);
117                 rabbitTemplate.convertAndSend(routingKey,message,correlationData);
118             }else {
119                 rabbitTemplate.convertAndSend(routingKey, message);
120             }
121         } catch (AmqpException e) {
122             logger.error(e.getMessage(), e);
123         }
124     }
125 
126     /**
127      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
128      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
129      *
130      * @param routingKey
131      * @param message
132      * @param correlationData
133      */
134     public void sendDataToRabbitMQ(String routingKey, Object message, CorrelationData correlationData) {
135         try {
136             rabbitTemplate.convertAndSend(routingKey, message, correlationData);
137         } catch (AmqpException e) {
138             logger.error(e.getMessage(), e);
139         }
140     }
141 
142     /**
143      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
144      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
145      *
146      * @param routingKey
147      * @param message
148      * @param messagePostProcessor
149      */
150     public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
151         try {
152             if (message instanceof Message){
153                 Message messageSend = (Message) message;
154                 String msgId = messageSend.getMessageProperties().getCorrelationId();
155                 CorrelationData correlationData = new CorrelationData(msgId);
156                 rabbitTemplate.convertAndSend(routingKey,message,messagePostProcessor,correlationData);
157             }else {
158                 rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor);
159             }
160         } catch (AmqpException e) {
161             logger.error(e.getMessage(), e);
162         }
163     }
164 
165     /**
166      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
167      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
168      *
169      * @param routingKey
170      * @param message
171      * @param messagePostProcessor
172      * @param correlationData
173      */
174     public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
175         try {
176             rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor, correlationData);
177         } catch (AmqpException e) {
178             logger.error(e.getMessage(), e);
179         }
180     }
181 
182     /**
183      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
184      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
185      *
186      * @param exchange
187      * @param routingKey
188      * @param message
189      */
190     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message) {
191         try {
192             if (message instanceof Message){
193                 Message messageSend = (Message) message;
194                 String msgId = messageSend.getMessageProperties().getCorrelationId();
195                 CorrelationData correlationData = new CorrelationData(msgId);
196                 rabbitTemplate.convertAndSend(routingKey,message,correlationData);
197             }else {
198                 rabbitTemplate.convertAndSend(exchange, routingKey, message);
199             }
200         } catch (AmqpException e) {
201             logger.error(e.getMessage(), e);
202         }
203     }
204 
205     /**
206      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
207      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
208      *
209      * @param exchange
210      * @param routingKey
211      * @param message
212      * @param correlationData
213      */
214     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, CorrelationData correlationData) {
215         try {
216             rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
217         } catch (AmqpException e) {
218             logger.error(e.getMessage(), e);
219         }
220     }
221 
222     /**
223      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
224      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
225      *
226      * @param exchange
227      * @param routingKey
228      * @param message
229      * @param messagePostProcessor
230      */
231     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
232         try {
233             rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor);
234         } catch (AmqpException e) {
235             logger.error(e.getMessage(), e);
236         }
237     }
238 
239     /**
240      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
241      * 因爲配置了JSON轉換,這裏是將Java對象轉換成JSON字符串的形式。
242      *
243      * @param exchange
244      * @param routingKey
245      * @param message
246      * @param messagePostProcessor
247      * @param correlationData
248      */
249     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
250         try {
251             rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);
252         } catch (AmqpException e) {
253             logger.error(e.getMessage(), e);
254         }
255     }
256 
257     /**
258      * 構建Message對象,進行消息發送
259      * @param handleObject
260      * @param msgId
261      * @return
262      */
263     public Message messageBuil(Object handleObject, String msgId) {
264         try {
265             //先轉成JSON
266             String objectJSON = JSONObject.toJSONString(handleObject);
267             //再構建Message對象
268             Message messageBuil = MessageBuilder.withBody(objectJSON.getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
269                     .setCorrelationId(msgId).build();
270             return messageBuil;
271         } catch (UnsupportedEncodingException e) {
272             logger.error("構建Message出錯:" + e.getMessage(),e);
273             return null;
274         }
275     }
276 
277     /**
278      * 生成惟一的消息操做id
279      * @return
280      */
281     public String getMsgId() {
282         return UUID.randomUUID().toString();
283     }
284 
285 }

至此就完成了延遲消息隊列的全部代碼實現,

相關文章
相關標籤/搜索