SpringBoot整合RabbitMQ消息隊列-學習筆記

SpringBoot整合RabbitMQ消息隊列-學習筆記

2018年08月30日 14:50:50 Calon Mo 閱讀數 3672html

版權聲明:本文爲博主原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處連接和本聲明。java

本文連接:http://www.javashuo.com/article/p-pqjrwton-r.htmlgit

前言

本篇文章主要用於記錄我的學習RabbitMQ的過程,記錄下來方便往後查看,若有錯誤的地方,還望指正。github

本篇文章比較詳細地記錄本人在學習過程當中的每個步驟,比較適合對RabbitMQ不熟的同窗學習,跟着本文操做一遍,就能夠大概知道RabbitMQ的基礎知識了。web

準備階段

首先把RabbitMQ環境安裝好,下面再詳細介紹RabbitMQ各個知識點和如何使用。spring

因爲是基於Centos7的操做系統安裝RabbitMQ-3.7.7。json

爲了方便操做,先把防火牆幹掉,生產環境固然不能這麼幹,我的學習隨意,如下是相關命令:centos

centos7關閉並禁止防火牆啓動命令:瀏覽器

 
  1. systemctl stop firewalldspringboot

  2. systemctl disable firewalld

RabbitMQ安裝

這裏介紹一種比較簡單的安裝方法-依賴安裝,不用單獨安裝erlang等依賴。

首先到RabbitMQ官網下載:http://www.rabbitmq.com/download.html,

選擇合適你的操做系統版本,本人的操做系統是Centos7.5,因此選擇RHEL/CentOS 7.x這個。

把下載好的rabbitmq-server-3.7.7-1.el7.noarch.rpm放到/home目錄,因爲RabbitMQ-3.7.7須要安裝比較新的erlang-v19.3以上,而yum上並無這麼高的版本,因此須要在/etc/yum.repos.d/目錄下建立文件rabbitmq-erlang.repo,命令以下:

 
  1. cd /etc/yum.repos.d/

  2. touch rabbitmq-erlang.repo

編輯rabbitmq-erlang.repo命令以下:

vi rabbitmq-erlang.repo

添加如下內容到rabbitmq-erlang.repo:

 
  1. [rabbitmq-erlang]

  2. name=rabbitmq-erlang

  3. baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/21/el/7/

  4. gpgcheck=1

  5. gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc

  6. repo_gpgcheck=0

  7. enabled=1

上面baseurl是指向erlang-v21版本的連接。

 

cd到/home目錄,執行安裝RabbitMQ的命令:

yum install -y rabbitmq-server-3.7.7-1.el7.noarch.rpm

這個過程會下載安裝依賴的erlang等依賴,等待安裝完成,會出現下面的界面,則說明RabbitMQ就已經安裝完成了。

 
  1. Running transaction check

  2. Running transaction test

  3. Transaction test succeeded

  4. Running transaction

  5. 正在安裝 : erlang-21.0.5-1.el7.centos.x86_64 1/3

  6. 正在安裝 : socat-1.7.3.2-2.el7.x86_64 2/3

  7. 正在安裝 : rabbitmq-server-3.7.7-1.el7.noarch 3/3

  8. 驗證中 : socat-1.7.3.2-2.el7.x86_64 1/3

  9. 驗證中 : rabbitmq-server-3.7.7-1.el7.noarch 2/3

  10. 驗證中 : erlang-21.0.5-1.el7.centos.x86_64 3/3

  11.  
  12. 已安裝:

  13. rabbitmq-server.noarch 0:3.7.7-1.el7

  14.  
  15. 做爲依賴被安裝:

  16. erlang.x86_64 0:21.0.5-1.el7.centos socat.x86_64 0:1.7.3.2-2.el7

  17.  
  18. 完畢!

RabbitMQ設置

啓動RabbitMQ服務:

service rabbitmq-server start

剛安裝好的RabbitMQ是尚未用戶的,也不能訪問RabbitMQ的web管理後臺,接下來先添加一個叫root的用戶:

 
  1. rabbitmqctl add_user root root 

  2. rabbitmqctl set_user_tags root administrator

  3. rabbitmqctl set_permissions -p / root "." "." ".*"

  4.  
  5.  
  6. #更多命令查看:rabbitmqctl --help

啓用web訪問權限:

rabbitmq-plugins enable rabbitmq_management

重啓RabbitMQ服務:

service rabbitmq-server restart

而後在瀏覽器輸入:http://ip:15672/ ,這時能夠看到RabbitMQ管理頁面了,輸入剛剛添加的帳號root,密碼root便可進入。

登陸進去後界面以下:

RabbitMQ是基於Virtual Host來進行權限控制的,如今爲咱們剛剛添加的root用戶添加一個Virtual Host,在RabbitMQ的web管理後臺,根據下圖進行添加一個virtual host,添加成功後默認分配給root用戶了。

 

RabbitMQ簡介

    RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種語言平臺的客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

一般咱們談到消息隊列, 會有三個概念: 消息生產者(Provider)、隊列(Queue)、消息消費者(Consumer),RabbitMQ 在這個基本概念上, 多作了一層抽象, 在消息生產者和隊列之間, 加入了交換器 (Exchange)。這樣消息生產者和隊列就沒有直接聯繫, 變成消息生產者把消息發送給交換器, 交換器根據調度策略再把消息發送給隊列。

  1. 左側P表明消息生產者,也就是往RabbitMQ發消息的程序。
  2. 中間便是RabbitMQ,其中包括交換機(Exchange)和隊列(Queue)。
  3. 右側C表明消費者,也就是往RabbitMQ拿消息的程序。

其中比較重要的概念有:虛擬主機(Virtual Host)、交換機(Exchange)、隊列(Queue)、綁定(Binding)。

虛擬主機(Virtual Hosts)

        在上面已經說明如何爲一個用戶建立一個Virtual Host,一個虛擬主機持有一組交換機、隊列和綁定。在RabbitMQ當中,用戶只能在虛擬主機這個粒度上進行權限的控制。 若是須要禁止A組訪問B組的交換機/隊列/綁定,必須爲A和B分別建立一個虛擬主機。每個RabbitMQ服務器都有一個默認的虛擬主機「/」。

交換機(Exchange)

        交換機的功能主要是接收消息而且根據轉發策略轉發到對應的隊列,交換機不存儲消息,在啓用ack模式後,交換機找不到隊列會返回錯誤,這個ack模式後面再詳細討論。交換機有四種類型:Direct, topic, Headers and Fanout

隊列(Queue)

        隊列用於存放消息的載體,通常是和交換機進行綁定,交換機根據轉發策略把消息轉發到隊列裏。

綁定(Binding)

        也就是交換機須要和隊列相綁定,這其中如上圖所示,是多對多的關係。

 

交換機類型介紹

    Direct Exchange:

    

        direct 類型的行爲是」先匹配, 再投送」. 即在綁定時設定一個binding_key, 消息的routing_key與binding_key匹配時, 纔會被交換器投送到綁定的隊列中去.

    Topic:

        轉發消息主要是根據通配符。 在這種交換機下,隊列和交換機的綁定會定義一種路由模式,通配符就要在這種路由模式和路由鍵之間匹配後交換機才能轉發消息。

  1. 路由鍵必須是一串字符,用句號(.)隔開,好比說 topic.message,或者 topic.message.detail 等。
  2. 路由模式必須包含一個星號(*),主要用於匹配路由鍵指定位置的一個單詞,好比說,一個路由模式是這樣:topic.*,那麼就只能匹配路由鍵是:topic.message、topic.other等,第一個單詞是 topic,第二個單詞能夠是任意一個單詞。 井號(#)就表示一個或者多個單詞,例如一個匹配模式是topic.#,那麼能夠匹配到例如:topic.message、topic.message.detail等,以topic.開頭的路由鍵均可以匹配到。

    Fanout:

        Fanout類型相似於消息廣播,無論路由鍵或者是路由模式,會把消息發給綁定給它的所有隊列,若是配置了routing_key會被忽略。

    Headers:

        設置header attribute參數類型的交換機

 

項目簡介

    本文是基於Springboot-1.5.15整合RabbitMQ來進行講解,在真實工做中,生產者和消費者通常是在不一樣的項目裏,各自負責不一樣的職責,這裏爲了模擬真實環境,建立兩個不一樣的項目進行演示。建立兩個maven項目,消息生產者mq-rabbit-provider和消息消費者mq-rabbit-consumer,兩個項目的pom.xml文件添加相同依賴:

 
  1. <dependency>

  2. <groupId>org.springframework.boot</groupId>

  3. <artifactId>spring-boot-starter-amqp</artifactId>

  4. </dependency>

  5. <dependency>

  6. <groupId>org.springframework.boot</groupId>

  7. <artifactId>spring-boot-starter-web</artifactId>

  8. </dependency>

mq-rabbit-provider項目的application.properties內容以下:

 
  1. server.port=8080

  2. spring.application.name=springboot-rabbitmq-provider

  3.  
  4. spring.rabbitmq.host=10.211.55.3

  5. spring.rabbitmq.port=5672

  6. spring.rabbitmq.username=root

  7. spring.rabbitmq.password=root

  8. #RabbitMQ的虛擬host

  9. spring.rabbitmq.virtual-host=CalonHost

mq-rabbit-consumer項目的application.properties內容以下:

 
  1. server.port=9090

  2. spring.application.name=springboot-rabbitmq-consumer

  3.  
  4. spring.rabbitmq.host=10.211.55.3

  5. spring.rabbitmq.port=5672

  6. spring.rabbitmq.username=root

  7. spring.rabbitmq.password=root

  8. #RabbitMQ的虛擬host

  9. spring.rabbitmq.virtual-host=CalonHost

這裏只是端口和應用名不一樣,其餘都同樣。

 

接下來分別介紹Direct、Topic、Fanout等3種不一樣交換機的使用例子。

Direct Exchange

    在mq-rabbit-provider項目建一個配置類DirectRabbitConfig.java,配置交換機、隊列、BindingKey=CalonDirectRouting的綁定關係,代碼以下:

 
  1. @Configuration

  2. public class DirectRabbitConfig {

  3.  
  4. //隊列

  5. @Bean

  6. public Queue CalonDirectQueue() {

  7. return new Queue("CalonDirectQueue",true);

  8. }

  9.  
  10. //Direct交換機

  11. @Bean

  12. DirectExchange CalonDirectExchange() {

  13. return new DirectExchange("CalonDirectExchange");

  14. }

  15.  
  16. //綁定

  17. @Bean

  18. Binding bindingDirect() {

  19. return BindingBuilder.bind(CalonDirectQueue()).to(CalonDirectExchange()).with("CalonDirectRouting");

  20. }

  21. }

    建立一個實體類User.java,這裏說明一下,該實體類是消息的主體,因此必須實現Serializable接口,不然在消息消費者項目讀取消息時會報錯,代碼以下:

 
  1. package mq.rabbit.entity;

  2.  
  3. import java.io.Serializable;

  4.  
  5. public class User implements Serializable{

  6.  
  7. private static final long serialVersionUID = 1L;

  8.  
  9. private String id;

  10. private String username;

  11. private String password;

  12. private String type;

  13.  
  14. public String getId() {

  15. return id;

  16. }

  17. public void setId(String id) {

  18. this.id = id;

  19. }

  20. public String getUsername() {

  21. return username;

  22. }

  23. public void setUsername(String username) {

  24. this.username = username;

  25. }

  26. public String getPassword() {

  27. return password;

  28. }

  29. public void setPassword(String password) {

  30. this.password = password;

  31. }

  32.  
  33. public String getType() {

  34. return type;

  35. }

  36. public void setType(String type) {

  37. this.type = type;

  38. }

  39. public User() {

  40. super();

  41. }

  42. public User(String id, String username, String password, String type) {

  43. super();

  44. this.id = id;

  45. this.username = username;

  46. this.password = password;

  47. this.type = type;

  48. }

  49. }

下面建立一個Controller,利用http請求進行調試,CalonDirectExchange是上面配置的交換機標識,CalonDirectRouting就是上面綁定好的queue名字,因爲上面已經配置好交換機和隊列的綁定關係,這兩個組合就能夠知道消息最終是發送到隊列CalonDirectQueue裏面去了,Controller類的代碼以下:

 
  1. @Controller

  2. public class SendController {

  3.  
  4. @Autowired

  5. private RabbitTemplate template;

  6.  
  7. @GetMapping("/sendDirect")

  8. private @ResponseBody String sendDirect(String message) throws Exception {

  9. User user = new User(UUID.randomUUID().toString(), message, "123456", "sendDirect");

  10. template.convertAndSend("CalonDirectExchange", "CalonDirectRouting", user);

  11. return "OK,sendDirect:" + message;

  12. }

  13. }

啓動mq-rabbit-provider項目,在瀏覽器輸入:

http://localhost:8080/sendDirect?message=123

再去RabbitMQ的web管理後臺查看,你會發如今Queue裏找到剛剛添加的那個隊列,後面的數字就是消息數量有變化,說明消息已經存儲進去了:

    把mq-rabbit-provider項目裏的User類和DirectRabbitConfig類複製到mq-rabbit-consumer項目,User類用於讀取消息時接收消息對象,DirectRabbitConfig能夠不復制,可是若是RabbitMQ裏尚未被監聽的隊列時會報錯,複製過來是爲了讓RabbitMQ裏尚未被監聽的隊列時自動建立該隊列,防止報錯。

建立隊列監聽類DirectReceiver.java,代碼以下:

 
  1. package mq.rabbit.receiver;

  2.  
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;

  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;

  5. import org.springframework.stereotype.Component;

  6.  
  7. import mq.rabbit.entity.User;

  8.  
  9. @Component

  10. @RabbitListener(queues = "CalonDirectQueue")//CalonDirectQueue爲隊列名稱

  11. public class DirectReceiver {

  12.  
  13. @RabbitHandler

  14. public void process(User user) {

  15. System.out.println("DirectReceiver消費者收到消息 : " + user.getId()+","+user.getUsername()+","+user.getPassword()+","+user.getType());

  16. }

  17.  
  18. }

啓動mq-rabbit-consumer項目,就會收到以前發送到CalonDirectQueue隊列的消息了,繼續調用上面的請求/sendDirect,消息消費者會繼續收到消息。

 

Topic Exchange

在mq-rabbit-provider項目建一個配置類TopicRabbitConfig.java,配置交換機、隊列、BindingKey的綁定關係,代碼以下:

 
  1. package mq.rabbit.config;

  2.  
  3. import org.springframework.amqp.core.Binding;

  4. import org.springframework.amqp.core.BindingBuilder;

  5. import org.springframework.amqp.core.Queue;

  6. import org.springframework.amqp.core.TopicExchange;

  7. import org.springframework.context.annotation.Bean;

  8. import org.springframework.context.annotation.Configuration;

  9.  
  10. /**

  11. * Topic Exchange類型交換機

  12. * @author calon

  13. *

  14. */

  15. @Configuration

  16. public class TopicRabbitConfig {

  17.  
  18. public final static String first = "topic.first";

  19. public final static String second = "topic.second";

  20.  
  21. @Bean

  22. public Queue firstQueue() {

  23. return new Queue(TopicRabbitConfig.first);

  24. }

  25.  
  26. @Bean

  27. public Queue secondQueue() {

  28. return new Queue(TopicRabbitConfig.second);

  29. }

  30.  
  31. @Bean

  32. TopicExchange exchange() {

  33. return new TopicExchange("topicExchange");

  34. }

  35.  
  36. //綁定topic.first隊列到routingKey爲topic.first,只有topic.first的routingKey消息才發送到此隊列

  37. @Bean

  38. Binding bindingExchangeMessage() {

  39. return BindingBuilder.bind(firstQueue()).to(exchange()).with(first);

  40. }

  41.  
  42. //綁定topic.second隊列到topic.#,凡是topic.開頭的routingKey消息都發送到此隊列

  43. @Bean

  44. Binding bindingExchangeMessage2() {

  45. return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");

  46. }

  47.  
  48. }

Topic Exchange類型的交換機是基於模糊匹配規則,因此這裏建立兩個Queue,分別綁定到兩個BindingKey:topic.first和topic.#,用來測試消息進到哪一個隊列裏。

在SendController類裏添加兩個request,代碼以下:

 
  1. @Controller

  2. public class SendController {

  3.  
  4. @Autowired

  5. private RabbitTemplate template;

  6.  
  7. @GetMapping("/sendTopicFirst")

  8. private @ResponseBody String sendTopicFirst(String message) {

  9. User user = new User(UUID.randomUUID().toString(), message, "123456", "sendTopicFirst");

  10. template.convertAndSend("topicExchange", "topic.first", user);

  11. return "OK,sendTopicFirst:" + message;

  12. }

  13.  
  14. @GetMapping("/sendTopicSecond")

  15. private @ResponseBody String sendTopicSecond(String message) {

  16. User user = new User(UUID.randomUUID().toString(), message, "123456", "sendTopicSecond");

  17. template.convertAndSend("topicExchange", "topic.second", user);

  18. return "OK,sendTopicSecond:" + message;

  19. }

  20. }

當咱們調用/sendTopicFirst請求時,交換機爲topicExchange,routingKey爲topic.first,按照上面bindingKey的配置,能夠匹配到topic.first和topic.#規則,對應的隊列是topic.first和topic.second,因此一條消息進到兩個隊列裏。

當調用/sendTopicSecond請求時,交換機爲topicExchange,routingKey爲topic.second,匹配到topic.#規則,對應的隊列是topic.second,因此消息進到topic.second隊列裏,除了#匹配規則,你們能夠自行試試星號(*)這個匹配規則,*符號是匹配一個單詞的。

把mq-rabbit-provider項目裏的TopicRabbitConfig類複製到mq-rabbit-consumer項目,分別建立TopicFirstReceiver和TopicSecondReceiver消息監聽類,代碼以下:

 
  1. package mq.rabbit.receiver;

  2.  
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;

  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;

  5. import org.springframework.stereotype.Component;

  6. import mq.rabbit.entity.User;

  7.  
  8. @Component

  9. @RabbitListener(queues = "topic.first")

  10. public class TopicFirstReceiver {

  11.  
  12. @RabbitHandler

  13. public void process(User user) {

  14. System.out.println("TopicFirstReceiver消費者收到消息 : " + user.getId()+","+user.getUsername()+","+user.getPassword()+","+user.getType());

  15. }

  16. }

 
  1. package mq.rabbit.receiver;

  2.  
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;

  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;

  5. import org.springframework.stereotype.Component;

  6. import mq.rabbit.entity.User;

  7.  
  8. @Component

  9. @RabbitListener(queues = "topic.second")

  10. public class TopicSecondReceiver {

  11.  
  12. @RabbitHandler

  13. public void process(User user) {

  14. System.out.println("TopicSecondReceiver消費者收到消息 : " + user.getId()+","+user.getUsername()+","+user.getPassword()+","+user.getType());

  15. }

  16.  
  17. }

啓動mq-rabbit-consumer項目,會發現分別接收到各自監聽的隊列的消息。

 

Fanout Exchang

    在mq-rabbit-provider項目建一個配置類FanoutRabbitConfig.java,配置交換機、隊列的綁定關係,代碼以下:    

 
  1. package mq.rabbit.config;

  2.  
  3. import org.springframework.amqp.core.Binding;

  4. import org.springframework.amqp.core.BindingBuilder;

  5. import org.springframework.amqp.core.FanoutExchange;

  6. import org.springframework.amqp.core.Queue;

  7. import org.springframework.context.annotation.Bean;

  8. import org.springframework.context.annotation.Configuration;

  9.  
  10. @Configuration

  11. public class FanoutRabbitConfig {

  12.  
  13. @Bean

  14. public Queue AMessage() {

  15. return new Queue("fanout.A");

  16. }

  17.  
  18. @Bean

  19. public Queue BMessage() {

  20. return new Queue("fanout.B");

  21. }

  22.  
  23. @Bean

  24. public Queue CMessage() {

  25. return new Queue("fanout.C");

  26. }

  27.  
  28. @Bean

  29. FanoutExchange fanoutExchange() {

  30. return new FanoutExchange("fanoutExchange");

  31. }

  32.  
  33. @Bean

  34. Binding bindingExchangeA() {

  35. return BindingBuilder.bind(AMessage()).to(fanoutExchange());

  36. }

  37.  
  38. @Bean

  39. Binding bindingExchangeB() {

  40. return BindingBuilder.bind(BMessage()).to(fanoutExchange());

  41. }

  42.  
  43. @Bean

  44. Binding bindingExchangeC() {

  45. return BindingBuilder.bind(CMessage()).to(fanoutExchange());

  46. }

  47. }

這裏建立三個隊列fanout.A、fanout.B、fanout.C,都綁定到FanoutExchange交換機fanoutExchange上。

在SendController類添加一個請求/sendFanout,代碼以下:

 
  1. package mq.rabbit.controller;

  2.  
  3. import java.util.UUID;

  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;

  5. import org.springframework.beans.factory.annotation.Autowired;

  6. import org.springframework.stereotype.Controller;

  7. import org.springframework.web.bind.annotation.GetMapping;

  8. import org.springframework.web.bind.annotation.PathVariable;

  9. import org.springframework.web.bind.annotation.ResponseBody;

  10. import com.fasterxml.jackson.databind.ObjectMapper;

  11. import mq.rabbit.entity.User;

  12.  
  13. @Controller

  14. public class SendController {

  15.  
  16. @Autowired

  17. private RabbitTemplate template;

  18.  
  19. @GetMapping("/sendFanout")

  20. private @ResponseBody String sendFanout(String message) {

  21. User user = new User(UUID.randomUUID().toString(), message, "123456", "sendFanout");

  22. template.convertAndSend("fanoutExchange", null, user);

  23. return "OK,sendFanout:" + message;

  24. }

  25.  
  26. }

當調用/sendFanout請求時,在RabbitMQ的web管理界面看到三個隊列fanout.A、fanout.B、fanout.C都有一條消息,在Fanout交換機裏,若是有設置BindingKey,Fanout交換機會忽略已設置的BindingKey,把消息發送到綁定該交換機的全部隊列裏。

 

把mq-rabbit-provider項目裏的FanoutRabbitConfig類複製到mq-rabbit-consumer項目,分別建立FanoutReceiverA、FanoutReceiverB和FanoutReceiverC類,代碼以下:

 
  1. package mq.rabbit.receiver;

  2.  
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;

  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;

  5. import org.springframework.stereotype.Component;

  6. import mq.rabbit.entity.User;

  7.  
  8. @Component

  9. @RabbitListener(queues = "fanout.A")

  10. public class FanoutReceiverA {

  11.  
  12. @RabbitHandler

  13. public void process(User user) {

  14. System.out.println("FanoutReceiverA消費者收到消息 : " + user.getId()+","+user.getUsername()+","+user.getPassword()+","+user.getType());

  15. }

  16.  
  17. }

 
  1. package mq.rabbit.receiver;

  2.  
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;

  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;

  5. import org.springframework.stereotype.Component;

  6. import mq.rabbit.entity.User;

  7.  
  8. @Component

  9. @RabbitListener(queues = "fanout.B")

  10. public class FanoutReceiverB {

  11.  
  12. @RabbitHandler

  13. public void process(User user) {

  14. System.out.println("FanoutReceiverB消費者收到消息 : " + user.getId()+","+user.getUsername()+","+user.getPassword()+","+user.getType());

  15. }

  16.  
  17. }

 
  1. package mq.rabbit.receiver;

  2.  
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;

  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;

  5. import org.springframework.stereotype.Component;

  6. import mq.rabbit.entity.User;

  7.  
  8. @Component

  9. @RabbitListener(queues = "fanout.C")

  10. public class FanoutReceiverC {

  11.  
  12. @RabbitHandler

  13. public void process(User user) {

  14. System.out.println("FanoutReceiverC消費者收到消息 : " + user.getId()+","+user.getUsername()+","+user.getPassword()+","+user.getType());

  15. }

  16.  
  17. }

上面也能夠在一個類裏寫3個方法來進行對隊列的監聽,不一樣的地方在於把@RabbitListener移到方法上便可。

啓動mq-rabbit-consumer,便可收到隊列的消息。

 

RabbitMQ消息的確認機制

    在使用RabbitMQ的時候,咱們能夠經過消息持久化操做來解決由於服務器的異常奔潰致使的消息丟失,除此以外咱們還會遇到一個問題,當消息的生產者在將消息發送出去以後,消息到底有沒有正確到達服務器?若是不進行特殊配置的話,默認狀況下發布消息是不會返回任何信息給生產者的,也就是生產者是不知道消息有沒有正確到達消息服務器,同理,消息消費者在接收消息後,若是在執行業務邏輯過程出現異常崩潰等狀況,會致使消息丟失,因此咱們須要對消息的發送和消費進行確認,確保消息可以被正確的存儲和消費。RabbitMQ爲咱們提供了兩種方式:一、事務機制;二、確認機制。下面介紹消息確認機制。

 

生產者消息確認機制:

先把例子跑起來,下面再作詳細介紹。在mq-rabbit-provider項目的application.properties文件添加如下屬性:

 
  1. #確認消息已發送到交換機(Exchange)

  2. spring.rabbitmq.publisher-confirms=true

  3. #確認消息已發送到隊列(Queue)

  4. spring.rabbitmq.publisher-returns=true

在mq-rabbit-provider項目建立配置類RabbitConfig.java,代碼以下:

 
  1. package mq.rabbit.config;

  2.  
  3. import org.springframework.amqp.core.Message;

  4. import org.springframework.amqp.rabbit.connection.ConnectionFactory;

  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;

  6. import org.springframework.amqp.rabbit.support.CorrelationData;

  7. import org.springframework.context.annotation.Bean;

  8. import org.springframework.context.annotation.Configuration;

  9.  
  10. @Configuration

  11. public class RabbitConfig {

  12.  
  13. @Bean

  14. public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){

  15. RabbitTemplate rabbitTemplate = new RabbitTemplate();

  16. rabbitTemplate.setConnectionFactory(connectionFactory);

  17. rabbitTemplate.setMandatory(true);//必須設置爲true,才能讓下面的ReturnCallback函數生效

  18.  
  19. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

  20. @Override

  21. public void confirm(CorrelationData correlationData, boolean ack, String cause) {

  22. System.out.println("=======ConfirmCallback=========");

  23. System.out.println("correlationData = " + correlationData);

  24. System.out.println("ack = " + ack);

  25. System.out.println("cause = " + cause);

  26. System.out.println("=======ConfirmCallback=========");

  27. }

  28. });

  29.  
  30. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

  31. @Override

  32. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

  33. System.out.println("--------------ReturnCallback----------------");

  34. System.out.println("message = " + message);

  35. System.out.println("replyCode = " + replyCode);

  36. System.out.println("replyText = " + replyText);

  37. System.out.println("exchange = " + exchange);

  38. System.out.println("routingKey = " + routingKey);

  39. System.out.println("--------------ReturnCallback----------------");

  40. }

  41. });

  42.  
  43. return rabbitTemplate;

  44. }

  45.  
  46. }

RabbitMQ生產者是依賴兩個回調函數來實現確認的,分別是ConfirmCallback和ConfirmCallback,如上面的代碼。按如下4種狀況進行回調:

一、消息找不到交換機(Exchange)時回調ConfirmCallback,返回ack=false,代碼以下:

 
  1. =======ConfirmCallback=========

  2. correlationData = null

  3. ack = false

  4. cause = channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CalonDirectExchange1' in vhost 'calonHost', class-id=60, method-id=40)

  5. =======ConfirmCallback=========

  6. 2018-08-30 09:59:37.892 ERROR 55704 --- [0.211.55.3:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CalonDirectExchange1' in vhost 'calonHost', class-id=60, method-id=40)

二、消息找到交換機(Exchange)但找不到隊列(Queue)時回調ConfirmCallback和ReturnCallback,返回ack=true,replyCode = 312,replyText = NO_ROUTE,代碼以下:

 
  1. --------------ReturnCallback----------------

  2. message = (Body:'[B@bf8af5b(byte[179])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/x-java-serialized-object, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=null, receivedExchange=null, receivedRoutingKey=null, receivedDelay=null, deliveryTag=0, messageCount=null, consumerTag=null, consumerQueue=null])

  3. replyCode = 312

  4. replyText = NO_ROUTE

  5. exchange = CalonDirectExchange

  6. routingKey = CalonDirectRouting1

  7. --------------ReturnCallback----------------

  8. =======ConfirmCallback=========

  9. correlationData = null

  10. ack = true

  11. cause = null

  12. =======ConfirmCallback=========

三、消息既找不到交換機(Exchange)又找不到隊列(Queue)時回調ConfirmCallback,返回ack=false,代碼以下:

 
  1. =======ConfirmCallback=========

  2. correlationData = null

  3. ack = false

  4. cause = channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CalonDirectExchange1' in vhost 'calonHost', class-id=60, method-id=40)

  5. =======ConfirmCallback=========

  6. 2018-08-30 10:03:22.204 ERROR 55704 --- [0.211.55.3:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CalonDirectExchange1' in vhost 'calonHost', class-id=60, method-id=40)

四、消息成功發送回調ConfirmCallback,返回ack=true,代碼以下:

 
  1. =======ConfirmCallback=========

  2. correlationData = null

  3. ack = true

  4. cause = null

  5. =======ConfirmCallback=========

根據上面4種狀態,咱們能夠在這兩個回調函數里根據返回的狀態進行業務方面的處理,好比業務回滾或者從新發送消息等,能夠基於上面SendController類對其中一個請求進行測試,更改exchange和routingKey來測試一下這4種狀態,這個就是生產消息的確認機制。

 

消費者消息確認機制:

    在mq-rabbit-consumer項目的DirectRabbitConfig配置類進行消息消費確認機制的配置,代碼以下:

 
  1. package mq.rabbit.config;

  2.  
  3. import org.springframework.amqp.core.AcknowledgeMode;

  4. import org.springframework.amqp.core.Binding;

  5. import org.springframework.amqp.core.BindingBuilder;

  6. import org.springframework.amqp.core.DirectExchange;

  7. import org.springframework.amqp.core.Queue;

  8. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

  9. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

  10. import org.springframework.beans.factory.annotation.Autowired;

  11. import org.springframework.context.annotation.Bean;

  12. import org.springframework.context.annotation.Configuration;

  13. import mq.rabbit.receiver.DirectAckReceiver;

  14.  
  15. @Configuration

  16. public class DirectRabbitConfig {

  17.  
  18. @Bean

  19. public Queue CalonDirectQueue() {

  20. return new Queue("CalonDirectQueue",true);

  21. }

  22.  
  23. @Bean

  24. DirectExchange CalonDirectExchange() {

  25. return new DirectExchange("CalonDirectExchange");

  26. }

  27.  
  28. @Bean

  29. Binding bindingDirect() {

  30. return BindingBuilder.bind(CalonDirectQueue()).to(CalonDirectExchange()).with("CalonDirectRouting");

  31. }

  32.  
  33. @Autowired

  34. private CachingConnectionFactory connectionFactory;

  35. @Autowired

  36. private DirectAckReceiver directAckReceiver;//消息接收處理類

  37.  
  38. @Bean

  39. public SimpleMessageListenerContainer simpleMessageListenerContainer() {

  40. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

  41. container.setConcurrentConsumers(1);

  42. container.setMaxConcurrentConsumers(1);

  43. container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默認是自動確認,這裏改成手動確認消息

  44.  
  45. container.setQueues(CalonDirectQueue());

  46. container.setMessageListener(directAckReceiver);

  47. return container;

  48. }

  49.  
  50. }

在mq-rabbit-consumer項目新建消息監聽類DirectAckReceiver.java,用於處理消息的確認操做,代碼以下:

 
  1. package mq.rabbit.receiver;

  2.  
  3. import org.springframework.amqp.core.Message;

  4. import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

  5. import org.springframework.beans.factory.annotation.Autowired;

  6. import org.springframework.stereotype.Component;

  7. import com.fasterxml.jackson.databind.ObjectMapper;

  8. import com.rabbitmq.client.Channel;

  9. import mq.rabbit.entity.User;

  10.  
  11. @Component

  12. public class DirectAckReceiver implements ChannelAwareMessageListener {

  13.  
  14. @Autowired

  15. private ObjectMapper objectMapper;

  16.  
  17. @Override

  18. public void onMessage(Message message, Channel channel) throws Exception {

  19. long deliveryTag = message.getMessageProperties().getDeliveryTag();

  20. try {

  21. byte[] body = message.getBody();

  22. User user = objectMapper.readValue(body, User.class);

  23. System.out.println("DirectAckReceiver消費者收到消息 : " + user.getId()+","+user.getUsername()+","+user.getPassword()+","+user.getType());

  24. channel.basicAck(deliveryTag, true);

  25. // channel.basicReject(deliveryTag, true);//爲true會從新放回隊列

  26. } catch (Exception e) {

  27. channel.basicReject(deliveryTag, false);

  28. e.printStackTrace();

  29. }

  30. }

  31.  
  32. }

消息接收確認模式有如下3種:

  • AcknowledgeMode.NONE:不確認
  • AcknowledgeMode.AUTO:自動確認
  • AcknowledgeMode.MANUAL:手動確認

默認狀況下是自動確認,若是消費端消費邏輯拋出異常,也就是消費端沒有處理成功這條消息,那麼就至關於丟失了消息,在實際應用中,咱們但願每條消息都可以被正確消費而不是出現丟失的狀況,上面代碼是開啓手動確認模式,下面看看手動確認都有哪幾種方式:

  • 成功確認:void basicAck(long deliveryTag, boolean multiple) throws IOException;

            deliveryTag:該消息的index

            multiple:是否批量. true:將一次性ack全部小於deliveryTag的消息。

        消費者成功處理後,調用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法對消息進行確認。

  • 失敗確認:void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

            deliveryTag:該消息的index。

            multiple:是否批量. true:將一次性拒絕全部小於deliveryTag的消息。

            requeue:是否從新入隊列。

  • 拒絕確認:void basicReject(long deliveryTag, boolean requeue) throws IOException;

            deliveryTag:該消息的index。

            requeue:被拒絕的是否從新入隊列。

            channel.basicNack 與 channel.basicReject 的區別在於basicNack能夠批量拒絕多條消息,而basicReject一次只能拒絕一條消息。

這裏要注意一點的是,不管如何,必須對消息進行確認操做,若是不調用相關函數進行確認,則RabbitMQ會認爲該程序處理能力弱,不會再發送消息到該監聽程序。

還有一個問題,在啓用消息手動確認模式後,發送消息的實體須要轉成json字符串發送,接收消息時再把json轉回對象,不然出錯,也許是我還沒找到直接發送實體的方法,還望指正。        

    RabbitMQ的基礎知識就已經介紹完了,若有錯誤,還望留意指正,謝謝。

 

本文例子代碼地址:

https://github.com/calonmo/mq-rabbit-provider.git

https://github.com/calonmo/mq-rabbit-consumer.git

相關文章
相關標籤/搜索