主要介紹消費消息時的幾種方式:前端
在消費消息時,負責消費的應用程序會發送一個Basic.Consumer請求,與該請求一塊兒發送的還有一個no-ack標誌。當這個標誌啓用時,它會告訴RabbitMQ消費者在接收到消息時不會進行確認,RabbitMQ只管儘快的發送消息。java
使用no-ack標誌消費消息是讓RabbitMQ將消費投遞給消費者的最快方式,但這也是最不可靠的方式。
若是使用no-ack,那麼當有新的可用消息時,RabbitMQ將會發送該消息給消費者,而不用等待。實際上,若是有可用消息,RabbitMQ會持續向消費者發送它們,直到套接字緩衝區被填滿爲止。spring
目前沒有找到RabbitTemplate如何開啓no-ack的方法,若是有用過的朋友,請留言告訴我,謝謝。bash
開啓消息確認模式,每次接收到消息後,都要向RabbitMQ返回一個Basic.Ack。服務器
消息確認有三種確認方式:性能
基於RabbitTemplate,下面這段代碼,有對這幾種確認方式的實現,在配置文件中開啓手動確認模式,acknowledge-mode屬性爲manual(默認爲自動確認):測試
spring: #消息隊列配置 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / connection-timeout: 5000ms listener: simple: acknowledge-mode: manual
/** * 消費者監聽消息隊列 */ @Component @Slf4j @RabbitListener(queues = "DIRECT_QUEUE") public class DirectQueueListener { @RabbitHandler public void process(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException, InterruptedException { log.info("消費消息成功: {}", message); Thread.sleep(1000); switch (message) { case "nack": channel.basicNack(tag, true, false); // 第二個參數控制是否開啓批量拒絕,第三個參數表示是否requeue break; case "nack-requeue": channel.basicNack(tag, true, true); break; case "reject": channel.basicReject(tag, false); break; case "reject-requeue": // 啓用了requeue,若是隻有一個消費者,容易形成死循環 channel.basicReject(tag, true); break; default: channel.basicAck(tag, true); break; } } }
channel.basicAck:當正常消費消息時,調用該方法。fetch
咱們看到除了basicAck,還有basicReject和basicNack。這兩種,顧名思義,是用來拒絕消費的。ui
channel.basicReject:從協議層面上,reject是發送一個Basic.Reject響應,告知RabbitMQ沒法對這條消息進行處理,當拒絕時,能夠指定是否丟棄消息或使用requeue標誌從新發送消息。當啓用requeue時,RabbitMQ將會把這條消息從新放回到隊列中。spa
不能使用basicReject一次拒絕多個消息。
channel.basicNack:Basic.Nack實現與Basic.Reject相同的行爲,但添加了批量拒絕的功能。
設置multiple或requeue如圖所示:
AMQP規範要求信道要有服務質量設置,即在確認消息接收以前能夠預先接收必定數量的消息。能夠設置一個預取數量來實現高效的發送消息。
若是消費者應用程序在確認消息以前崩潰,在套接字關閉時,全部預取的消息將返回到隊列。
若是設置了no-ack,那麼預取大小將被忽略。
使用RabbitTemplate時,能夠在消費者應用程序的配置文件中配置預取大小:
spring: #消息隊列配置 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / connection-timeout: 5000ms listener: simple: acknowledge-mode: manual prefetch: 1000
其中prefetch就是預取大小,消費者應用程序運行後,能夠在RabbitMQ的控制檯看到這個設置:
若是熟悉抓包軟件的朋友,能夠試着抓包看看:
我預先發送了2條消息到RabbitMQ,能夠看到上圖中最後兩行是兩個Ack。
有一種方式能夠一次確認多個消息,Basic.Ack響應具備一個multiple屬性,當把它設置爲true時就能確認之前未確認的消息。
若是使用multiple,當成功的接收了一些消息,而且應用程序在回覆Ack以前就發生了異常,則全部爲確認的消息將返回隊列。
RabbitMQ的死信交換器是一種能夠拒絕已投遞消息的可選行爲,通常有三種狀況的消息會進入死信隊列:
死信與備用交換器不一樣,過時或被拒絕的消息經過死信交換器進行投遞,而備用交換器則路由那些沒法由RabbitMQ路由的消息。
在RabbitMQ中,在聲明隊列時,指定死信交換器:
/** * 聲明隊列。 * 同時指定死信隊列。 * * @return Queue對象。 */ @Bean("directQueue") public Queue directQueue() { return QueueBuilder.durable("DIRECT_QUEUE") .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME) .build(); }
死信交換器還容許使用預先指定的值覆蓋路由鍵,這樣能夠容許同一個交換器同時處理死信和非死信消息,但須要確保死信消息不被投遞到相同的隊列。設置預約義路由鍵的關鍵字是:x-dead-letter-routing-key。
測試死信隊列,當消費者拒絕時,查看消息是否會進入死信隊列:
定義隊列時,有多個設置能夠肯定隊列的行爲:
更改隊列的設置,必須刪除隊列並從新建立它。
也能夠叫作自動刪除的隊列。
一旦消費者完成鏈接和檢索消息,在斷開鏈接時隊列將被刪除。
@Bean("directQueue") public Queue directQueue() { return QueueBuilder.durable("DIRECT_QUEUE").autoDelete() .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME) .build(); }
隊列只會在沒有消費者監聽的時候自行刪除。
在須要確保只有單個消費者可以消費隊列中的消息時,在建立隊列時設置exclusive屬性,啓用後在消費者斷開鏈接後,隊列也會自動刪除。
@Bean("directQueue") public Queue directQueue() { return QueueBuilder.durable("DIRECT_QUEUE").exclusive() .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME) .build(); }
聲明exclusive的隊列,只能被聲明時所指定的同一個鏈接和信道所消費,當建立隊列的信道關閉時,獨佔隊列也將會刪除。在信道關閉之間,能夠屢次聲明和取消exclusive隊列的消費者。
若是一段時間沒有使用該隊列,就刪除它。
建立一個自動過時的隊列很是簡單,要作的事情就是使用x-expires參數聲明一個隊列。該參數以毫秒爲單位設置隊列的生存時間(Time To Live,TTL)。
自動過時隊列有一些嚴格的約定:
使用durable標誌告訴RabbitMQ但願隊列配置被保存在服務器:
@Bean("directQueue") public Queue directQueue() { return QueueBuilder.durable("DIRECT_QUEUE") .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME) .build(); }
消息級別的TTL設置容許服務器對消息的最大生存時間進行限制。聲明隊列的同時指定死信交換器和TTL值將致使該隊列中已到期的消息成爲死信消息。
可使用x-message-ttl設置隊列的消息TTL時間。
從RabbitMQ3.1.0開始,能夠在聲明隊列時指定最大長度。若是在隊列上設置列x-max-length參數,一旦消息到達最大值,RabbitMQ會在添加新消息時刪除位於隊列前端的消息,若是聲明隊列時候,指定列死信交換器,則從隊列前端刪除的消息會進入死信隊列。