TTL是time to live 的簡稱,顧名思義指的是消息的存活時間。rabbitMq能夠從兩種維度設置消息過時時間,分別是隊列和消息自己。 隊列消息過時時間-Per-Queue Message TTL: 經過設置隊列的x-message-ttl參數來設置指定隊列上消息的存活時間,其值是一個非負整數,單位爲微秒。不一樣隊列的過時時間互相之間沒有影響,即便是對於同一條消息。隊列中的消息存在隊列中的時間超過過時時間則成爲死信。測試
隊列中的消息在如下三種狀況下會變成死信 (1)消息被拒絕(basic.reject 或者 basic.nack),而且requeue=false; (2)消息的過時時間到期了; (3)隊列長度限制超過了。 當隊列中的消息成爲死信之後,若是隊列設置了DLX那麼消息會被髮送到DLX。經過x-dead-letter-exchange設置DLX,經過這個x-dead-letter-routing-key設置消息發送到DLX所用的routing-key,若是不設置默認使用消息自己的routing-key.ui
@Bean public Queue lindQueue() { return QueueBuilder.durable(LIND_QUEUE) .withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//設置死信交換機 .withArgument("x-message-ttl", makeCallExpire) .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//設置死信routingKey .build(); }
@Component public class AmqpConfig { /** * 主要測試一個死信隊列,功能主要實現延時消費,原理是先把消息發到正常隊列, * 正常隊列有超時時間,當達到時間後自動發到死信隊列,而後由消費者去消費死信隊列裏的消息. */ public static final String LIND_EXCHANGE = "lind.exchange"; public static final String LIND_DL_EXCHANGE = "lind.dl.exchange"; public static final String LIND_QUEUE = "lind.queue"; public static final String LIND_DEAD_QUEUE = "lind.queue.dead"; public static final String LIND_FANOUT_EXCHANGE = "lindFanoutExchange"; /** * 單位爲微秒. */ @Value("${tq.makecall.expire:60000}") private long makeCallExpire; /** * 建立普通交換機. */ @Bean public TopicExchange lindExchange() { return (TopicExchange) ExchangeBuilder.topicExchange(LIND_EXCHANGE).durable(true) .build(); } /** * 建立死信交換機. */ @Bean public TopicExchange lindExchangeDl() { return (TopicExchange) ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true) .build(); } /** * 建立普通隊列. */ @Bean public Queue lindQueue() { return QueueBuilder.durable(LIND_QUEUE) .withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//設置死信交換機 .withArgument("x-message-ttl", makeCallExpire) .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//設置死信routingKey .build(); } /** * 建立死信隊列. */ @Bean public Queue lindDelayQueue() { return QueueBuilder.durable(LIND_DEAD_QUEUE).build(); } /** * 綁定死信隊列. */ @Bean public Binding bindDeadBuilders() { return BindingBuilder.bind(lindDelayQueue()) .to(lindExchangeDl()) .with(LIND_DEAD_QUEUE); } /** * 綁定普通隊列. * * @return */ @Bean public Binding bindBuilders() { return BindingBuilder.bind(lindQueue()) .to(lindExchange()) .with(LIND_QUEUE); } /** * 廣播交換機. * * @return */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(LIND_FANOUT_EXCHANGE); } } //----------------- @Component public class Publisher { @Autowired private RabbitTemplate rabbitTemplate; public void publish(String message) { try { rabbitTemplate .convertAndSend(AmqpConfig.LIND_EXCHANGE, AmqpConfig.LIND_DELAY_QUEUE, message); } catch (Exception e) { e.printStackTrace(); } } } //----------------- @Component @Slf4j public class Subscriber { @RabbitListener(queues = AmqpConfig.LIND_QUEUE) public void customerSign(String data) { try { log.info("從隊列拿到數據 :{}", data); } catch (Exception ex) { e.printStackTrace(); } } }