以前介紹了 rabbitmq 的消息發送姿式,既然有發送,固然就得有消費者,在 SpringBoot 環境下,消費能夠說比較簡單了,藉助@RabbitListener
註解,基本上能夠知足你 90%以上的業務開發需求git
下面咱們來看一下@RabbitListener
的最最經常使用使用姿式github
<!-- more -->web
首先建立一個 SpringBoot 項目,用於後續的演示spring
2.2.1.RELEASE
3.7.5
(安裝教程可參考: 【MQ 系列】springboot + rabbitmq 初體驗)依賴配置文件 pom.xml瀏覽器
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- 注意,下面這個不是必要的哦--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> <build> <pluginManagement> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </pluginManagement> </build> <repositories> <repository> <id>spring-snapshots</id> <name>Spring Snapshots</name> <url>https://repo.spring.io/libs-snapshot-local</url> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/libs-milestone-local</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> <repository> <id>spring-releases</id> <name>Spring Releases</name> <url>https://repo.spring.io/libs-release-local</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories>
在application.yml
配置文件中,添加 rabbitmq 的相關屬性springboot
spring: rabbitmq: virtual-host: / username: admin password: admin port: 5672 host: 127.0.0.1
本文將目標放在實用性上,將結合具體的場景來演示@RabbitListener
的使用姿式,所以當你發現看完本文以後這個註解裏面有些屬性仍是不懂,請不要着急,下一篇會一一道來併發
消費消費,沒有數據,怎麼消費呢?因此咱們第一步,先建立一個消息生產者,能夠往 exchange 寫數據,供後續的消費者測試使用app
本篇的消費主要以 topic 模式來進行說明(其餘的幾個模式使用差異不大,若是有需求的話,後續補齊)maven
@RestController public class PublishRest { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping(path = "publish") public boolean publish(String exchange, String routing, String data) { rabbitTemplate.convertAndSend(exchange, routing, data); return true; } }
提供一個簡單 rest 接口,能夠指定往哪一個 exchange 推送數據,並制定路由鍵
對於消費者而言實際上是不須要管理 exchange 的建立/銷燬的,它是由發送者定義的;通常來說,消費者更關注的是本身的 queue,包括定義 queue 並與 exchange 綁定,而這一套過程是能夠直接經過 rabbitmq 的控制檯操做的哦
因此實際開發過程當中,exchange 和 queue 以及對應的綁定關係已經存在的可能性是很高的,並不須要再代碼中額外處理;
在這種場景下,消費數據,能夠說很是很是簡單了,以下:
/** * 當隊列已經存在時,直接指定隊列名的方式消費 * * @param data */ @RabbitListener(queues = "topic.a") public void consumerExistsQueue(String data) { System.out.println("consumerExistsQueue: " + data); }
直接指定註解中的queues
參數便可,參數值爲對列名(queueName)
當 queue 的 autoDelete 屬性爲 false 時,上面的使用場景仍是比較合適了;可是,當這個屬性爲 true 時,沒有消費者隊列就會自動刪除了,這個時候再用上面的姿式,可能會獲得下面的異常
一般這種場景下,是須要咱們來主動建立 Queue,並創建與 Exchange 的綁定關係,下面給出@RabbitListener
的推薦使用姿式
/** * 隊列不存在時,須要建立一個隊列,而且與exchange綁定 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "topic.n1", durable = "false", autoDelete = "true"), exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r")) public void consumerNoQueue(String data) { System.out.println("consumerNoQueue: " + data); }
一個註解,內部聲明瞭隊列,並創建綁定關係,就是這麼神奇!!!
注意@QueueBinding
註解的三個屬性:
以上,就是在隊列不存在時的使用姿式,看起來也不復雜
在前面 rabbitmq 的核心知識點學習過程當中,會知道爲了保證數據的一致性,有一個消息確認機制;
咱們這裏的 ack 主要是針對消費端而言,當咱們但願更改默認 ack 方式(noack, auto, manual),能夠以下處理
/** * 須要手動ack,可是不ack時 * * @param data */ @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n2", durable = "false", autoDelete = "true"), exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), ackMode = "MANUAL") public void consumerNoAck(String data) { // 要求手動ack,這裏不ack,會怎樣? System.out.println("consumerNoAck: " + data); }
上面的實現也比較簡單,設置ackMode=MANUAL
,手動 ack
可是,請注意咱們的實現中,沒有任何一個地方體現了手動 ack,這就至關於一致都沒有 ack,在後面的測試中,能夠看出這種不 ack 時,會發現數據一直在unacked
這一欄,當 Unacked 數量超過限制的時候,就不會再消費新的數據了
上面雖然選擇 ack 方式,可是還缺一步 ack 的邏輯,接下來咱們看一下如何補齊
/** * 手動ack * * @param data * @param deliveryTag * @param channel * @throws IOException */ @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n3", durable = "false", autoDelete = "true"), exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), ackMode = "MANUAL") public void consumerDoAck(String data, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException { System.out.println("consumerDoAck: " + data); if (data.contains("success")) { // RabbitMQ的ack機制中,第二個參數返回true,表示須要將這條消息投遞給其餘的消費者從新消費 channel.basicAck(deliveryTag, false); } else { // 第三個參數true,表示這個消息會從新進入隊列 channel.basicNack(deliveryTag, false, true); } }
請注意,方法多了兩個參數
deliveryTag
: 至關於消息的惟一標識,用於 mq 辨別是哪一個消息被 ack/nak 了channel
: mq 和 consumer 之間的管道,經過它來 ack/nak當咱們正確消費時,經過調用 basicAck
方法便可
// RabbitMQ的ack機制中,第二個參數返回true,表示須要將這條消息投遞給其餘的消費者從新消費 channel.basicAck(deliveryTag, false);
當咱們消費失敗,須要將消息從新塞入隊列,等待從新消費時,可使用 basicNack
// 第三個參數true,表示這個消息會從新進入隊列 channel.basicNack(deliveryTag, false, true);
當消息不少,一個消費者吭哧吭哧的消費太慢,可是個人機器性能又槓槓的,這個時候我就但願並行消費,至關於同時有多個消費者來處理數據
要支持並行消費,以下設置便可
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.n4", durable = "false", autoDelete = "true"), exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"), concurrency = "4") public void multiConsumer(String data) { System.out.println("multiConsumer: " + data); }
請注意註解中的concurrency = "4"
屬性,表示固定 4 個消費者;
除了上面這種賦值方式以外,還有一種 m-n
的格式,表示 m 個並行消費者,最多能夠有 n 個
(額外說明:這個參數的解釋實在SimpleMessageListenerContainer
的場景下的,下一篇文章會介紹它與DirectMessageListenerContainer
的區別)
經過前面預留的消息發送接口,咱們在瀏覽器中請求: http://localhost:8080/publish?exchange=topic.e&routing=r&data=wahaha
而後看一下輸出,五個消費者都接收到了,特別是主動 nak 的那個消費者,一直在接收到消息;
(由於一直打印日誌,因此重啓一下應用,開始下一個測試)
而後再發送一條成功的消息,驗證下手動真確 ack,是否還會出現上面的狀況,請求命令: http://localhost:8080/publish?exchange=topic.e&routing=r&data=successMsg
而後再關注一下,沒有 ack 的那個隊列,一直有一個 unack 的消息
系列博文
項目源碼
一灰灰的我的博客,記錄全部學習和工做中的博文,歡迎你們前去逛逛
盡信書則不如,以上內容,純屬一家之言,因我的能力有限,不免有疏漏和錯誤之處,如發現 bug 或者有更好的建議,歡迎批評指正,不吝感激
一灰灰 blog