rabbitMq是受歡迎的消息中間件之一,相比其餘的消息中間件,具備高併發的特性(天生具有高併發高可用的erlang語言編寫),除此以外,還能夠持久化,保證消息不易丟失,高可用,實現集羣部署,提供靈活的路由和可靠性,可視化管理等等的優勢。java
相比於其餘的消息隊列,rabbitmq最大的特點就是加入了exchange(交換器)這個東西,AMQP協議中的核心思想就是生產者和消費者隔離,生產者從不直接將消息發送給隊列。生產者一般不知道是否一個消息會被髮送到隊列中,只是將消息發送到一個交換機。先由Exchange來接收,而後Exchange按照特定的策略轉發到Queue進行存儲。同理,消費者也是如此。Exchange 就相似於一個交換機,轉發各個消息分發到相應的隊列中。web
RabbitMQ經常使用的Exchange Type有三種:fanout、direct、topic。spring
fanout:把全部發送到該Exchange的消息投遞到全部與它綁定的隊列中。apache
direct:把消息投遞到那些binding key與routing key徹底匹配的隊列中。併發
topic:將消息路由到binding key與routing key模式匹配的隊列中。app
言歸正傳,延時隊列如何經過rabbitmq來實現呢?ide
分析:首先rabbitmq本身是不具有延時的功能的,除了使用官方提供的插件以外,咱們還能夠經過ttl(設置超時時間的方式)+ DLX(一個死信隊列)的方式來實現 + Router(轉發隊列)高併發
其中,ttl能夠設置在消息上,也能夠設置在隊列上,設置在消息上能夠提供更大的靈活性,可是若是同時設置超時時間的話,就取最小的超時時間爲準。post
此外,死信隊列是一個普通的隊列,它沒有消費者,用來存儲有超時時間信息的消息,而且能夠設置當消息超時(ttl),轉發到另外一個指定隊列(此處設置轉發到router, 當發送消息以後(發送時,帶上要延時的隊列名稱),等待消息超時,將消息轉發到指定的Router隊列。測試
最後,轉發隊列,用來接收死信隊列超時消息,在接收到以後,消費者將消息解析,獲取queueName,body,再向所獲取的queueName隊列發送一條消息,內容爲body.
下面是代碼:
生產者:
package cn.chinotan.service.delayQueueRabbitMQ; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @program: test * @description: 生產者 * @author: xingcheng * @create: 2018-08-12 12:33 **/ @Service public class Producr { private static final Logger LOGGER = LoggerFactory.getLogger(Producr.class); @Autowired private AmqpTemplate amqpTemplate; public void send(String msg, long time, String delayQueueName) { //rabbit默認爲毫秒級 long times = time * 1000; MessagePostProcessor processor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(String.valueOf(times)); return message; } }; // 拼裝msg msg = StringUtils.join(msg, ":", delayQueueName); amqpTemplate.convertAndSend(MqConstant.MY_EXCHANGE, MqConstant.DEAD_LETTER_QUEUE, msg, processor); } }
消費隊列1:
package cn.chinotan.service.delayQueueRabbitMQ; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.text.SimpleDateFormat; import java.util.Date; /** * @program: test * @description: queueOne消費者 * @author: xingcheng * @create: 2018-08-12 12:35 **/ @Service public class MyQueueOneConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(MyQueueOneConsumer.class); @RabbitListener(queues=MqConstant.MY_QUEUE_ONE) @RabbitHandler public void process(String content) { LOGGER.info("延遲時間到,queueOne開始執行 {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); } }
消費隊列2:
package cn.chinotan.service.delayQueueRabbitMQ; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.text.SimpleDateFormat; import java.util.Date; /** * @program: test * @description: queueTwo隊列 * @author: xingcheng * @create: 2018-08-12 12:35 **/ @Service public class MyQueueTwoConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(MyQueueTwoConsumer.class); @Autowired private Producr producr; @RabbitListener(queues=MqConstant.MY_QUEUE_TWO) @RabbitHandler public void process(String content) { LOGGER.info("延遲時間到,queueTwo開始執行 {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); } }
轉發隊列:
package cn.chinotan.service.delayQueueRabbitMQ; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.text.SimpleDateFormat; import java.util.Date; /** * @program: test * @description: 轉發隊列 * @author: xingcheng * @create: 2018-08-12 12:35 **/ @Service public class TradeProcess { private static final Logger LOGGER = LoggerFactory.getLogger(TradeProcess.class); @Autowired private AmqpTemplate amqpTemplate; @RabbitListener(queues=MqConstant.MY_TRANS_QUEUE) @RabbitHandler public void process(String content) { String msg = content.split(":")[0]; String delayQueueName = content.split(":")[1]; amqpTemplate.convertAndSend(MqConstant.MY_EXCHANGE, delayQueueName, msg); LOGGER.info("進行轉發 {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); } }
隊列配置:
package cn.chinotan.service.delayQueueRabbitMQ; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @program: test * @description: 延時隊列rabbitMQ配置 * @author: xingcheng * @create: 2018-08-12 12:27 **/ @Configuration public class RabbitConfig { @Bean public DirectExchange myExchange() { return new DirectExchange(MqConstant.MY_EXCHANGE, true, false); } @Bean public Queue myQueueOne() { return new Queue(MqConstant.MY_QUEUE_ONE, true, false, false); } @Bean public Queue myQueueTwo() { return new Queue(MqConstant.MY_QUEUE_TWO, true, false, false); } @Bean public Queue myTransQueue() { return new Queue(MqConstant.MY_TRANS_QUEUE, true, false, false); } @Bean public Queue deadLetterQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange", MqConstant.MY_EXCHANGE); map.put("x-dead-letter-routing-key", MqConstant.MY_TRANS_QUEUE); Queue queue = new Queue(MqConstant.DEAD_LETTER_QUEUE, true, false, false, map); System.out.println("arguments :" + queue.getArguments()); return queue; } @Bean public Binding queueOneBinding() { return BindingBuilder.bind(myQueueOne()).to(myExchange()).with(MqConstant.MY_QUEUE_ONE); } @Bean public Binding queueTwoBinding() { return BindingBuilder.bind(myQueueTwo()).to(myExchange()).with(MqConstant.MY_QUEUE_TWO); } @Bean public Binding queueDeadBinding() { return BindingBuilder.bind(deadLetterQueue()).to(myExchange()).with(MqConstant.DEAD_LETTER_QUEUE); } @Bean public Binding queueTransBinding() { return BindingBuilder.bind(myTransQueue()).to(myExchange()).with(MqConstant.MY_TRANS_QUEUE); } }
隊列常量配置:
package cn.chinotan.service.delayQueueRabbitMQ; /** * @program: test * @description: rabbitMq常量 * @author: xingcheng * @create: 2018-08-12 12:30 **/ public class MqConstant { public static final String MY_EXCHANGE = "my_exchange"; public static final String MY_QUEUE_ONE = "my_queue_one"; public static final String MY_QUEUE_TWO = "my_queue_two"; public static final String DEAD_LETTER_QUEUE = "dead_letter_queue"; public static final String MY_TRANS_QUEUE = "my_trans_queue"; }
測試延時controller:
package cn.chinotan.controller; import cn.chinotan.service.delayQueueRabbitMQ.MqConstant; import cn.chinotan.service.delayQueueRabbitMQ.Producr; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.text.SimpleDateFormat; import java.util.Date; /** * @program: test * @description: 延時隊列啓動類 * @author: xingcheng * @create: 2018-08-12 15:41 **/ @RestController @RequestMapping("/delayQueue") public class DelayQueueController { private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueController.class); @Autowired private Producr producr; @GetMapping("/send/{time}") public String send(@PathVariable("time") int time){ LOGGER.info("{}秒後, 發送延遲消息,當前時間{}", time, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); producr.send("我是延時消息...", time, MqConstant.MY_QUEUE_TWO); return "ok"; } }
測試結果: