Spring集成RabbitMQ實現死信隊列

  上班時間閒的無聊更一篇帖子,以前公司安排了 一個需求,覺得要用RabbitMQ實現,就本身研究了一下,終於在Giant努力之下,MQ仍是被征服,把消息變成死信的方式有不少,我只實現了其中的一種,(我怎麼可能會告訴大家其實我不會其餘的呢)有興趣的話,能夠本身去網上研究研究其餘的實現方式,好了,咱們直奔主題吧.
html

   首先,咱們先要聲明一個隊列,並給這個隊列聲明一個轉發的交換機,而後指定它的路由,(這麼作是讓"METHOD_EVENT_DIRECT_QUEUE"中的消息過時而後放入到這個)java

<rabbit:queue id = "METHOD_EVENT_DIRECT_QUEUE" name="METHOD_EVENT_DIRECT_QUEUE" durable="true" auto-delete="false" exclusive="false">
<!--聲明轉發的交換機,以及轉發的路由-->
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="dlx.exchange" />
<entry key="x-dead-letter-routing-key" value="routingKey" />
</rabbit:queue-arguments>

</rabbit:queue>程序員


其次,咱們再聲明一個正常隊列
spring

<rabbit:queue id="dlq.queue" name="dlq.queue" durable="true" auto-delete="false" exclusive="false"/>


最後,咱們把"METHOD_EVENT_DIRECT_QUEUE" 以及"dlq.queue" 綁定在"dlx.exchange"(轉發的交換機),交換機下面,指定dlq.queue(正常隊列的路由爲轉發的路由)路由爲"routingKey" ,METHOD_EVENT_DIRECT_QUEUE隊列的路由爲"METHOD_EVENT_DIRECT_ROUTING_KEY"(本身隨便起名字)微信

<!-- 死信隊列綁定 -->
<rabbit:direct-exchange name="dlx.exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="dlq.queue" key="routingKey"></rabbit:binding>
<rabbit:binding queue="METHOD_EVENT_DIRECT_QUEUE" key="METHOD_EVENT_DIRECT_ROUTING_KEY"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>


聲明消息接收者
ide

<!-- 消息接收者 -->
<!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象, 手動確認機制-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">

<rabbit:listener queues="dlq.queue" ref="msgConsumer"/>
</rabbit:listener-container>

這樣咱們死信隊列的一個配置就算是完成了.post


下面咱們開始聲明一個生產者,開始生產消息,消息要設置過時時間(時間由本身指定以秒爲單位)
url

/**
* @Description 消息延時發送
*
* @author gzj
* @date 2019年12月17日 上午9:12:06
* @param routingKey 路由鍵
* @param message 消息體
* @param delayTime 延遲時間 單位秒
* @return MessageResponse
*/
public static void sendDelay(final String routingKey, Object message,String delayTime) {
AmqpTemplate amqpTemplate = SpringContextHolder.getBean("amqpTemplate");
//final int xdelay = delayTime * 1000;
amqpTemplate.convertAndSend("dlx.exchange",routingKey, message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//設置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//設置延遲時間
message.getMessageProperties().setExpiration(delayTime);
return message;
}
});
}


package com.hmgj.impl.user;
import com.alibaba.dubbo.config.annotation.Service;
import com.hmgj.helper.RabbitHelper;;
import com.hmgj.service.user.ProviderService;
import org.springframework.amqp.core.AmqpTemplate;
import javax.annotation.Resource;
/**
* @author gzj
* @Date 2019/11/28.
*/
@Service
public class ProviderServiceImpl implements ProviderService{
@Resource
private AmqpTemplate amqpTemplate;
@Override
public void sendMsg(String content) {
System.out.println("要發送的消息 :" + content);
RabbitHelper.sendDelay("METHOD_EVENT_DIRECT_ROUTING_KEY","132","6000");
System.out.println("消息發送成功");
}
}



消費者監聽正常隊"dlq.queue"列,接收消息(注意網上好多都是監聽的"METHOD_EVENT_DIRECT_QUEUE"隊列,我發現這個寫法是有問題的)
spa

package com.hmgj.task.queue;
import java.io.UnsupportedEncodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* @author gzj
* @Date 2019/11/28.
*/
@Component
public class MsgConsumer implements ChannelAwareMessageListener {
private final Logger logger = LoggerFactory.getLogger(MsgConsumer.class);
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String context = "";
logger.info("進入隊列");
try {
context = new String(message.getBody(), "utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
logger.info("message:" + message.toString());
logger.info("接收處理當前監聽隊列當中的消息:" + context + "\n 當前線程name:" + Thread.currentThread().getName() + "\n 當前線程id:"
+ Thread.currentThread().getId());
// 消息的標識,false只確認當前一個消息收到,true確認全部consumer得到的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// nack返回false,並從新回到隊列,就是重發機制,一般存在於消費失敗後處理中;
//第三個參數與拒絕消息方法的第二個參數同理。即true從新進入隊列,false則丟棄;
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// 拒絕消息,即丟棄消息,消息不會從新回到隊列,後面的參數爲true則重入隊列;爲false則丟棄;
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}


最終實現結果
.net



本文分享自微信公衆號 - 程序員真正幽默段子(gh_b9e01d69a484)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索