當一條消息在隊列中出現如下三種狀況的時候,該消息就會變成一條死信。java
當消息在一個隊列中變成一個死信以後,若是配置了死信隊列,它將被從新publish到死信交換機,死信交換機將死信投遞到一個隊列上,這個隊列就是死信隊列。spring
建立一個消費者,綁定消費隊列及死信交換機,交換機默認爲direct
模型,死信交換機也是,arguments綁定死信交換機和key。(註解支持的具體參數文末會附上)緩存
public class DirectConsumer { @RabbitListener(bindings = { @QueueBinding(value = @Queue(value = "javatrip",arguments = {@Argument(name="x-dead-letter-exchange",value = "deadExchange"), @Argument(name="x-dead-letter-routing-key",value = "deadKey") }), exchange = @Exchange(value="javatripDirect"), key = {"info","error","warning"} ) }) public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{ System.out.println("消費者1"+message); }
public void publishMessage(String message){ rabbitTemplate.setMandatory(true); rabbitTemplate.convertAndSend("javatripDirect","info",message); }
spring: rabbitmq: listener: simple: acknowledge-mode: manual
Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicNack(deliverTag,false,false);
@RabbitListener(bindings = { @QueueBinding( value = @Queue(value = "javatripDead"), exchange = @Exchange(value = "deadExchange"), key = "deadKey" ) }) public void receive2(String message){ System.out.println("我是一條死信:"+message); }
綁定業務隊列的時候,增長消息的過時時長,當消息過時後,消息將被轉發到死信隊列中。app
@RabbitListener(bindings = { @QueueBinding(value = @Queue(value = "javatrip",arguments = {@Argument(name="x-dead-letter-exchange",value = "deadExchange"), @Argument(name="x-dead-letter-routing-key",value = "deadKey"), @Argument(name = "x-message-ttl",value = "3000") }), exchange = @Exchange(value="javatripDirect"), key = {"info","error","warning"} ) }) public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{ System.out.println("消費者1"+message); }
設置消息隊列長度,當隊列中的消息達到最大長度後,繼續發送消息,消息將被轉發到死信隊列中。this
@RabbitListener(bindings = { @QueueBinding(value = @Queue(value = "javatrip",arguments = {@Argument(name="x-dead-letter-exchange",value = "deadExchange"), @Argument(name="x-dead-letter-routing-key",value = "deadKey"), @Argument(name = "x-max-length",value = "3") }), exchange = @Exchange(value="javatripDirect"), key = {"info","error","warning"} ) }) public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{ System.out.println("消費者1"+message); }
@Target({}) @Retention(RetentionPolicy.RUNTIME) public @interface QueueBinding { /** * @return the queue. */ Queue value(); /** * @return the exchange. */ Exchange exchange(); /** * @return the routing key or pattern for the binding. * Multiple elements will result in multiple bindings. */ String[] key() default {}; }
@Target({}) @Retention(RetentionPolicy.RUNTIME) public @interface Queue { /** * @return the queue name or "" for a generated queue name (default). */ @AliasFor("name") String value() default ""; /** * @return the queue name or "" for a generated queue name (default). * @since 2.0 */ @AliasFor("value") String name() default ""; /** * 是否持久化 */ String durable() default ""; /** * 是否獨享、排外的. */ String exclusive() default ""; /** * 是否自動刪除; */ String autoDelete() default ""; /** * 隊列的其餘屬性參數 * (1)x-message-ttl:消息的過時時間,單位:毫秒; *(2)x-expires:隊列過時時間,隊列在多長時間未被訪問將被刪除,單位:毫秒; *(3)x-max-length:隊列最大長度,超過該最大值,則將從隊列頭部開始刪除消息; *(4)x-max-length-bytes:隊列消息內容佔用最大空間,受限於內存大小,超過該閾值則從隊列頭部開始刪除消息; *(5)x-overflow:設置隊列溢出行爲。這決定了當達到隊列的最大長度時消息會發生什麼。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁隊列類型僅支持drop-head; *(6)x-dead-letter-exchange:死信交換器名稱,過時或被刪除(因隊列長度超長或因空間超出閾值)的消息可指定發送到該交換器中; *(7)x-dead-letter-routing-key:死信消息路由鍵,在消息發送到死信交換器時會使用該路由鍵,若是不設置,則使用消息的原來的路由鍵值 *(8)x-single-active-consumer:表示隊列是不是單一活動消費者,true時,註冊的消費組內只有一個消費者消費消息,其餘被忽略,false時消息循環分發給全部消費者(默認false) *(9)x-max-priority:隊列要支持的最大優先級數;若是未設置,隊列將不支持消息優先級; *(10)x-queue-mode(Lazy mode):將隊列設置爲延遲模式,在磁盤上保留儘量多的消息,以減小RAM的使用;若是未設置,隊列將保留內存緩存以儘量快地傳遞消息; *(11)x-queue-master-locator:在集羣模式下設置鏡像隊列的主節點信息。 */ Argument[] arguments() default {}; }
@Target({}) @Retention(RetentionPolicy.RUNTIME) public @interface Exchange { String TRUE = "true"; String FALSE = "false"; /** * @return the exchange name. */ @AliasFor("name") String value() default ""; /** * @return the exchange name. * @since 2.0 */ @AliasFor("value") String name() default ""; /** * 交換機類型,默認DIRECT */ String type() default ExchangeTypes.DIRECT; /** * 是否持久化 */ String durable() default TRUE; /** * 是否自動刪除 */ String autoDelete() default FALSE; /** * @return the arguments to apply when declaring this exchange. * @since 1.6 */ Argument[] arguments() default {}; }