消息中間件 -- RabbitMQ

1、介紹

  RabbitMQ是由Erlang語言開發,基於AMQP(高級消息隊列協議)協議實現的消息隊列。spring

  補充:JMS,Java提供的一套消息服務API標準。服務器

  應用場景:併發

  • 任務異步處理。

  將不須要同步處理的而且耗時長的操做由消息隊列通知消息接收方進行異步處理,提升了應用程序的響應時間。app

  • 應用程序解耦。

  MQ至關於一箇中介,生產方經過MQ與消費方交互,它將應用程序進行解耦合。異步

  • 優勢

  一、使用簡單,功能強大maven

  二、基於AMQP協議分佈式

  三、社區活躍,文檔完善ide

  四、高併發性能好,得益於Erlang語言高併發

  五、SpringBoot默認集成RabbitMQ工具

2、快速入門

  • RabbitMQ的工做原理

  基本結構以下:

  

  組成部分說明:

  • Broker:消息隊列服務進程,此進程包括兩個部分:Exchange和Queue
  • Exchange:消息隊列交換機,按必定的規則將消息路由轉發到某個隊列,對消息進行過濾
  • Queue:消息隊列,存儲消息隊列,消息到達隊列轉發到指定的消費方
  • Producer:消息生產者,即生產方客戶端,生產方客戶端將消息發送到MQ
  • Consumer:消息消費者,即消費方客戶端,接收MQ轉發的消息

  消息發佈接收流程:

  — — — — — —發送消息— — — — — —

  1. 生產者和Broker創建TCP鏈接
  2. 生產者和Broker創建通道
  3. 生產者經過通道將消息發送給Broker,由Exchange將消息進行轉發
  4. Exchange將消息轉發到指定的Queue(隊列)

  — — — — — —接收消息— — — — — —

  1. 消費者和Broker創建TCP鏈接
  2. 消費者和Broker創建通道
  3. 消費者監聽指定的Queue(隊列)
  4. 當有消息到達Queue時Broker默認將消息推送給消費者
  5. 消費者接收到消息
  • 關於安裝

  RabbitMQ有Erlang語言開發,Erlang語言用於併發及分佈式系統的開發,OTP(Open Telecon Platform)做爲Erlang語言的一部分,包含了不少基於Erlang開發的中間件及工具庫,安裝RabbitMQ須要安裝Erlang/OTP,並保持版本匹配。

  咱們後面的案例採用Erlang/OTP 20.3版本和RabbitMQ3.7.3版本。

  具體的安裝過程這裏就不贅述,網上搜索教程便可。

3、入門案例

  咱們首先用RabbitMQ官方提供的Java client測試,瞭解一下RabbitMQ的交互過程。

  • 建立maven工程

  

  • 在父工程rabbitmq-test的pom.xml文件中添加依賴
1 <dependency>
2     <groupId>com.rabbitmq</groupId>
3     <artifactId>amqp-client</artifactId>
4     <version>5.1.2</version>
5 </dependency>
  • 在生產者工程中添加生產者類Producer01
 1 public class Producer01 {  2     /**
 3  * 隊列名稱  4      */
 5     private static final String QUEUE = "hello rabbitmq";  6 
 7     public static void main(String[] args) {  8         Connection connection = null;  9         Channel channel = null; 10 
11         ConnectionFactory factory = new ConnectionFactory(); 12         factory.setHost("localhost"); 13         factory.setPort(5672); 14         factory.setUsername("guest"); 15         factory.setPassword("guest"); 16         // RabbitMQ默認虛擬機名稱爲"/",虛擬機至關於一個獨立的MQ服務器
17         factory.setVirtualHost("/"); 18 
19         try { 20             // 建立於RabbitMQ服務的鏈接
21             connection = factory.newConnection(); 22             // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務
23             channel = connection.createChannel(); 24             /**
25  * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 26  * param1:隊列名稱 27  * param2:是否持久化 28  * param3:隊列是否獨佔此鏈接 29  * param4:隊列再也不使用時自動刪除此列 30  * param5:隊列參數 31              */
32             channel.queueDeclare(QUEUE,true,false,false,null); 33             String message = "hello rabbit:" + System.currentTimeMillis(); 34 
35             /**
36  * 消息發佈方法 37  * param1:Exchange的名稱,若是沒有指定,則使用Default Exchange 38  * 這裏沒有指定交換機,消息將發送給默認交換機,每一個隊列也會綁定默認交換機,可是不能顯示綁定或解除綁定 39  * 使用默認的交換機,routingkey等於隊列名稱 40  * param2:routingkey,消息的路由,適用於Exchange將消息轉發到指定的消息隊列 41  * param3:消息包含的屬性 42  * param4:消息體 43              */
44             channel.basicPublish("",QUEUE,null,message.getBytes()); 45         } catch (IOException e) { 46  e.printStackTrace(); 47         } catch (TimeoutException e) { 48  e.printStackTrace(); 49         } finally { 50             if (channel != null){ 51                 try { 52  channel.close(); 53                 } catch (IOException e) { 54  e.printStackTrace(); 55                 } catch (TimeoutException e) { 56  e.printStackTrace(); 57  } 58  } 59             if (connection != null){ 60                 try { 61  connection.close(); 62                 } catch (IOException e) { 63  e.printStackTrace(); 64  } 65  } 66  } 67  } 68 }
Producer01
  • 在消費者工程中添加消費者類Consumer01
 1 public class Consumer01 {  2     /**
 3  * 隊列名稱  4      */
 5     private static final String QUEUE = "hello rabbitmq";  6 
 7     public static void main(String[] args) {  8         Connection connection = null;  9         Channel channel = null; 10 
11         ConnectionFactory factory = new ConnectionFactory(); 12         factory.setHost("localhost"); 13         factory.setPort(5672); 14 
15         try { 16             // 建立於RabbitMQ服務的鏈接
17             connection = factory.newConnection(); 18             // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務
19             channel = connection.createChannel(); 20             /**
21  * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 22  * param1:隊列名稱 23  * param2:是否持久化 24  * param3:隊列是否獨佔此鏈接 25  * param4:隊列再也不使用時自動刪除此列 26  * param5:隊列參數 27              */
28             channel.queueDeclare(QUEUE,true,false,false,null); 29             // 定義消費方法
30             DefaultConsumer consumer = new DefaultConsumer(channel){ 31 
32                 /**
33  * 消費者接受消息調用此方法 34  * @param consumerTag 消費者標籤,在channel,basicConsumer()去指定 35  * @param envelope 消息包含的的內容,能夠從中獲取消息id,消息routingkey,交換機, 36  * 消息和重傳標誌(收到消息失敗後是否須要從新發送) 37  * @param properties 38  * @param body 39  * @throws IOException 40                  */
41  @Override 42                 public void handleDelivery(String consumerTag, 43  Envelope envelope, 44  AMQP.BasicProperties properties, 45                                            byte[] body) throws IOException { 46                     // 交換機
47                     String exchange = envelope.getExchange(); 48                     // routingkey
49                     String routingKey = envelope.getRoutingKey(); 50                     // 消息id
51                     long deliveryTag = envelope.getDeliveryTag(); 52                     // 消息內容
53                     String msg = new String(body, "utf-8"); 54                     System.out.println("receive message:" + msg); 55  } 56  }; 57             /**
58  * 監聽隊列String queue, boolean autoAck, Consumer callback 59  * param1:隊列名稱 60  * param2:是否自動回覆,設置爲true爲表示消息接收到自動向mq回覆收到了,mq接收到回覆會刪除消息 61  * 設置爲false則須要手動回覆 62  * param3:消費消息的方法,消費者接收到消息後調用此方法 63              */
64             channel.basicConsume(QUEUE, true, consumer); 65         } catch (IOException e) { 66  e.printStackTrace(); 67         } catch (TimeoutException e) { 68  e.printStackTrace(); 69  } 70  } 71 }
Consumer01

  啓動消費者工程中的main方法進行監聽,再啓動生產者中的main方法。注:二者的啓動順序能夠顛倒。控制檯打印到接收到的消息以下:

  • 總結

  發送端操做流程:

  建立鏈接——>建立通道——>聲明隊列——>發送消息

  接收端:

  建立鏈接——>建立通道——>聲明隊列——>監聽隊列——>接收消息——>ack回覆

4、工做模式

  RabbitMQ有如下幾種工做模式:

  • Work queues模式
    •   
    • Work queues與入門程序比,多以消費端,兩個消費端同時消費同一個隊列中的消息

    • 消費場景:對於人物太重或者任務較多狀況使用工做隊列能夠提升任務處理的速度
    • 測試:
      • 使用入門程序,啓動多個消費者
      • 生產者發送多個消息
    • 結果:
      • 一條消息只會被一個消費者接收
      • rabbit採用輪詢的方式將消息是平均發送給消費者
      • 消費者在處理完某條消息後,纔會接收到下一條消息
  • Publish/Subscribe模式
    • 發佈訂閱模式

      • 每一個消費者監聽本身的隊列
      • 生產者將消息發送給broker,由交換機將消息轉發到綁定次交換機的每一個隊列,每一個綁定交換機的隊列都將接收到消息
    • 案例:用戶通知,信用卡月帳單,通知方式有短信,郵件等多種方式
    • 生產者
      • 聲明Exchange_fanout_inform交換機
      • 聲明兩個隊列而且綁定到此交換機,綁定時不須要指定routingkey
      • 在生產者工程中添加生產者類Producer02Publish
 1 public class Producer02Publish {  2     /**
 3  * 隊列名稱  4      */
 5     private static final String QUEUE_EMAIL_INFORM = "queue email inform";  6     private static final String QUEUE_SMS_INFORM = "queue sms inform";  7     /**
 8  * 聲明EXCHANGE_FANOUT_INFORM交換機  9      */
10     private static final String EXCHANGE_FANOUT_INFORM = "exchange fanout inform"; 11 
12     public static void main(String[] args) { 13         Connection connection = null; 14         Channel channel = null; 15 
16         // 建立一個與MQ的鏈接
17         ConnectionFactory factory = new ConnectionFactory(); 18         factory.setHost("localhost"); 19         factory.setPort(5672); 20         factory.setUsername("guest"); 21         factory.setPassword("guest"); 22         // RabbitMQ默認虛擬機名稱爲"/",虛擬機至關於一個獨立的MQ服務器
23         factory.setVirtualHost("/"); 24 
25         try { 26             // 建立於RabbitMQ服務的鏈接
27             connection = factory.newConnection(); 28             // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務
29             channel = connection.createChannel(); 30             /**
31  * 聲明交換機 32  * param1:交換機名稱 33  * param2:交換機類型,fanout、topic、direct、headers 34              */
35  channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); 36             /**
37  * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 38  * param1:隊列名稱 39  * param2:是否持久化 40  * param3:隊列是否獨佔此鏈接 41  * param4:隊列再也不使用時自動刪除此列 42  * param5:隊列參數 43              */
44             channel.queueDeclare(QUEUE_EMAIL_INFORM,true,false,false,null); 45             channel.queueDeclare(QUEUE_SMS_INFORM,true,false,false,null); 46             /**
47  * 交換機和隊列綁定 48  * param1:隊列名稱 49  * param2:交換機名稱 50  * param3:路由key 51              */
52             channel.queueBind(QUEUE_EMAIL_INFORM,EXCHANGE_FANOUT_INFORM,""); 53             channel.queueBind(QUEUE_SMS_INFORM,EXCHANGE_FANOUT_INFORM,""); 54 
55             // 發送消息
56             for (int i = 0; i<10; i++){ 57                 String message = "inform to user:" + i; 58                 /**
59  * 消息發佈方法 60  * param1:Exchange的名稱,若是沒有指定,則使用Default Exchange 61  * param2:routingkey,消息的路由,適用於Exchange將消息轉發到指定的消息隊列 62  * param3:消息包含的屬性 63  * param4:消息體 64                  */
65                 channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes()); 66 
67  } 68         } catch (IOException e) { 69  e.printStackTrace(); 70         } catch (TimeoutException e) { 71  e.printStackTrace(); 72         } finally { 73             if (channel != null){ 74                 try { 75  channel.close(); 76                 } catch (IOException e) { 77  e.printStackTrace(); 78                 } catch (TimeoutException e) { 79  e.printStackTrace(); 80  } 81  } 82             if (connection != null){ 83                 try { 84  connection.close(); 85                 } catch (IOException e) { 86  e.printStackTrace(); 87  } 88  } 89  } 90  } 91 }
Producer02Publish
    • 接收郵件消費者
 1 public class Consumer02SubscribeEmail {  2     /**
 3  * 隊列名稱  4      */
 5     private static final String QUEUE_EMAIL_INFORM = "queue email inform";  6     /**
 7  * 聲明EXCHANGE_FANOUT_INFORM交換機  8      */
 9     private static final String EXCHANGE_FANOUT_INFORM = "exchange fanout inform"; 10 
11     public static void main(String[] args) { 12         Connection connection = null; 13         Channel channel = null; 14 
15         // 建立一個與MQ的鏈接
16         ConnectionFactory factory = new ConnectionFactory(); 17         factory.setHost("localhost"); 18         factory.setPort(5672); 19         factory.setUsername("guest"); 20         factory.setPassword("guest"); 21         // rabbitmq默認虛擬機名稱爲「/」,虛擬機至關於一個獨立的mq服務器
22         factory.setVirtualHost("/"); 23         try { 24             // 建立於RabbitMQ服務的鏈接
25             connection = factory.newConnection(); 26             // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務
27             channel = connection.createChannel(); 28             /**
29  * param1:交換機名稱 30  * param2:交換機類型,fanout、topic、direct、headers 31              */
32  channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); 33             /**
34  * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 35  * param1:隊列名稱 36  * param2:是否持久化 37  * param3:隊列是否獨佔此鏈接 38  * param4:隊列再也不使用時自動刪除此列 39  * param5:隊列參數 40              */
41             channel.queueDeclare(QUEUE_EMAIL_INFORM,true,false,false,null); 42 
43             /**
44  * 交換機和隊列綁定 45  * param1:隊列名稱 46  * param2:交換機名稱 47  * param3:路由key 48              */
49             channel.queueBind(QUEUE_EMAIL_INFORM, EXCHANGE_FANOUT_INFORM, ""); 50             // 定義消費方法
51             DefaultConsumer consumer = new DefaultConsumer(channel){ 52 
53                 /**
54  * 消費者接受消息調用此方法 55  * @param consumerTag 消費者標籤,在channel,basicConsumer()去指定 56  * @param envelope 消息包含的的內容,能夠從中獲取消息id,消息routingkey,交換機, 57  * 消息和重傳標誌(收到消息失敗後是否須要從新發送) 58  * @param properties 59  * @param body 60  * @throws IOException 61                  */
62  @Override 63                 public void handleDelivery(String consumerTag, 64  Envelope envelope, 65  AMQP.BasicProperties properties, 66                                            byte[] body) throws IOException { 67                     // 消息id
68                     long deliveryTag = envelope.getDeliveryTag(); 69                     // 交換機
70                     String exchange = envelope.getExchange(); 71                     // 消息內容
72                     String msg = new String(body, "utf-8"); 73                     System.out.println("receive message:" + msg); 74  } 75  }; 76             /**
77  * 監聽隊列String queue, boolean autoAck, Consumer callback 78  * param1:隊列名稱 79  * param2:是否自動回覆,設置爲true爲表示消息接收到自動向mq回覆收到了,mq接收到回覆會刪除消息 80  * 設置爲false則須要手動回覆 81  * param3:消費消息的方法,消費者接收到消息後調用此方法 82              */
83             channel.basicConsume(QUEUE_EMAIL_INFORM, true, consumer); 84         } catch (IOException e) { 85  e.printStackTrace(); 86         } catch (TimeoutException e) { 87  e.printStackTrace(); 88  } 89  } 90 }
Consumer02SubscribeEmail
    • 接收信息消費者
 1 public class Consumer02SubscribeSms {  2     /**
 3  * 隊列名稱  4      */
 5     private static final String QUEUE_SMS_INFORM = "queue sms inform";  6     /**
 7  * 聲明EXCHANGE_FANOUT_INFORM交換機  8      */
 9     private static final String EXCHANGE_FANOUT_INFORM = "exchange fanout inform"; 10 
11     public static void main(String[] args) { 12         Connection connection = null; 13         Channel channel = null; 14 
15         // 建立一個與MQ的鏈接
16         ConnectionFactory factory = new ConnectionFactory(); 17         factory.setHost("localhost"); 18         factory.setPort(5672); 19         factory.setUsername("guest"); 20         factory.setPassword("guest"); 21         // rabbitmq默認虛擬機名稱爲「/」,虛擬機至關於一個獨立的mq服務器
22         factory.setVirtualHost("/"); 23         try { 24             // 建立於RabbitMQ服務的鏈接
25             connection = factory.newConnection(); 26             // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務
27             channel = connection.createChannel(); 28             /**
29  * param1:交換機名稱 30  * param2:交換機類型,fanout、topic、direct、headers 31              */
32  channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); 33             /**
34  * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 35  * param1:隊列名稱 36  * param2:是否持久化 37  * param3:隊列是否獨佔此鏈接 38  * param4:隊列再也不使用時自動刪除此列 39  * param5:隊列參數 40              */
41             channel.queueDeclare(QUEUE_SMS_INFORM,true,false,false,null); 42 
43             /**
44  * 交換機和隊列綁定 45  * param1:隊列名稱 46  * param2:交換機名稱 47  * param3:路由key 48              */
49             channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_FANOUT_INFORM, ""); 50             // 定義消費方法
51             DefaultConsumer consumer = new DefaultConsumer(channel){ 52 
53                 /**
54  * 消費者接受消息調用此方法 55  * @param consumerTag 消費者標籤,在channel,basicConsumer()去指定 56  * @param envelope 消息包含的的內容,能夠從中獲取消息id,消息routingkey,交換機, 57  * 消息和重傳標誌(收到消息失敗後是否須要從新發送) 58  * @param properties 59  * @param body 60  * @throws IOException 61                  */
62  @Override 63                 public void handleDelivery(String consumerTag, 64  Envelope envelope, 65  AMQP.BasicProperties properties, 66                                            byte[] body) throws IOException { 67                     // 消息id
68                     long deliveryTag = envelope.getDeliveryTag(); 69                     // 交換機
70                     String exchange = envelope.getExchange(); 71                     // 消息內容
72                     String msg = new String(body, "utf-8"); 73                     System.out.println("receive message:" + msg); 74  } 75  }; 76             /**
77  * 監聽隊列String queue, boolean autoAck, Consumer callback 78  * param1:隊列名稱 79  * param2:是否自動回覆,設置爲true爲表示消息接收到自動向mq回覆收到了,mq接收到回覆會刪除消息 80  * 設置爲false則須要手動回覆 81  * param3:消費消息的方法,消費者接收到消息後調用此方法 82              */
83             channel.basicConsume(QUEUE_SMS_INFORM, true, consumer); 84         } catch (IOException e) { 85  e.printStackTrace(); 86         } catch (TimeoutException e) { 87  e.printStackTrace(); 88  } 89  } 90 }
Consumer02SubscribeSms
    • 測試 -- 執行生產者 的main方法,執行兩個消費者的main方法,前後順序隨便
    • 發如今兩個消費者端的控制檯都能打印到消息
    • publish、subscribe與work queues的區別與相同點
      • work queues不用定義交換機,publish/subscribe須要定義交換機
      • publish/subscribe須要設置隊列和交換機的綁定,work queues不須要設置,實際上workqueues會將隊列綁定到默認的交換機
      • 二者實現的發佈訂閱效果是同樣的,多個消費端監聽同一個隊列不會重複消費消息
    • 實際工做用publish/subscrib仍是work queues
      • 建議使用publish/subscribe,發佈定語模式比工做隊列模式更強大,並且發佈訂閱模式能夠指定本身專用的交換機
  • Routing模式
    • 路由模式:

      • 每一個消費者監聽本身的隊列,並設置routingkey
      • 生產者將消息發給交換機,由交換機根據routingkey來轉發消息到指定的隊列
    • 生產者
      • 聲明exchange_routing_inform交換機
      • 聲明兩個隊列而且綁定到此交換機,綁定石須要指定routingkey
      • 發送消息時須要指定routingkey
      • 在生產者工程中添加生產者類Producer03Routing,在Produce03Routing中只綁定了接收信息消費者的路由信息
 1 public class Producer03Routing {  2     /**
 3  * 隊列名稱  4  * 路由名稱設定與隊列名稱同樣  5      */
 6     private static final String QUEUE_EMAIL_INFORM = "queue email inform";  7     private static final String QUEUE_SMS_INFORM = "queue sms inform";  8     /**
 9  * 聲明EXCHANGE_ROUTING_INFORM交換機 10      */
11     private static final String EXCHANGE_ROUTING_INFORM = "exchange routing inform"; 12 
13     public static void main(String[] args) { 14         Connection connection = null; 15         Channel channel = null; 16 
17         // 建立一個與MQ的鏈接
18         ConnectionFactory factory = new ConnectionFactory(); 19         factory.setHost("localhost"); 20         factory.setPort(5672); 21         factory.setUsername("guest"); 22         factory.setPassword("guest"); 23         // RabbitMQ默認虛擬機名稱爲"/",虛擬機至關於一個獨立的MQ服務器
24         factory.setVirtualHost("/"); 25 
26         try { 27             // 建立於RabbitMQ服務的鏈接
28             connection = factory.newConnection(); 29             // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務
30             channel = connection.createChannel(); 31             /**
32  * 聲明交換機 33  * param1:交換機名稱 34  * param2:交換機類型,fanout、topic、direct、headers 35              */
36  channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); 37             /**
38  * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 39  * param1:隊列名稱 40  * param2:是否持久化 41  * param3:隊列是否獨佔此鏈接 42  * param4:隊列再也不使用時自動刪除此列 43  * param5:隊列參數 44              */
45             channel.queueDeclare(QUEUE_EMAIL_INFORM, true, false, false, null); 46             channel.queueDeclare(QUEUE_SMS_INFORM, true, false, false, null); 47             /**
48  * 交換機和隊列綁定 49  * param1:隊列名稱 50  * param2:交換機名稱 51  * param3:路由key 52              */
53  channel.queueBind(QUEUE_EMAIL_INFORM, EXCHANGE_ROUTING_INFORM, QUEUE_EMAIL_INFORM); 54  channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_ROUTING_INFORM, QUEUE_SMS_INFORM); 55 
56             // 發送消息
57             for (int i = 0; i < 10; i++) { 58                 String message = "inform to user:" + i; 59                 /**
60  * 消息發佈方法 61  * param1:Exchange的名稱,若是沒有指定,則使用Default Exchange 62  * param2:routingkey,消息的路由,適用於Exchange將消息轉發到指定的消息隊列 63  * param3:消息包含的屬性 64  * param4:消息體 65                  */
66                 channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_SMS_INFORM, null, message.getBytes()); 67  } 68         } catch (IOException e) { 69  e.printStackTrace(); 70         } catch (TimeoutException e) { 71  e.printStackTrace(); 72         } finally { 73             if (channel != null) { 74                 try { 75  channel.close(); 76                 } catch (IOException e) { 77  e.printStackTrace(); 78                 } catch (TimeoutException e) { 79  e.printStackTrace(); 80  } 81  } 82             if (connection != null) { 83                 try { 84  connection.close(); 85                 } catch (IOException e) { 86  e.printStackTrace(); 87  } 88  } 89  } 90  } 91 }
Producer03Routing
    • 接收郵件消費者
 1 public class Consumer03RoutingEmail {  2     /**
 3  * 隊列名稱  4  * 路由名稱設定與隊列名稱同樣  5      */
 6     private static final String QUEUE_EMAIL_INFORM = "queue email inform";  7     /**
 8  * 聲明EXCHANGE_ROUTING_INFORM交換機  9      */
10     private static final String EXCHANGE_ROUTING_INFORM = "exchange routing inform"; 11 
12     public static void main(String[] args) { 13         Connection connection = null; 14         Channel channel = null; 15 
16         // 建立一個與MQ的鏈接
17         ConnectionFactory factory = new ConnectionFactory(); 18         factory.setHost("localhost"); 19         factory.setPort(5672); 20         factory.setUsername("guest"); 21         factory.setPassword("guest"); 22         // rabbitmq默認虛擬機名稱爲「/」,虛擬機至關於一個獨立的mq服務器
23         factory.setVirtualHost("/"); 24         try { 25             // 建立於RabbitMQ服務的鏈接
26             connection = factory.newConnection(); 27             // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務
28             channel = connection.createChannel(); 29             /**
30  * param1:交換機名稱 31  * param2:交換機類型,fanout、topic、direct、headers 32              */
33  channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); 34             /**
35  * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 36  * param1:隊列名稱 37  * param2:是否持久化 38  * param3:隊列是否獨佔此鏈接 39  * param4:隊列再也不使用時自動刪除此列 40  * param5:隊列參數 41              */
42             channel.queueDeclare(QUEUE_EMAIL_INFORM, true, false, false, null); 43 
44             /**
45  * 交換機和隊列綁定 46  * param1:隊列名稱 47  * param2:交換機名稱 48  * param3:路由key 49              */
50  channel.queueBind(QUEUE_EMAIL_INFORM, EXCHANGE_ROUTING_INFORM, QUEUE_EMAIL_INFORM); 51             // 定義消費方法
52             DefaultConsumer consumer = new DefaultConsumer(channel) { 53 
54                 /**
55  * 消費者接受消息調用此方法 56  * @param consumerTag 消費者標籤,在channel,basicConsumer()去指定 57  * @param envelope 消息包含的的內容,能夠從中獲取消息id,消息routingkey,交換機, 58  * 消息和重傳標誌(收到消息失敗後是否須要從新發送) 59  * @param properties 60  * @param body 61  * @throws IOException 62                  */
63  @Override 64                 public void handleDelivery(String consumerTag, 65  Envelope envelope, 66  AMQP.BasicProperties properties, 67                                            byte[] body) throws IOException { 68                     // 消息id
69                     long deliveryTag = envelope.getDeliveryTag(); 70                     // 交換機
71                     String exchange = envelope.getExchange(); 72                     // 消息內容
73                     String msg = new String(body, "utf-8"); 74                     System.out.println("email receive message:" + msg); 75  } 76  }; 77             /**
78  * 監聽隊列String queue, boolean autoAck, Consumer callback 79  * param1:隊列名稱 80  * param2:是否自動回覆,設置爲true爲表示消息接收到自動向mq回覆收到了,mq接收到回覆會刪除消息 81  * 設置爲false則須要手動回覆 82  * param3:消費消息的方法,消費者接收到消息後調用此方法 83              */
84             channel.basicConsume(QUEUE_EMAIL_INFORM, true, consumer); 85         } catch (IOException e) { 86  e.printStackTrace(); 87         } catch (TimeoutException e) { 88  e.printStackTrace(); 89  } 90  } 91 }
Consumer03RoutingEmail
    • 接收信息消費者
 1 public class Consumer03RoutingSms {  2     /**
 3  * 隊列名稱  4  * 路由名稱設定與隊列名稱同樣  5      */
 6     private static final String QUEUE_SMS_INFORM = "queue sms inform";  7     /**
 8  * 聲明EXCHANGE_ROUTING_INFORM交換機  9      */
10     private static final String EXCHANGE_ROUTING_INFORM = "exchange routing inform"; 11 
12     public static void main(String[] args) { 13         Connection connection = null; 14         Channel channel = null; 15 
16         // 建立一個與MQ的鏈接
17         ConnectionFactory factory = new ConnectionFactory(); 18         factory.setHost("localhost"); 19         factory.setPort(5672); 20         factory.setUsername("guest"); 21         factory.setPassword("guest"); 22         // rabbitmq默認虛擬機名稱爲「/」,虛擬機至關於一個獨立的mq服務器
23         factory.setVirtualHost("/"); 24         try { 25             // 建立於RabbitMQ服務的鏈接
26             connection = factory.newConnection(); 27             // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務
28             channel = connection.createChannel(); 29             /**
30  * param1:交換機名稱 31  * param2:交換機類型,fanout、topic、direct、headers 32              */
33  channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); 34             /**
35  * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 36  * param1:隊列名稱 37  * param2:是否持久化 38  * param3:隊列是否獨佔此鏈接 39  * param4:隊列再也不使用時自動刪除此列 40  * param5:隊列參數 41              */
42             channel.queueDeclare(QUEUE_SMS_INFORM, true, false, false, null); 43 
44             /**
45  * 交換機和隊列綁定 46  * param1:隊列名稱 47  * param2:交換機名稱 48  * param3:路由key 49              */
50  channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_ROUTING_INFORM, QUEUE_SMS_INFORM); 51             // 定義消費方法
52             DefaultConsumer consumer = new DefaultConsumer(channel) { 53 
54                 /**
55  * 消費者接受消息調用此方法 56  * @param consumerTag 消費者標籤,在channel,basicConsumer()去指定 57  * @param envelope 消息包含的的內容,能夠從中獲取消息id,消息routingkey,交換機, 58  * 消息和重傳標誌(收到消息失敗後是否須要從新發送) 59  * @param properties 60  * @param body 61  * @throws IOException 62                  */
63  @Override 64                 public void handleDelivery(String consumerTag, 65  Envelope envelope, 66  AMQP.BasicProperties properties, 67                                            byte[] body) throws IOException { 68                     // 消息id
69                     long deliveryTag = envelope.getDeliveryTag(); 70                     // 交換機
71                     String exchange = envelope.getExchange(); 72                     // 消息內容
73                     String msg = new String(body, "utf-8"); 74                     System.out.println("sms receive message:" + msg); 75  } 76  }; 77             /**
78  * 監聽隊列String queue, boolean autoAck, Consumer callback 79  * param1:隊列名稱 80  * param2:是否自動回覆,設置爲true爲表示消息接收到自動向mq回覆收到了,mq接收到回覆會刪除消息 81  * 設置爲false則須要手動回覆 82  * param3:消費消息的方法,消費者接收到消息後調用此方法 83              */
84             channel.basicConsume(QUEUE_SMS_INFORM, true, consumer); 85         } catch (IOException e) { 86  e.printStackTrace(); 87         } catch (TimeoutException e) { 88  e.printStackTrace(); 89  } 90  } 91 }
Consumer03RoutingSms

 

    • 測試:執行兩個消費者的main方法,執行生產者的main方法,發現只有接收信息的消費者可以接收到打印的消息
    • Routing與Publish/subscibe的區別
      • Routing模式要求隊列在綁定交換機是要指定routingkey,消息會轉發到符合routingkey的隊列
  • Topics模式
    • 匹配模式

      • 每個消費者監聽本身的隊列,而且設置帶通配符的routingkey
      • 生產者將消息發給broker,由交換機根據routingkey來轉發消息到指定的隊列
    • 案例
      • 根據用戶的通知設置去通知用戶,設置接收Email的用戶只接收Email,設置接收sms的用戶只接收sms,設置兩種通知類型都接收的則兩種通知都有效
    • 生產者
      • 聲明交換機,指定topics類型
 1 public class Producer04Topics {  2     /**
 3  * 隊列名稱  4      */
 5     private static final String QUEUE_EMAIL_INFORM = "queue email inform";  6     private static final String QUEUE_SMS_INFORM = "queue sms inform";  7     /**
 8  * 路由key,#表明能夠匹配多個詞,符號*能夠匹配一個詞語  9      */
 10     private static final String ROUTINGKEY_EMAIL = "inform.#.email.#";  11     private static final String ROUTINGKEY_SMS = "inform.#.sms.#";  12     /**
 13  * 聲明EXCHANGE_TOPICS_INFORM交換機  14      */
 15     private static final String EXCHANGE_TOPICS_INFORM = "exchange topics inform";  16 
 17     public static void main(String[] args) {  18         Connection connection = null;  19         Channel channel = null;  20 
 21         // 建立一個與MQ的鏈接
 22         ConnectionFactory factory = new ConnectionFactory();  23         factory.setHost("localhost");  24         factory.setPort(5672);  25         factory.setUsername("guest");  26         factory.setPassword("guest");  27         // RabbitMQ默認虛擬機名稱爲"/",虛擬機至關於一個獨立的MQ服務器
 28         factory.setVirtualHost("/");  29 
 30         try {  31             // 建立於RabbitMQ服務的鏈接
 32             connection = factory.newConnection();  33             // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務
 34             channel = connection.createChannel();  35             /**
 36  * 聲明交換機  37  * param1:交換機名稱  38  * param2:交換機類型,fanout、topic、direct、headers  39              */
 40  channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);  41             /**
 42  * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立  43  * param1:隊列名稱  44  * param2:是否持久化  45  * param3:隊列是否獨佔此鏈接  46  * param4:隊列再也不使用時自動刪除此列  47  * param5:隊列參數  48              */
 49             channel.queueDeclare(QUEUE_EMAIL_INFORM, true, false, false, null);  50             channel.queueDeclare(QUEUE_SMS_INFORM, true, false, false, null);  51             /**
 52  * 交換機和隊列綁定  53  * param1:隊列名稱  54  * param2:交換機名稱  55  * param3:路由key  56              */
 57  channel.queueBind(QUEUE_EMAIL_INFORM, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_EMAIL);  58  channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS);  59 
 60 
 61             /**
 62  * 消息發佈方法  63  * param1:Exchange的名稱,若是沒有指定,則使用Default Exchange  64  * param2:routingkey,消息的路由,適用於Exchange將消息轉發到指定的消息隊列  65  * param3:消息包含的屬性  66  * param4:消息體  67              */
 68             for (int i = 0; i < 5; i++) {  69                 //發送消息的時候指定routingKey
 70                 String message = "send email inform message to user";  71                 channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null, message.getBytes());  72                 System.out.println("send to mq " + message);  73  }  74             for (int i = 0; i < 5; i++) {  75                 //發送消息的時候指定routingKey
 76                 String message = "send sms inform message to user";  77                 channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null, message.getBytes());  78                 System.out.println("send to mq " + message);  79  }  80             for (int i = 0; i < 5; i++) {  81                 //發送消息的時候指定routingKey
 82                 String message = "send sms and email inform message to user";  83                 channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null, message.getBytes());  84                 System.out.println("send to mq " + message);  85  }  86         } catch (IOException e) {  87  e.printStackTrace();  88         } catch (TimeoutException e) {  89  e.printStackTrace();  90         } finally {  91             if (channel != null) {  92                 try {  93  channel.close();  94                 } catch (IOException e) {  95  e.printStackTrace();  96                 } catch (TimeoutException e) {  97  e.printStackTrace();  98  }  99  } 100             if (connection != null) { 101                 try { 102  connection.close(); 103                 } catch (IOException e) { 104  e.printStackTrace(); 105  } 106  } 107  } 108  } 109 }
Producer04Topics
    • 消費者
      • 隊列綁定交換機指定通配符
      • 通配符規則
      • 中間以「.」分隔
      • 符號能夠匹配多個詞語,符號*能夠匹配一個詞語
    • 接收郵件消費者
 1 public class Consumer04TopicsEmail {  2     /**
 3  * 隊列名稱  4  * 路由名稱設定與隊列名稱同樣  5      */
 6     private static final String QUEUE_SMS_INFORM = "queue sms inform";  7 
 8     /**
 9  * 路由key,#表明能夠匹配多個詞,符號*能夠匹配一個詞語 10      */
11     private static final String ROUTINGKEY_EMAIL = "inform.#.email.#"; 12     /**
13  * 聲明EXCHANGE_TOPICS_INFORM交換機 14      */
15     private static final String EXCHANGE_TOPICS_INFORM = "exchange topics inform"; 16 
17     public static void main(String[] args) { 18         Connection connection = null; 19         Channel channel = null; 20 
21         // 建立一個與MQ的鏈接
22         ConnectionFactory factory = new ConnectionFactory(); 23         factory.setHost("localhost"); 24         factory.setPort(5672); 25         factory.setUsername("guest"); 26         factory.setPassword("guest"); 27         // rabbitmq默認虛擬機名稱爲「/」,虛擬機至關於一個獨立的mq服務器
28         factory.setVirtualHost("/"); 29         try { 30             // 建立於RabbitMQ服務的鏈接
31             connection = factory.newConnection(); 32             // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務
33             channel = connection.createChannel(); 34             /**
35  * param1:交換機名稱 36  * param2:交換機類型,fanout、topic、direct、headers 37              */
38  channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); 39             /**
40  * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 41  * param1:隊列名稱 42  * param2:是否持久化 43  * param3:隊列是否獨佔此鏈接 44  * param4:隊列再也不使用時自動刪除此列 45  * param5:隊列參數 46              */
47             channel.queueDeclare(QUEUE_SMS_INFORM, true, false, false, null); 48 
49             /**
50  * 交換機和隊列綁定 51  * param1:隊列名稱 52  * param2:交換機名稱 53  * param3:路由key 54              */
55  channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_EMAIL); 56             // 定義消費方法
57             DefaultConsumer consumer = new DefaultConsumer(channel) { 58 
59                 /**
60  * 消費者接受消息調用此方法 61  * @param consumerTag 消費者標籤,在channel,basicConsumer()去指定 62  * @param envelope 消息包含的的內容,能夠從中獲取消息id,消息routingkey,交換機, 63  * 消息和重傳標誌(收到消息失敗後是否須要從新發送) 64  * @param properties 65  * @param body 66  * @throws IOException 67                  */
68  @Override 69                 public void handleDelivery(String consumerTag, 70  Envelope envelope, 71  AMQP.BasicProperties properties, 72                                            byte[] body) throws IOException { 73                     // 消息id
74                     long deliveryTag = envelope.getDeliveryTag(); 75                     // 交換機
76                     String exchange = envelope.getExchange(); 77                     // 消息內容
78                     String msg = new String(body, "utf-8"); 79                     System.out.println("email receive message:" + msg); 80  } 81  }; 82             /**
83  * 監聽隊列String queue, boolean autoAck, Consumer callback 84  * param1:隊列名稱 85  * param2:是否自動回覆,設置爲true爲表示消息接收到自動向mq回覆收到了,mq接收到回覆會刪除消息 86  * 設置爲false則須要手動回覆 87  * param3:消費消息的方法,消費者接收到消息後調用此方法 88              */
89             channel.basicConsume(QUEUE_SMS_INFORM, true, consumer); 90         } catch (IOException e) { 91  e.printStackTrace(); 92         } catch (TimeoutException e) { 93  e.printStackTrace(); 94  } 95  } 96 }
Consumer04TopicsEmail
    • 接收信息消費者
 1 public class Consumer04TopicsSms {  2     /**
 3  * 隊列名稱  4  * 路由名稱設定與隊列名稱同樣  5      */
 6     private static final String QUEUE_SMS_INFORM = "queue sms inform";  7     /**
 8  * 路由key,#表明能夠匹配多個詞,符號*能夠匹配一個詞語  9      */
10     private static final String ROUTINGKEY_SMS = "inform.#.sms.#"; 11     /**
12  * 聲明EXCHANGE_TOPICS_INFORM交換機 13      */
14     private static final String EXCHANGE_TOPICS_INFORM = "exchange topics inform"; 15 
16     public static void main(String[] args) { 17         Connection connection = null; 18         Channel channel = null; 19 
20         // 建立一個與MQ的鏈接
21         ConnectionFactory factory = new ConnectionFactory(); 22         factory.setHost("localhost"); 23         factory.setPort(5672); 24         factory.setUsername("guest"); 25         factory.setPassword("guest"); 26         // rabbitmq默認虛擬機名稱爲「/」,虛擬機至關於一個獨立的mq服務器
27         factory.setVirtualHost("/"); 28         try { 29             // 建立於RabbitMQ服務的鏈接
30             connection = factory.newConnection(); 31             // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務
32             channel = connection.createChannel(); 33             /**
34  * param1:交換機名稱 35  * param2:交換機類型,fanout、topic、direct、headers 36              */
37  channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); 38             /**
39  * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 40  * param1:隊列名稱 41  * param2:是否持久化 42  * param3:隊列是否獨佔此鏈接 43  * param4:隊列再也不使用時自動刪除此列 44  * param5:隊列參數 45              */
46             channel.queueDeclare(QUEUE_SMS_INFORM, true, false, false, null); 47 
48             /**
49  * 交換機和隊列綁定 50  * param1:隊列名稱 51  * param2:交換機名稱 52  * param3:路由key 53              */
54  channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS); 55             // 定義消費方法
56             DefaultConsumer consumer = new DefaultConsumer(channel) { 57 
58                 /**
59  * 消費者接受消息調用此方法 60  * @param consumerTag 消費者標籤,在channel,basicConsumer()去指定 61  * @param envelope 消息包含的的內容,能夠從中獲取消息id,消息routingkey,交換機, 62  * 消息和重傳標誌(收到消息失敗後是否須要從新發送) 63  * @param properties 64  * @param body 65  * @throws IOException 66                  */
67  @Override 68                 public void handleDelivery(String consumerTag, 69  Envelope envelope, 70  AMQP.BasicProperties properties, 71                                            byte[] body) throws IOException { 72                     // 消息id
73                     long deliveryTag = envelope.getDeliveryTag(); 74                     // 交換機
75                     String exchange = envelope.getExchange(); 76                     // 消息內容
77                     String msg = new String(body, "utf-8"); 78                     System.out.println("sms receive message:" + msg); 79  } 80  }; 81             /**
82  * 監聽隊列String queue, boolean autoAck, Consumer callback 83  * param1:隊列名稱 84  * param2:是否自動回覆,設置爲true爲表示消息接收到自動向mq回覆收到了,mq接收到回覆會刪除消息 85  * 設置爲false則須要手動回覆 86  * param3:消費消息的方法,消費者接收到消息後調用此方法 87              */
88             channel.basicConsume(QUEUE_SMS_INFORM, true, consumer); 89         } catch (IOException e) { 90  e.printStackTrace(); 91         } catch (TimeoutException e) { 92  e.printStackTrace(); 93  } 94  } 95 }
Consumer04TopicsSms
    • 測試:使用生產者發送若干條消息,交換機根據routingkey通配符匹配並轉發消息到指定的隊列
    • 思考:本案例的需求使用Routing工做模式是否能實現
      • 使用Routing模式也能夠實現本案例,共設置三個routingkey,分別是email、sms、all、email隊列綁定email和all,sms隊列綁定sms和all,這樣就能夠實現上面案例的功能,實現過程比topics複雜
      • topics模式更增強大,能夠實現Routing。publish/subscribe模式的功能
  • Header模式
    • header模式與routing不一樣的地方在於,header模式取消routingkey,使用header中的key/value(鍵值對)匹配隊列
    • 案例:根據用戶的通知設置去通知用戶,設置接收Email的用戶只接收Email,設置接收sms的用戶只接收sms,設置兩種通知類型都接收的則兩種通知都有效。
    • 生產者
      • 隊列與交換機綁定的代碼與以前不一樣,以下:
1 Map<String, Object> headers_email = new Hashtable<String, Object>(); 2 headers_email.put("inform_type", "email"); 3 Map<String, Object> headers_sms = new Hashtable<String, Object>(); 4 headers_sms.put("inform_type", "sms"); 5 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); 6 channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
    • 通知:
1 String message = "email inform to user"+i; 2 Map<String,Object> headers =  new Hashtable<String, Object>(); 3 headers.put("inform_type", "email");//匹配email通知消費者綁定的header 4 //headers.put("inform_type", "sms");//匹配sms通知消費者綁定的header
5 AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); 6 properties.headers(headers); 7 //Email通知
8 channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());
    • 發送郵件消費者
1 channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS); 2 Map<String, Object> headers_email = new Hashtable<String, Object>(); 3 headers_email.put("inform_email", "email"); 4 //交換機和隊列綁定
5 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); 6 //指定消費隊列
7 channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
  • RPC模式
    • RPC即客戶端遠程調用服務端的方法,使用MQ能夠實現RPC的異步調用,基於Direct交換機實現,流程以下:

      • 客戶端既是生產者也是消費者,向RPC請求隊列發送RPC調用消息,同時監聽RPC響應隊列
      • 服務端監聽RPC請求隊列的消息,收到消息後執行服務端的方法,獲得方法返回的結果
      • 服務端將RPC方法的結果發送到RPC響應隊列
      • 客戶端(RPC調用方)監聽RPC響應隊列,接收到RPC調用的結果

5、SpringBoot整合RabbitMQ

  5.1 利用SpringBoot快速建立生產者與消費者兩個工程

  咱們選擇基於Spring-Rabbit去操做RabbitMQ

  1.兩個工程中添加一樣的maven依賴

 1 <dependency>
 2     <groupId>org.springframework.boot</groupId>
 3     <artifactId>spring‐boot‐starter‐amqp</artifactId>
 4 </dependency>
 5 <dependency>
 6     <groupId>org.springframework.boot</groupId>
 7     <artifactId>spring‐boot‐starter‐test</artifactId>
 8 </dependency>
 9 <dependency>
10     <groupId>org.springframework.boot</groupId>
11     <artifactId>spring‐boot‐starter‐logging</artifactId>
12 </dependency>

  2.配置文件application.yml

 1 server:  2   port: 44000  3 spring:  4   application:  5     name: test‐rabbitmq‐producer/consumer #兩個工程的名稱  6   rabbitmq:  7     host: 127.0.0.1  8     port: 5672  9     username: guest 10     password: guest 11     virtualHost: /

  3.定義RabbitConfig類,配置Exchange、Queue、以及綁定交換機 -- 案例中使用Topics交換機

 1 @Configuration  2 public class RabbitmqConfig {  3     public static final String QUEUE_EMAIL_INFORM = "queue_email_inform";  4     public static final String QUEUE_SMS_INFORM = "queue_sms_inform";  5     public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";  6 
 7     /**
 8  * 聲明交換機  9  * @return
10      */
11  @Bean(EXCHANGE_TOPICS_INFORM) 12     public Exchange EXCHANGE_TOPICS_INFORM(){ 13         // durable(true):持久化,消息隊列重啓後交換機仍然存在
14         return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); 15  } 16 
17     /**
18  * 聲明隊列 19  * @return
20      */
21  @Bean(QUEUE_EMAIL_INFORM) 22     public Queue QUEUE_EMAIL_INFORM(){ 23         return new Queue(QUEUE_EMAIL_INFORM); 24  } 25 
26  @Bean(QUEUE_SMS_INFORM) 27     public Queue QUEUE_SMS_INFORM(){ 28         return new Queue(QUEUE_SMS_INFORM); 29  } 30 
31     /**
32  * 綁定隊列到交換機 33  * @param queue 34  * @param exchange 35  * @return
36      */
37     public Binding BINDING_QUEUE_EMAIL(@Qualifier(QUEUE_EMAIL_INFORM) Queue queue, 38  @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ 39         return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs(); 40  } 41 
42     public Binding BINDING_QUEUE_SMS(@Qualifier(QUEUE_SMS_INFORM) Queue queue, 43  @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ 44         return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs(); 45  } 46 }

  4.以上爲兩個工程中相同的地方,接下來配置不一樣的地方

  • 消費端工程配置一個消息監聽類ReceiveHandler

1 @Component 2 public class ReceiveHandler { 3 4 // 監聽email隊列 5 @RabbitListener(queues = {RabbitmqConfig.QUEUE_EMAIL_INFORM}) 6 public void receiveEmail(String msg) { 7 System.out.println("email隊列接收到消息:" + msg); 8 } 9 10 // 監聽sms隊列 11 @RabbitListener(queues = {RabbitmqConfig.QUEUE_SMS_INFORM}) 12 public void receiveSMS(String msg) { 13 System.out.println("sms隊列接收到消息:" + msg); 14 } 15 }

  • 生產端工程直接在測試方法中測試消息發送功能
 1 @RunWith(SpringRunner.class)  2 @SpringBootTest  3 public class SpringBootTopicsProducerTest {  4     
 5  @Autowired  6     public RabbitTemplate rabbitTemplate;  7 
 8  @Test  9     public void testSendMessageByTopics(){ 10         for (int i = 0; i < 5; i++) { 11             String message = "sms email inform to user"+i; 12             rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.sms.email",message); 13  } 14  } 15 }
  • 5.測試

  消費端執行工程中的SpringBoot的引導類的main方法執行消息的監聽,生產端執行測試方法testSendMessageByTopics()

  控制檯打印接收到的消息:

  由於隊列消息的監聽是異步的,因此會出現消息打印交替出現的現象,這裏簡單的SpringBoot整合RabbitMQ的案例就結束了。:)

相關文章
相關標籤/搜索