>前段時間在編寫通用的消息通知服務時,因爲須要實現相似通知失敗時,須要延後幾分鐘再次進行發送,進行屢次嘗試後,進入定時發送機制。此機制,在原先對接銀聯支付時,銀聯的異步通知也是相似的,在第一次通知失敗後,支付標準服務會重發,最多發送五次,每次的間隔時間爲一、四、八、16分鐘等。本文就簡單講解下使用RabbitMQ實現延時消息隊列功能。html
>在此以前,簡單說明下基於RabbitMQ實現延時隊列的相關知識及說明下延時隊列的使用場景。java
>在不少的業務場景中,延時隊列能夠實現不少功能,此類業務中,通常上是非實時的,須要延遲處理的,須要進行重試補償的。git
>自己在RabbitMQ
中是未直接提供延時隊列功能的,但可使用TTL(Time-To-Live,存活時間)
和DLX(Dead-Letter-Exchange,死信隊列交換機)
的特性實現延時隊列的功能。github
>RabbitMQ
中能夠對隊列和消息分別設置TTL,TTL代表了一條消息可在隊列中存活的最大時間。當某條消息被設置了TTL或者當某條消息進入了設置了TTL的隊列時,這條消息會在TTL時間後**死亡
成爲Dead Letter
**。若是既配置了消息的TTL,又配置了隊列的TTL,那麼較小的那個值會被取用。web
>上個知識點也提到了,設置了TTL
的消息或隊列最終會成爲Dead Letter
,當消息在一個隊列中變成死信以後,它能被從新發送到另外一個交換機中,這個交換機就是DLX,綁定此DLX的隊列就是死信隊列。redis
一個消息變成死信通常上是因爲如下幾種狀況;spring
因此,經過TTL
和DLX
的特性能夠模擬實現延時隊列的功能。當隊列中的消息超時成爲死信後,會把消息死信從新發送到配置好的交換機中,而後分發到真實的消費隊列。故簡單來講,咱們能夠建立2個隊列,一個隊列用於發送消息,一個隊列用於消息過時後的轉發的目標隊列。數據庫
>如下使用SpringBoot
集成RabbitMQ
進行實戰說明,在進行http
消息通知時,若通知失敗(地址不可用或者鏈接超時)時,將此消息轉入延時隊列中,待特定時間後進行從新發送。json
0.引入pom依賴api
<!-- rabbit --> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency> <!-- 簡化http操做 --> <dependency> <groupid>cn.hutool</groupid> <artifactid>hutool-http</artifactid> <version>4.5.16</version> </dependency> <dependency> <groupid>cn.hutool</groupid> <artifactid>hutool-json</artifactid> <version>4.5.16</version> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency>
1.編寫rabbitmq
配置文件(關鍵配置) RabbitConfig.java
/** * * @ClassName 類名:RabbitConfig * @Description 功能說明: * <p> * TODO *</p> ************************************************************************ * @date 建立日期:2019年7月17日 * @author 建立人:oKong * @version 版本號:V1.0 *<p> ***************************修訂記錄************************************* * * 2019年7月17日 oKong 建立該類功能。 * *********************************************************************** *</p> */ @Configuration public class RabbitConfig { @Autowired ConnectionFactory connectionFactory; /** * 消費者線程數 設置大點 大機率是能通知到的 */ @Value("${http.notify.concurrency:50}") int concurrency; /** * 延遲隊列的消費者線程數 可設置小點 */ @Value("${http.notify.delay.concurrency:20}") int delayConcurrency; @Bean public RabbitAdmin rabbitAdmin() { return new RabbitAdmin(connectionFactory); } @Bean public DirectExchange httpMessageNotifyDirectExchange(RabbitAdmin rabbitAdmin) { //durable 是否持久化 //autoDelete 是否自動刪除,即服務端或者客服端下線後 交換機自動刪除 DirectExchange directExchange = new DirectExchange(ApplicationConstant.HTTP_MESSAGE_EXCHANGE,true,false); directExchange.setAdminsThatShouldDeclare(rabbitAdmin); return directExchange; } //設置消息隊列 @Bean public Queue httpMessageStartQueue(RabbitAdmin rabbitAdmin) { /* 建立接收隊列,4個參數 name - 隊列名稱 durable - false,不進行持有化 exclusive - true,獨佔性 autoDelete - true,自動刪除*/ Queue queue = new Queue(ApplicationConstant.HTTP_MESSAGE_START_QUEUE_NAME, true, false, false); queue.setAdminsThatShouldDeclare(rabbitAdmin); return queue; } //隊列綁定交換機 @Bean public Binding bindingStartQuene(RabbitAdmin rabbitAdmin,DirectExchange httpMessageNotifyDirectExchange, Queue httpMessageStartQueue) { Binding binding = BindingBuilder.bind(httpMessageStartQueue).to(httpMessageNotifyDirectExchange).with(ApplicationConstant.HTTP_MESSAGE_START_RK); binding.setAdminsThatShouldDeclare(rabbitAdmin); return binding; } @Bean public Queue httpMessageOneQueue(RabbitAdmin rabbitAdmin) { Queue queue = new Queue(ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME, true, false, false); queue.setAdminsThatShouldDeclare(rabbitAdmin); return queue; } @Bean public Binding bindingOneQuene(RabbitAdmin rabbitAdmin,DirectExchange httpMessageNotifyDirectExchange, Queue httpMessageOneQueue) { Binding binding = BindingBuilder.bind(httpMessageOneQueue).to(httpMessageNotifyDirectExchange).with(ApplicationConstant.HTTP_MESSAGE_ONE_RK); binding.setAdminsThatShouldDeclare(rabbitAdmin); return binding; } //-------------設置延遲隊列--開始-------------------- @Bean public Queue httpDelayOneQueue() { //name - 隊列名稱 //durable - true //exclusive - false //autoDelete - false return QueueBuilder.durable("http.message.dlx.one") //如下是重點:當變成死信隊列時,會轉發至 路由爲x-dead-letter-exchange及x-dead-letter-routing-key的隊列中 .withArgument("x-dead-letter-exchange", ApplicationConstant.HTTP_MESSAGE_EXCHANGE) .withArgument("x-dead-letter-routing-key", ApplicationConstant.HTTP_MESSAGE_ONE_RK) .withArgument("x-message-ttl", 1*60*1000)//1分鐘 過時時間(單位:毫秒),當過時後 會變成死信隊列,以後進行轉發 .build(); } //綁定到交換機上 @Bean public Binding bindingDelayOneQuene(RabbitAdmin rabbitAdmin, DirectExchange httpMessageNotifyDirectExchange, Queue httpDelayOneQueue) { Binding binding = BindingBuilder.bind(httpDelayOneQueue).to(httpMessageNotifyDirectExchange).with("delay.one"); binding.setAdminsThatShouldDeclare(rabbitAdmin); return binding; } //-------------設置延遲隊列--結束-------------------- //建議將正常的隊列和延遲處理的隊列分開 //設置監聽容器 @Bean("notifyListenerContainer") public SimpleRabbitListenerContainerFactory httpNotifyListenerContainer() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 手動ack factory.setConnectionFactory(connectionFactory); factory.setPrefetchCount(1); factory.setConcurrentConsumers(concurrency); return factory; } // 設置監聽容器 @Bean("delayNotifyListenerContainer") public SimpleRabbitListenerContainerFactory httpDelayNotifyListenerContainer() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 手動ack factory.setConnectionFactory(connectionFactory); factory.setPrefetchCount(1); factory.setConcurrentConsumers(delayConcurrency); return factory; } }
ApplicationConstant.java
public class ApplicationConstant { /** * 發送http通知的 exchange 隊列 */ public static final String HTTP_MESSAGE_EXCHANGE = "http.message.exchange"; /** * 配置消息隊列和路由key值 */ public static final String HTTP_MESSAGE_START_QUEUE_NAME = "http.message.start"; public static final String HTTP_MESSAGE_START_RK = "rk.start"; public static final String HTTP_MESSAGE_ONE_QUEUE_NAME = "http.message.one"; public static final String HTTP_MESSAGE_ONE_RK = "rk.one"; /** * 通知隊列對應的延遲隊列關係,即過時隊列以後發送到下一個的隊列信息,能夠根據實際狀況添加,固然也能夠根據必定規則自動生成 */ public static final Map<string,string> delayRefMap = new HashMap<string, string>() { /** * */ private static final long serialVersionUID = -779823216035682493L; { put(HTTP_MESSAGE_START_QUEUE_NAME, "delay.one"); } }; }
簡單來講,就是建立一個正常消息發送隊列,用於接收http消息請求的參數,同時進行http請求。同時,建立一個延時隊列,設置其x-dead-letter-exchange
、x-dead-letter-routing-key
和x-message-ttl
值,將其轉發到正常的隊列中。使用一個map對象維護一個關係,當正常消息異常時,須要發送的延時隊列的隊列名稱,固然時間場景彙總,根據須要能夠進行動態配置或者根據必定規則進行動態映射。
2.建立監聽類,用於消息的消費操做,此處使用@RabbitListener
來消費消息(固然也可使用SimpleMessageListenerContainer
進行消息配置的),建立了一個正常消息監聽和延時隊列監聽,因爲通常上異常通知是低機率事件,可根據不一樣的監聽容器進行差別化配置。
/** * * @ClassName 類名:HttpMessagerLister * @Description 功能說明:http通知消費監聽接口 * <p> * TODO *</p> ************************************************************************ * @date 建立日期:2019年7月17日 * @author 建立人:oKong * @version 版本號:V1.0 *<p> ***************************修訂記錄************************************* * * 2019年7月17日 oKong 建立該類功能。 * *********************************************************************** *</p> */ @Component @Slf4j public class HttpMessagerLister { @Autowired HttpMessagerService messagerService; @RabbitListener(id = "httpMessageNotifyConsumer", queues = {ApplicationConstant.HTTP_MESSAGE_START_QUEUE_NAME}, containerFactory = "notifyListenerContainer") public void httpMessageNotifyConsumer(Message message, Channel channel) throws Exception { doHandler(message, channel); } @RabbitListener(id= "httpDelayMessageNotifyConsumer", queues = { ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME,}, containerFactory = "delayNotifyListenerContainer") public void httpDelayMessageNotifyConsumer(Message message, Channel channel) throws Exception { doHandler(message, channel); } private void doHandler(Message message, Channel channel) throws Exception { String body = new String(message.getBody(),"utf-8"); String queue = message.getMessageProperties().getConsumerQueue(); log.info("接收到通知請求:{},隊列名:{}",body, queue); //消息對象轉換 try { HttpEntity httpNotifyDto = JSONUtil.toBean(body, HttpEntity.class); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //發送通知 messagerService.notify(queue, httpNotifyDto); } catch(Exception e) { log.error(e.getMessage()); //ack channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } }
HttpMessagerService.java
:消息真正處理的類,此類是關鍵,這裏未進行日誌記錄,真實場景中,強烈建議進行消息通知的日誌存儲,防止往後信息的查看,同時也能經過發送狀態,在重試次數都失敗後,進行定時再次發送功能,同時也有據可查。
@Component @Slf4j public class HttpMessagerService { @Autowired AmqpTemplate mqTemplate; public void notify(String queue,HttpEntity httpEntity) { //發起請求 log.info("開始發起http請求:{}", httpEntity); try { switch(httpEntity.getMethod().toLowerCase()) { case "POST": HttpUtil.post(httpEntity.getUrl(), httpEntity.getParams()); break; case "GET": default: HttpUtil.get(httpEntity.getUrl(), httpEntity.getParams()); } } catch (Exception e) { //發生異常,放入延遲隊列中 String nextRk = ApplicationConstant.delayRefMap.get(queue); if(ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME.equals(queue)) { //若已是最後一個延遲隊列的消息隊列了,則後續可直接放入數據庫中 待後續定時策略進行再次發送 log.warn("http通知已經通知N次失敗,進入定時進行發起通知,url={}", httpEntity.getUrl()); } else { log.warn("http從新發送通知:{}, 通知隊列rk爲:{}, 原隊列:{}", httpEntity.getUrl(), nextRk, queue); mqTemplate.convertAndSend(ApplicationConstant.HTTP_MESSAGE_EXCHANGE, nextRk, cn.hutool.json.JSONUtil.toJsonStr(httpEntity)); } } } }
3.建立控制層服務(真實場景中,如SpringCloud
微服務中,通常上是建立個api接口,供其餘服務進行調用)
@Slf4j @RestController @Api(tags = "http測試接口") public class HttpDemoController { @Autowired AmqpTemplate mqTemplate; @PostMapping("/send") @ApiOperation(value="send",notes = "發送http測試") public String sendHttp(@RequestBody HttpEntity httpEntity) { //發送http請求 log.info("開始發起http請求,發佈異步消息:{}", httpEntity); mqTemplate.convertAndSend(ApplicationConstant.HTTP_MESSAGE_EXCHANGE, ApplicationConstant.HTTP_MESSAGE_START_RK, cn.hutool.json.JSONUtil.toJsonStr(httpEntity)); return "發送成功:url=" + httpEntity.getUrl(); } }
4.配置文件添加RabbitMQ
相關配置信息
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ # 通知-消費者線程數 設置大點 大機率是能通知到的 http.notify.concurrency=150 # 延遲隊列的消費者線程數 可設置小點 http.notify.delay.concurrency=10
5.編寫啓動類。
@SpringBootApplication @Slf4j public class DelayQueueApplication { public static void main(String[] args) throws Exception { SpringApplication.run(DelayQueueApplication.class, args); log.info("spring-boot-rabbitmq-delay-queue-chapter38服務啓動!"); } }
6.啓動服務。使用swagger
進行簡單調用測試。
2019-07-20 23:52:23.792 INFO 65216 --- [nio-8080-exec-1] c.l.l.s.c.controller.HttpDemoController : 開始發起http請求,發佈異步消息:HttpEntity(url=www.baidu.com, params={a=1}, method=get) 2019-07-20 23:52:23.794 INFO 65216 --- [TaskExecutor-97] c.l.l.s.chapter38.mq.HttpMessagerLister : 接收到通知請求:{"method":"get","params":{"a":1},"url":"www.baidu.com"},隊列名:http.message.start 2019-07-20 23:52:23.794 INFO 65216 --- [TaskExecutor-97] c.l.l.s.c.service.HttpMessagerService : 開始發起http請求:HttpEntity(url=www.baidu.com, params={a=1}, method=get)
2019-07-20 23:53:14.699 INFO 65216 --- [nio-8080-exec-4] c.l.l.s.c.controller.HttpDemoController : 開始發起http請求,發佈異步消息:HttpEntity(url=www.baidu.com1, params={a=1}, method=get) 2019-07-20 23:53:14.705 INFO 65216 --- [TaskExecutor-84] c.l.l.s.chapter38.mq.HttpMessagerLister : 接收到通知請求:{"method":"get","params":{"a":1},"url":"www.baidu.com1"},隊列名:http.message.start 2019-07-20 23:53:14.705 INFO 65216 --- [TaskExecutor-84] c.l.l.s.c.service.HttpMessagerService : 開始發起http請求:HttpEntity(url=www.baidu.com1, params={a=1}, method=get) 2019-07-20 23:53:14.706 WARN 65216 --- [TaskExecutor-84] c.l.l.s.c.service.HttpMessagerService : http從新發送通知:www.baidu.com1, 通知隊列rk爲:delay.one, 原隊列:http.message.start
在RabbitMQ
後臺中,能夠看見http.message.dlx.one
隊列中存在這須要延時處理的消息,在一分鐘後會轉發至http.message.one
隊列中。
在一分鐘後,能夠看見消息本再次消費了。
2019-07-20 23:54:14.722 INFO 65216 --- [TaskExecutor-16] c.l.l.s.chapter38.mq.HttpMessagerLister : 接收到通知請求:{"method":"get","params":{"a":1},"url":"www.baidu.com1"},隊列名:http.message.one 2019-07-20 23:54:14.723 INFO 65216 --- [TaskExecutor-16] c.l.l.s.c.service.HttpMessagerService : 開始發起http請求:HttpEntity(url=www.baidu.com1, params={a=1}, method=get) 2019-07-20 23:54:14.723 WARN 65216 --- [TaskExecutor-16] c.l.l.s.c.service.HttpMessagerService : http通知已經通知N次失敗,進入定時進行發起通知,url=www.baidu.com1
>在正式場景中,通常上補償或者重試機制大機率是不會發送的,假若發生時,通常上是第三方業務系統出現了問題,故通常上在進行補充時,應該在非高峯期進行操做,故應該對延時監聽器,應該在高峯期時中止消費,在非高峯期時進行消費。同時,還能夠根據不一樣的通知類型,放入不同的延時隊列中,保障業務的正常。這裏簡單說明下,動態中止或者啓動演示監聽器的方式。通常上是使用RabbitListenerEndpointRegistry
對象獲取延時監聽器,以後進行動態中止或者啓用。可設置@RabbitListener
的id屬性,直接進行獲取,固然也能夠直接獲取全部的監聽器,進行自定義判斷了。
@Autowired RabbitListenerEndpointRegistry registry; @GetMapping("/set") @ApiOperation(value = "set", notes = "設置消息監聽器的狀態") public String setSimpleMessageListenerContainer(String status) { if("1".equals(status)) { registry.getListenerContainer("httpDelayMessageNotifyConsumer").start(); } else { registry.getListenerContainer("httpDelayMessageNotifyConsumer").stop(); } return status; }
這裏,只是簡單進行演示說明,在真實場景下,可使用定時器,判斷當前是否爲高峯期,進而進行動態設置監聽器的狀態。
>本文主要簡單介紹了基於RabbitMQ
實現延時隊列的功能。對於須要實現更加靈活的配置及功能時,如可自定義配置通知次數等,你們可根據本身的需求進行添加,可使用動態建立隊列的方式。固然使用延時隊列的方式還有不少,好比可使用redis
的key值過時回調機制使用,也可使用定時機制。另,發現很久沒有寫文章了,感受寫的有點亂,還望見諒呀~
>目前互聯網上不少大佬都有SpringBoot
系列教程,若有雷同,請多多包涵了。原創不易,碼字不易,還但願你們多多支持。若文中有所錯誤之處,還望提出,謝謝。
499452441
lqdevOps
我的博客:http://blog.lqdev.cn 完整示例:基於RabbitMQ實現消息延遲隊列方案 原文地址:https://blog.lqdev.cn/2019/07/21/springboot/chapter-thirty-eight/</string,></string,string>