延遲消費。好比:用戶生成訂單以後,須要過一段時間校驗訂單的支付狀態,若是訂單仍未支付則須要及時地關閉訂單;用戶註冊成功以後,須要過一段時間好比一週後校驗用戶的使用狀況,若是發現用戶活躍度較低,則發送郵件或者短信來提醒用戶使用。web
rabbitmq的消息TTL和死信Exchange結合tomcat
1.消息的TTL(Time To Live)服務器
消息的TTL就是消息的存活時間。RabbitMQ能夠對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連着的保留時間,也能夠對每個單獨的消息作單獨的設置。超過了這個時間,咱們認爲這個消息就死了,稱之爲死信。若是隊列設置了,消息也設置了,那麼會取小的。因此一個消息若是被路由到不一樣的隊列中,這個消息死亡的時間有可能不同(不一樣的隊列設置)。這裏單講單個消息的TTL,由於它纔是實現延遲任務的關鍵。能夠經過設置消息的expiration字段或者x-message-ttl屬性來設置時間,二者是同樣的效果。app
2.Dead Letter Exchangeside
Exchage的概念在這裏就不在贅述。一個消息在知足以下條件下,會進死信路由,記住這裏是路由而不是隊列,一個路由能夠對應不少隊列。post
①.一個消息被Consumer拒收了,而且reject方法的參數裏requeue是false。也就是說不會被再次放在隊列裏,被其餘消費者使用。測試
②. 上面的消息的TTL到了,消息過時了。ui
③. 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。this
Dead Letter Exchange其實就是一種普通的exchange,和建立其餘exchange沒有兩樣。只是在某一個設置Dead Letter Exchange的隊列中有消息過時了,會自動觸發消息的轉發,發送到Dead Letter Exchange中去。spa
3.實現延遲隊列
咱們先設置好各個配置的字符串
public interface TestMq { /** * 隊列名 */ String TEST_QUEUE = "test";; /** * 服務添加routing key */ String ROUTING_KEY_TEST = "post.test"; /** * 死信隊列 */ String DEAD_QUEUE = "dead"; String ROURING_KEY_DEAD = "dead.routing.key"; String MQ_EXCHANGE_DEAD = "dead.exchange"; }
配置信息
/** * rabbitmq配置 * */ @Configuration public class RabbitmqConfig { /** * 死信隊列 * @return */ @Bean public Queue deadQueue() { Map<String,Object> arguments = new HashMap<>(); //此處填入死信交換機 arguments.put("x-dead-letter-exchange", TestMq.MQ_EXCHANGE_DEAD); //此處填入消息隊列的路由,而非死信隊列本身的路由 arguments.put("x-dead-letter-routing-key", TestMq.ROUTING_KEY_TEST); return new Queue(TestMq.DEAD_QUEUE,true,false,false,arguments); } /** * 死信交換機 * @return */ @Bean public DirectExchange deadExchange() { return new DirectExchange(TestMq.MQ_EXCHANGE_DEAD); } /** * 綁定死信隊列到死信交換機 * @return */ @Bean public Binding bindingDeadExchange() { return BindingBuilder.bind(deadQueue()).to(deadExchange()) .with(TestMq.ROURING_KEY_DEAD); } /** * 被消費者偵聽的獲取消息的隊列 * @return */ @Bean public Queue testQueue() { return new Queue(TestMq.TEST_QUEUE,true,false,false); } /** * 將消息隊列綁定到死信交換機,跟死信隊列的路由不一樣 * @return */ @Bean public Binding bindingTest() { return BindingBuilder.bind(testQueue()).to(deadExchange()) .with(TestMq.ROUTING_KEY_TEST); } }
消息生產者
@Slf4j @Component public class TestSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; public void send(String exchange,String routingKey,Object content) { log.info("send content=" + content); this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); MessagePostProcessor processor = message -> { //給消息設置的過時時間,咱們這裏爲10秒 message.getMessageProperties().setExpiration(10000 + ""); return message; }; this.rabbitTemplate.convertAndSend(exchange,routingKey,serialize(content),processor); } /** * 確認後回調: * @param correlationData * @param ack * @param cause */ @Override public void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) { if (!ack) { log.info("send ack fail, cause = " + cause); } else { log.info("send ack success"); } } /** * 失敗後return回調: * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey); } /** * 對消息對象進行二進制序列化 * @param o * @return */ private byte[] serialize(Object o) { Kryo kryo = new Kryo(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); Output output = new Output(stream); kryo.writeObject(output, o); output.close(); return stream.toByteArray(); } }
消費者
@Slf4j @Component @RabbitListener(queues = TestMq.TEST_QUEUE) public class TestConsumer { @RabbitHandler public void receice(byte[] data, Channel channel, Message message) throws IOException { try { //告訴服務器收到這條消息 已經被我消費了 能夠在隊列刪掉;不然消息服務器覺得這條消息沒處理掉 後續還會在發 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); Integer orderNo = unSerialize(data); log.info(orderNo + "爲收到的消息"); } catch (IOException e) { e.printStackTrace(); //丟棄這條消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); log.info("receiver fail"); } } /** * 反序列化 * @param data * @return */ private Integer unSerialize(byte[] data) { Input input = null; try { Kryo kryo = new Kryo(); input = new Input(new ByteArrayInputStream(data)); return kryo.readObject(input,Integer.class); } finally { input.close(); } } }
咱們隨便寫個測試
@Service public class TestService { @Autowired private TestSender sender; @PostConstruct public void test() { //此處順序爲死信交換機,死信隊列路由,消息 sender.send(TestMq.MQ_EXCHANGE_DEAD,TestMq.ROURING_KEY_DEAD,1); } }
經測試
2019-10-11 17:26:18.079 INFO 879 --- [ main] c.g.rabbitdelay.config.TestSender : send content=1
2019-10-11 17:26:18.098 INFO 879 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [xxx.xxx.xxx.xxx:5672]
2019-10-11 17:26:18.227 INFO 879 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2301b75:0/SimpleConnection@243f003c [delegate=amqp://admin@xxx.xxx.xxx.xxx:5672/, localPort= 52345]
2019-10-11 17:26:18.337 INFO 879 --- [39.9.225.2:5672] c.g.rabbitdelay.config.TestSender : send ack success
2019-10-11 17:26:18.446 INFO 879 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-10-11 17:26:18.751 INFO 879 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 2 endpoint(s) beneath base path '/actuator'
2019-10-11 17:26:18.959 INFO 879 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-10-11 17:26:18.962 INFO 879 --- [ main] c.g.rabbitdelay.RabbitdelayApplication : Started RabbitdelayApplication in 17.093 seconds (JVM running for 27.45)
2019-10-11 17:26:28.342 INFO 879 --- [ntContainer#0-1] c.g.rabbitdelay.consumer.TestConsumer : 1爲收到的消息
經過日誌能夠看到,發送消息是18秒,收到消息消費爲28秒,中間隔了10秒鐘。