在《RabbitMQ(1)-基礎開發應用》中,咱們已經介紹了RabbitMQ的基礎開發應用。本文基於這些基礎再作一些擴展,延伸出一些高級的用法,如:死信隊列、延遲隊列和優先隊列。不過仍是以死信隊列爲主,由於延遲隊列是死信隊列的衍生概念,並且優先隊列也比較簡單,因此先仍是在代碼層面上,把死信隊列搞透。java
咱們在使用RabbitMQ以前,須要先建立好相關的隊列和交換機,而且設置一些綁定關係。由於幾篇文章都是結合springboot來開發,下面就結合springboot介紹幾種建立方式:spring
@RabbitListener
註解的 bindings
屬性,能夠簡單實現相似功能。@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "direct.queue.d", durable = "true"), exchange = @Exchange(value = "direct.exchange.a", durable = "true", type = ExchangeTypes.DIRECT, ignoreDeclarationExceptions = "true"), key = "direct.routingKey.a" ) )
Queue、Exchange、Binding
的實例。package pers.kerry.exercise.rabbitmq.rabbitmqproducera.config; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @description: * @date: 2020/7/12 11:26 下午 * @author: kerry */ @Configuration public class RabbitConfig { public static final String NORMAL_EXCHANGE_A="demo.direct.exchange.a"; public static final String NORMAL_ROUTING_KEY_A="demo.direct.routingKey.a"; public static final String NORMAL_QUEUE_A="demo.direct.queue.a"; /** * NORMAL 交換機 * @return */ @Bean public Exchange ExchangeA(){ return ExchangeBuilder .directExchange(NORMAL_EXCHANGE_A) .durable(true) .build(); } /** * NORMAL 隊列 * @return */ @Bean public Queue QueueA(){ return QueueBuilder .durable(NORMAL_QUEUE_A) .build(); } /** * 綁定 NORMAL隊列 和 NORMAL交換機 * @return */ @Bean public Binding normalBinding(){ return new Binding(NORMAL_QUEUE_A, Binding.DestinationType.QUEUE, NORMAL_EXCHANGE_A, NORMAL_ROUTING_KEY_A, null); } }
我我的推薦第三種,並且建議是在生產者端定義,消費者應該更關注消費的邏輯。可是若是用代碼來建立,有一個很大的缺點,就是不能刪除和修改,至少我目前還沒找到辦法。json
所以要結合第一種和第三種來使用,固然都用第一種也是能夠的。只是開發人員,更但願隊列、交換機的建立、綁定的邏輯,都體如今代碼裏面,經過代碼能夠更好的閱讀架構設計。segmentfault
死信隊列
這個名字聽起來很特別,但它解決的是平常開發中最多見的問題:不能正常消費的消息,該如何處理
。咱們在第一篇文章中有使用到手動Ack,對於須要nack而且無需重回隊列的消息,指望有統一的異常處理;包括有些消息是有時效性的,若是支付訂單通常都最大支付時常,超時後就應該取消訂單;等等。springboot
死信隊列就是應對這些狀況的,它出現的條件以下:架構
basicNack
或basicReject
,而且此時requeue 屬性被設置爲false。死信隊列的架構以下:併發
生產者 --> 消息 --> 業務交換機 --> 業務隊列 --> 消息變成死信 --> 死信交換機 -->死信隊列 --> 消費者
app
如何配置死信隊列呢?其實很簡單,大概能夠分爲如下步驟:ui
注意,並非直接聲明一個公共的死信隊列,而後因此死信消息就本身跑到死信隊列裏去了。而是爲每一個須要使用死信的業務隊列配置一個死信交換機,這裏同一個項目的死信交換機能夠共用一個,而後爲每一個業務隊列分配一個單獨的路由key。架構設計
有了死信交換機和路由key後,接下來,就像配置業務隊列同樣,配置死信隊列,而後綁定在死信交換機上。也就是說,死信隊列並非什麼特殊的隊列,只不過是綁定在死信交換機上的隊列。死信交換機也不是什麼特殊的交換機,只不過是用來接受死信的交換機,因此能夠爲任何類型【Direct、Fanout、Topic】。通常來講,會爲每一個業務隊列分配一個獨有的路由key,並對應的配置一個死信隊列進行監聽,也就是說,通常會爲每一個重要的業務隊列配置一個死信隊列。
按照簡介中死信隊列的架構,咱們在配置文件中定義了NORMAL
的業務隊列和業務交換機,以及DLX
的死信隊列和死信交換機,並在業務隊列中設置了死信交換機。
RabbitConfig.java
package pers.kerry.exercise.rabbitmq.rabbitmqproducera.config; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @description: * @date: 2020/7/12 11:26 下午 * @author: kerry */ @Configuration public class RabbitConfig { /** * DLX,定義參數 */ public static final String X_DEAD_LETTER_EXCHANGE="x-dead-letter-exchange"; public static final String X_DEAD_LETTER_ROUTING_KEY="x-dead-letter-routing-key"; public static final String X_MESSAGE_TTL="x-message-ttl"; public static final String X_MAX_LENGTH="x-max-length"; /** * DLX,命名 */ public static final String DEAD_LETTER_EXCHANGE_A="demo.direct.dlx.exchange.a"; public static final String DEAD_LETTER_ROUTING_KEY_A="demo.direct.dlx.routingKey.a"; public static final String DEAD_LETTER_QUEUE_A="demo.direct.dlx.queue.a"; /* * NORMAL,命名 */ public static final String NORMAL_EXCHANGE_A="demo.direct.exchange.a"; public static final String NORMAL_ROUTING_KEY_A="demo.direct.routingKey.a"; public static final String NORMAL_QUEUE_A="demo.direct.queue.a"; @Bean("jsonRabbitTemplate") public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } @Bean("defaultRabbitTemplate") public RabbitTemplate defaultRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory); return rabbitTemplate; } /** * DLX 交換機 * @return */ @Bean public Exchange dlxExchangeA(){ return ExchangeBuilder .directExchange(DEAD_LETTER_EXCHANGE_A) .durable(true) .build(); } /** * DLX 隊列 * @return */ @Bean public Queue dlxQueueA(){ return QueueBuilder .durable(DEAD_LETTER_QUEUE_A) .build(); } /** * NORMAL 交換機 * @return */ @Bean public Exchange ExchangeA(){ return ExchangeBuilder .directExchange(NORMAL_EXCHANGE_A) .durable(true) .build(); } /** * NORMAL 隊列 * @return */ @Bean public Queue QueueA(){ return QueueBuilder .durable(NORMAL_QUEUE_A) //設置 死信交換機 .withArgument(X_DEAD_LETTER_EXCHANGE,DEAD_LETTER_EXCHANGE_A) .withArgument(X_DEAD_LETTER_ROUTING_KEY,DEAD_LETTER_ROUTING_KEY_A) //設置 隊列全部消息 存活時間8秒 .withArgument(X_MESSAGE_TTL,8000) //設置 隊列最大長度 10條 .withArgument(X_MAX_LENGTH,10) .build(); } /** * 綁定 DLX隊列 和 DLX交換機 * @return */ @Bean public Binding dlxBinding(){ return new Binding(DEAD_LETTER_QUEUE_A, Binding.DestinationType.QUEUE,DEAD_LETTER_EXCHANGE_A, DEAD_LETTER_ROUTING_KEY_A, null); } /** * 綁定 NORMAL隊列 和 NORMAL交換機 * @return */ @Bean public Binding normalBinding(){ return new Binding(NORMAL_QUEUE_A, Binding.DestinationType.QUEUE, NORMAL_EXCHANGE_A, NORMAL_ROUTING_KEY_A, null); } }
ProducerService.java
@Slf4j @Service public class ProducerService { public void sendText(String data, MessageProperties messageProperties) { /** * 對單個消息 設置TTL */ //messageProperties.setExpiration(String.valueOf(3000)); Message message = defaultRabbitTemplate .getMessageConverter() .toMessage(data, messageProperties); defaultRabbitTemplate.convertAndSend(RabbitConfig.NORMAL_EXCHANGE_A, RabbitConfig.NORMAL_ROUTING_KEY_A, message); } }
消費者的邏輯比較簡單,主要是分別監聽業務隊列和死信隊列,這裏將兩個隊列的消息輸出日誌。這裏模擬的是消費者nack消息,而且不退回隊列的狀況。
MessageListener.java
/** * @description: * @date: 2020/7/12 11:07 下午 * @author: kerry */ @Component @Slf4j public class MessageListener { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ObjectMapper objectMapper; /** * @param message * @param channel * 監聽 業務隊列 * * @throws Exception */ @RabbitListener(queues = RabbitConfig.NORMAL_QUEUE_A) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { String contentType = message.getMessageProperties().getContentType(); String bodyText = null; switch (contentType) { //字符串 case MessageProperties.CONTENT_TYPE_TEXT_PLAIN: bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message); break; //json對象 case MessageProperties.CONTENT_TYPE_JSON: User user = objectMapper.readValue(message.getBody(), User.class); bodyText = user.toString(); break; } log.info("業務隊列-拒絕消息: " + bodyText); channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); /** * 延遲隊列,被消費的消息須要重回隊列 */ //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); } /** * @param message * @param channel * 監聽 死信隊列 * * @throws Exception */ @RabbitListener(queues = RabbitConfig.DEAD_LETTER_QUEUE_A) @RabbitHandler public void onMessageDLX(Message message, Channel channel) throws Exception { String contentType = message.getMessageProperties().getContentType(); String bodyText = null; switch (contentType) { //字符串 case MessageProperties.CONTENT_TYPE_TEXT_PLAIN: bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message); break; //json對象 case MessageProperties.CONTENT_TYPE_JSON: User user = objectMapper.readValue(message.getBody(), User.class); bodyText = user.toString(); break; } log.info("死信隊列-接收消息: " + bodyText); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }
開頭說過,致使消息轉爲死信隊列的方式有3種,下面就從代碼中分析這3種狀況。
咱們回過頭來看看消費者這邊,定義業務隊列的方法:
/** * NORMAL 隊列 * @return */ @Bean public Queue QueueA(){ return QueueBuilder .durable(NORMAL_QUEUE_A) //設置 死信交換機 .withArgument(X_DEAD_LETTER_EXCHANGE,DEAD_LETTER_EXCHANGE_A) .withArgument(X_DEAD_LETTER_ROUTING_KEY,DEAD_LETTER_ROUTING_KEY_A) //設置 隊列全部消息 存活時間8秒 .withArgument(X_MESSAGE_TTL,8000) //設置 隊列最大長度 10條 .withArgument(X_MAX_LENGTH,10) .build(); }
1. nack拒絕消息,requeue=false
在死信隊列的架構中,只要在業務隊列中設置了死信交換機x-dead-letter-exchange
。消費者代碼中,咱們在業務隊列的消費過程當中nack消息,而且requeue=false便可。x-dead-letter-routing-key
能夠不設置,若是不設置,默認取消息原來的路由鍵。
代碼以下:
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
channel.basicNack方法的參數以下:
2. 消息超時TTL
TTL(Time-To-Live)指消息的存活時間,咱們有兩種方式設置消息的TTL:
x-message-ttl
,設置後,進入該隊列的全部消息的TTL都爲對應的值。messageProperties.setExpiration(String.valueOf(3000));
,就是設置消息的TTL爲3秒。不過要注意的是,若是已經設置了nack的死信邏輯,TTL的死信就不生效了。道理也很簡單,由於nack消息和requeue=false一塊兒用,表明消息被消費了,而且消息不會重回隊列,直接被丟棄或進入死信隊列,又怎麼會在隊列中超時了呢。
3. 超過隊列長度
能夠設置隊列長度,例如最大接收消息的數量。當消息在隊列中已經達到最大數量,那麼後面再來的消息,就會被直接丟進死信隊列。
咱們也是中定義業務隊列的代碼中,有經過x-max-length
參數,設置業務隊列的長度。
在我還不知道延遲隊列以前,我就以爲消息中間件應該具有這樣的功能。在消息發佈到隊列後,我指望每一個消息延遲指定時間後,再被消費者獲取到。例如:在支付模塊中,當用戶生成訂單,再到支付完成訂單,是有一段時間的。而咱們通常會給這個訂單設置超時時間,若是超過了這段時間,訂單就應該被取消,沒法再支付。那麼將訂單做爲消息,就能夠利用延遲隊列來實現取消訂單的邏輯。
RabbitMQ並不直接支持延遲隊列的功能,而是做爲一個概念,你能夠利用死信隊列來實現「延遲隊列」。利用TTL超時時間的死信方式,來實現延遲隊列。
回顧一下上段中TTL的方式,咱們在業務隊列中除了定義死信交換機x-dead-letter-exchange
,還能夠定義隊列的生存時間x-message-ttl
,或者設置消息的過時時間。而若是咱們不消費這個業務隊列中的消息,那麼消息在到達TTL後,就會自動轉到死信隊列中。若是咱們只消費死信隊列中的消息,忽略掉業務隊列這個「中轉站」,就至關於消息在被髮布後,通過指定時間延遲,在死信隊列中被消費,這就造成了一個「延遲隊列」。
由於延遲隊列就是死信隊列的一種實現,因此代碼層面上能夠直接參考上段中TTL的部分。
優先隊列,顧名思義,擁有高優先級的隊列具備高的優先權,優先級高的消息具有優先被消費的權力。所以在優先隊列中有兩個邏輯點:隊列是優先隊列
,消息有優先級
。可參考死信隊列章節中,TTL部分的代碼,下面對代碼改動地方作一下說明:
x-max-priority
參數來設置。messageProperties.setPriority(3)
,即設置該消息的優先級爲3。優先隊列的使用場景是在:消息有分優先級的需求,而且併發量較大。要求併發量大,是由於若是全部消息在發佈以後,立刻就被消費了,那麼分優先級的必要性就不大了。