深刻剖析 RabbitMQ —— Spring 框架下實現 AMQP 高級消息隊列協議

前言

消息隊列在現今數據量大,併發量高的系統中是十分經常使用的。本文將會對現時最經常使用到的幾款消息隊列框架 ActiveMQ、RabbitMQ、Kafka 進行分析對比。spring

詳細介紹 RabbitMQ 在 Spring 框架下的結構及實現原理,從Producer 端的事務、回調函數(ConfirmCallback / ReturnCallback)到 Consumer 端的 MessageListenerContainer 信息接收容器進行詳細的分析。經過對 RabbitTemplate、SimpleMessageListenerContainer、DirectMessageListenerContainer 等經常使用類型介紹,深刻剖析在消息處理各個傳輸環節中的原理及注意事項。數據庫

並舉以實例對死信隊列、持久化操做進行一一介紹。bash

1、RabbitMQ 與 AMQP 的關係

1.1 AMQP簡介服務器

AMQP(Advanced Message Queue Protocol 高級消息隊列協議)是一個消息隊列協議,它支持符合條件的客戶端和消息代理中間件(message middleware broker)進行通信。RabbitMQ 則是 AMQP 協議的實現者,主要用於在分佈式系統中信息的存儲發送與接收,RabbitMQ 的服務器端用 Erlang 語言編寫,客戶端支持多種開發語言:Python、.NET、Java、Ruby、C、PHP、ActionScript、XMPP、STOMP 等。網絡

1.2 ActiveMQ、RabbitMQ、Kafka 對比併發

如今在市場上有 ActiveMQ、RabbitMQ、Kafka 等多個經常使用的消息隊列框架,與其餘框架對比起來,RabbitMQ 在易用性、擴展性、高可用性、多協議、支持多語言客戶端等方面都有不俗表現。app

1.2.1 AcitveMQ 特色框架

ActiveMQ 是 Apache 以 Java 語言開發的消息模型,它完美地支持 JMS(Java Message Service)消息服務,客戶端支持 Java、C、C++、C#、Ruby、Perl、Python、PHP 等多種開主發語言,支持OpenWire、Stomp、REST、XMPP、AMQP 等多種協議。ActiveMQ 採用異步消息傳遞方式,在設計上保證了多主機集羣,客戶端-服務器,點對點等模式的有效通訊。從開始它就是按照 JMS 1.1 和 J2EE 1.4 規範進行開發,實現了消息持久化,XA,事務支撐等功能。經歷多年的升級完善,現今已成爲 Java 應用開發中主流的消息解決方案。但相比起 RabbitMQ、Kafka 它的主要缺點表現爲資源消耗比較大,吞吐量較低,在高併發的狀況下系統支撐能力較弱。若是系統全程使用 Java 開發,其併發量在可控範圍內,或系統須要支持多種不一樣的協議,使用 ActiveMQ 可更輕便地搭建起消息隊列服務。dom

1.2.2 Kafka 特色異步

Kafka 天生是面向分佈式系統開發的消息隊列,它具備高性能、容災性、可動態擴容等特色。Kafka 與生俱來的特色在於它會把每一個Partition 的數據都備份到不一樣的服務器當中,並與 ZooKeeper 配合,當某個Broker 故障失效時,ZooKeeper 服務就會將通知生產者和消費者,從備份服務器進行數據恢復。在性能上 Kafka 也大大超越了傳統的 ActiveMQ、RabbitMQ ,因爲 Kafka 集羣可支持動態擴容,在負載量到達峯值時可動態增長新的服務器進集羣而無需重啓服務。但因爲 Kafka 屬於分佈式系統,因此它只能在同一分區內實現消息有序,沒法實現全局消息有序。並且它內部的監控機制不夠完善,須要安裝插件,依賴ZooKeeper 進行元數據管理。若是系統屬於分佈式管理機制,數據量較大且併發量難以預估的狀況下,建議使用 Kafka 隊列。

1.2.3 RabbitMQ 對比

因爲 ActiveMQ 過於依賴 JMS 的規範而限制了它的發展,因此 RabbitMQ 在性能和吞吐量上明顯會優於 ActiveMQ。

因爲上市時間較長,在可用性、穩定性、可靠性上 RabbitMq 會比 Kafka 技術成熟,並且 RabbitMq 使用 Erlang 開發,因此天生具有高併發高可用的特色。而 Kafka 屬於分佈式系統,它的性能、吞吐量、TPS 都會比 RabbitMq 要強。

2、RabbitMQ 的實現原理

2.1 生產者(Producer)、消費者(Consumer)、服務中心(Broker)之間的關係

首先簡單介紹 RabbitMQ 的運行原理,在 RabbitMQ 使用時,系統會先安裝並啓動 Broker Server,也就是 RabbitMQ 的服務中心。不管是生產者 (Producer),消費者(Consumer)都會經過鏈接池(Connection)使用 TCP/IP 協議(默認)來與 BrokerServer 進行鏈接。而後 Producer 會把 Exchange / Queue 的綁定信息發送到 Broker Server,Broker Server 根據 Exchange 的類型邏輯選擇對應 Queue ,最後把信息發送到與 Queue 關聯的對應 Consumer 。


2.2 交換器(Exchange)、隊列(Queue)、信道(Channel)、綁定(Binding)的概念

2.2.1 交換器 Exchange

Producer 創建鏈接後,並不是直接將消息投遞到隊列 Queue 中,而是把消息發送到交換器 Exchange,由 Exchange 根據不一樣邏輯把消息發送到一個或多個對應的隊列當中。目前 Exchange 提供了四種不一樣的經常使用類型:Fanout、Direct、Topic、Header。

  • Fanout類型

此類型是最爲常見的交換器,它會將消息轉發給全部與之綁定的隊列上。好比,有N個隊列與 Fanout 交換器綁定,當產生一條消息時,Exchange 會將該消息的N個副本分別發給每一個隊列,相似於廣播機制。

  • Direct類型

此類型的 Exchange 會把消息發送到 Routing_Key 徹底相等的隊列當中。多個 Cousumer 可使用相同的關鍵字進行綁定,相似於數據庫的一對多關係。好比,Producer 以 Direct 類型的 Exchange 推送 Routing_Key 爲 direct.key1 的隊列,系統再指定多個 Cousumer 綁定 direct.key1。如此,消息就會被分發至多個不一樣的 Cousumer 當中。

  • Topic類型

此類型是最靈活的一種方式配置方式,它可使用模糊匹配,根據 Routing_Key 綁定到包含該關鍵字的不一樣隊列中。好比,Producer 使用 Topic類型的 Exchange 分別推送 Routing_Key 設置爲 topic.guangdong.guangzhou 、topic.guangdong.shenzhen 的不一樣隊列,Cousumer 只須要把 Routing_Key 設置爲 topic.guangdong.# ,就能夠把全部消息接收處理。

  • Headers類型

該類型的交換器與前面介紹的稍有不一樣,它再也不是基於關鍵字 Routing_Key 進行路由,而是基於多個屬性進行路由的,這些屬性比路由關鍵字更容易表示爲消息的頭。也就是說,用於路由的屬性是取自於消息 Header 屬性,當消息 Header 的值與隊列綁定時指定的值相同時,消息就會路由至相應的隊列中。

2.2.2 Queue 隊列

Queue 隊列是消息的載體,每一個消息都會被投入到 Queue 當中,它包含 name,durable,arguments 等多個屬性,name 用於定義它的名稱,當 durable(持久化)爲 true 時,隊列將會持久化保存到硬盤上。反之爲 false 時,一旦 Broker Server 被重啓,對應的隊列就會消失,後面還會有例子做詳細介紹。

2.2.3 Channel 通道

當 Broker Server 使用 Connection 鏈接 Producer / Cousumer 時會使用到信道(Channel),一個 Connection上能夠創建多個 Channel,每一個 Channel 都有一個會話任務,能夠理解爲邏輯上的鏈接。主要用做管理相關的參數定義,發送消息,獲取消息,事務處理等。

2.2.4 Binding 綁定

Binding 主要用於綁定交換器 Exchange 與 隊列 Queue 之間的對應關係,並記錄路由的 Routing-Key。Binding 信息會保存到系統當中,用於 Broker Server 信息的分發依據。

3、RabbitMQ 應用實例

3.1 Rabbit 經常使用類說明

3.1.1 RabbitTemplate 類

Spring 框架已經封裝了 RabbitTemplate 對 RabbitMQ 的綁定、隊列發送、接收進行簡化管理

3.2 初探 RabbitMQ

在官網下載併成功安裝完 RabbitMQ 後,打開默認路徑 http://localhost:15672/#/ 便可看到 RabbitMQ 服務中心的管理界面

3.2.1 Producer 端開發

先在 pom 中添加 RabbitMQ 的依賴,並在 application.yml 中加入 RabbitMQ 賬號密碼等信息。此例子,咱們嘗試使用 Direct 交換器把隊列發送到不一樣的 Consumer

1 **********************pom *************************
 2 <project>
 3         .............
 4     <dependency>
 5         <groupId>org.springframework.boot</groupId>
 6         <artifactId>spring-boot-starter-amqp</artifactId>
 7         <version>2.0.5.RELEASE</version>
 8     </dependency>
 9 </project>
10 
11 ****************  application.yml  ****************
12 spring:
13   application:
14      name: rabbitMqProducer
15   rabbitmq:
16     host: localhost 
17     port: 5672
18     username: admin
19     password: 12345678
20     virtual-host: /LeslieHost
複製代碼

首先使用 CachingConnectionFactory 創建連接,經過 BindingBuilder 綁定 Exchange、Queue、RoutingKey之間的關係。

而後經過 void convertAndSend (String exchange, String routingKey, Object object, CorrelationData data) 方法把信息發送到 Broken Server

1 @Configuration
 2 public class ConnectionConfig {
 3     @Value("${spring.rabbitmq.host}")
 4     public String host;
 5     
 6     @Value("${spring.rabbitmq.port}")
 7     public int port;
 8     
 9     @Value("${spring.rabbitmq.username}")
10     public String username;
11     
12     @Value("${spring.rabbitmq.password}")
13     public String password;
14     
15     @Value("${spring.rabbitmq.virtual-host}")
16     public String virtualHost;
17 
18     @Bean
19     public ConnectionFactory getConnectionFactory(){
20         CachingConnectionFactory factory=new CachingConnectionFactory();
21         factory.setHost(host);
22         factory.setPort(port);
23         factory.setUsername(username);
24         factory.setPassword(password);
25         factory.setVirtualHost(virtualHost);
26         return factory;
27     }
28 }
29 
30 @Configuration
31 public class BindingConfig {
32     public final static String first="direct.first";
33     public final static String second="direct.second";
34     public final static String Exchange_NAME="directExchange";
35     public final static String RoutingKey1="directKey1";
36     public final static String RoutingKey2="directKey2";
37     
38     @Bean
39     public Queue queueFirst(){
40         return new Queue(first);
41     }
42     
43     @Bean
44     public Queue queueSecond(){
45         return new Queue(second);
46     }
47     
48     @Bean
49     public DirectExchange directExchange(){
50         return new DirectExchange(Exchange_NAME,true,true);
51     }
52     
53     //利用BindingBuilder綁定Direct與queueFirst
54     @Bean
55     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
56         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
57     }
58     
59     //利用BindingBuilder綁定Direct與queueSecond
60     @Bean
61     public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
62         return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
63     }   
64 }
65 
66 @Controller
67 @RequestMapping("/producer")
68 public class ProducerController {
69     @Autowired
70     private RabbitTemplate template;
71     
72     @RequestMapping("/send")
73     public void send() {
74         for(int n=0;n<100;n++){   
75 
76             template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"I'm the first queue! "+String.valueOf(n),getCorrelationData());
77             template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey2,"I'm the second queue! "+String.valueOf(n),getCorrelationData());
78         }
79     }
80 
81      private CorrelationData getCorrelationData(){
82         return new CorrelationData(UUID.randomUUID().toString());
83     }
84 }
複製代碼

此時,打開 RabbitMQ 管理界面,可看到 Producer 已經向 Broken Server 的 direct.first / direct.second 兩個 Queue 分別發送100 個 Message


3.2.2 Consumer 端開發

分別創建兩個不一樣的 Consumer ,一個綁定 direct.first 別一個綁定 direct.second , 而後經過註解 @RabbitListener 監聽不一樣的 queue,當接到到 Producer 推送隊列時,顯示隊列信息。

1 @Configuration
 2 public class ConnectionConfig {
 3     @Value("${spring.rabbitmq.host}")
 4     public String host;
 5     
 6     @Value("${spring.rabbitmq.port}")
 7     public int port;
 8     
 9     @Value("${spring.rabbitmq.username}")
10     public String username;
11     
12     @Value("${spring.rabbitmq.password}")
13     public String password;
14     
15     @Value("${spring.rabbitmq.virtual-host}")
16     public String virtualHost;
17 
18     @Bean
19     public ConnectionFactory getConnectionFactory(){
20         CachingConnectionFactory factory=new CachingConnectionFactory();
21         factory.setHost(host);
22         factory.setPort(port);
23         factory.setUsername(username);
24         factory.setPassword(password);
25         factory.setVirtualHost(virtualHost);
26         return factory;
27     }
28 }
29 
30 @Configuration
31 public class BindingConfig {
32     public final static String first="direct.first";
33     public final static String Exchange_NAME="directExchange";
34     public final static String RoutingKey1="directKey1";
35     
36     @Bean
37     public Queue queueFirst(){
38         return new Queue(first);
39     }
40     
41     @Bean
42     public DirectExchange directExchange(){
43         return new DirectExchange(Exchange_NAME);
44     }
45     
46     //利用BindingBuilder綁定Direct與queueFirst
47     @Bean
48     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
49         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
50     }  
51 }
52 
53 @Configuration
54 @RabbitListener(queues="direct.first")
55 public class RabbitMqListener {
56     
57     @RabbitHandler
58     public void handler(String message){
59         System.out.println(message);
60     }
61 }
62 
63 @SpringBootApplication
64 public class App {
65     
66     public static void main(String[] args){
67         SpringApplication.run(App.class, args);
68     }
69 }
複製代碼

運行後能夠觀察到不一樣的 Consumer 會收到不一樣隊列的消息


若是以爲使用 Binding 代碼綁定過於繁瑣,還能夠直接在監聽類RabbitMqListener中使用 @QueueBinding 註解綁定

1 @Configuration
 2 public class ConnectionConfig {
 3     @Value("${spring.rabbitmq.host}")
 4     public String host;
 5     
 6     @Value("${spring.rabbitmq.port}")
 7     public int port;
 8     
 9     @Value("${spring.rabbitmq.username}")
10     public String username;
11     
12     @Value("${spring.rabbitmq.password}")
13     public String password;
14     
15     @Value("${spring.rabbitmq.virtual-host}")
16     public String virtualHost;
17 
18     @Bean
19     public ConnectionFactory getConnectionFactory(){
20         CachingConnectionFactory factory=new CachingConnectionFactory();
21         factory.setHost(host);
22         factory.setPort(port);
23         factory.setUsername(username);
24         factory.setPassword(password);
25         factory.setVirtualHost(virtualHost);
26         return factory;
27     }
28 }
29 
30 @Configuration
31 @RabbitListener(bindings=@QueueBinding(
32 exchange=@Exchange(value="directExchange"),
33 value=@Queue(value="direct.second"),
34 key="directKey2"))
35 public class RabbitMqListener {
36     
37     @RabbitHandler
38     public void handler(String message){
39         System.out.println(message);
40     }
41 }
42 
43 @SpringBootApplication
44 public class App {
45     
46     public static void main(String[] args){
47         SpringApplication.run(App.class, args);
48     }
49 }
複製代碼

運行結果

4、Producer 端的消息發送與監控

前面一節已經介紹了RabbitMQ的基本使用方法,這一節將從更深刻的層面講述 Producer 的應用。

試想一下這種的情形,若是因 RabbitTemplate 發送時 Exchange 名稱綁定錯誤,或 Broken Server 因網絡問題或服務負荷過大引起異常,Producer 發送的隊列丟失,系統沒法正常工做。此時,開發人員應該進行一系列應對措施進行監測,確保每一個數據都能正常推送到 Broken Server 。有見及此,RabbitMQ 專門爲你們提供了兩種解決方案,一是使用傳統的事務模式,二是使用回調函數,下面爲你們做詳介紹。

4.1 Producer 端的事務管理

在須要使用事務時,能夠經過兩種方法

第一能夠調用 channel 類的方法以傳統模式進行管理,事務開始時調用 channel.txSelect(),信息發送後進行確認 channel.txCommit(),一旦捕捉到異常進行回滾 channel.txRollback(),最後關閉事務。

1 @Controller
 2 @RequestMapping("/producer")
 3 public class ProducerController {
 4     @Autowired
 5     private RabbitTemplate template;
 6  
 7     @RequestMapping("/send")
 8     public void send1(HttpServletResponse response) 
 9         throws InterruptedException, IOException,  TimeoutException{
10         Channel channel=template.getConnectionFactory().createConnection().createChannel(true);
11         .......
12         try{
13             channel.txSelect();
14             channel.basicPublish("ErrorExchange", BindingConfig.Routing_Key_First, new AMQP.BasicProperties(),"Nothing".getBytes());
15             channel.txCommit();
16         }catch(Exception e){
17             channel.txRollback();
18         }finally{
19             channel.close();
20         }
21         ......
22         ......
23         ......
24     }
25 }
複製代碼

第二還能夠直接經過 RabbitTemplate 的配置方法 void setChannelTransacted(bool isTransacted) 直接開啓事務

1 public class ProducerController {
 2     @Autowired
 3     private ConnectionConfig connection;
 4 
 5     @Autowired
 6     @Bean
 7     private RabbitTemplate template(){
 8         RabbitTemplate template=new RabbitTemplate(connection.getConnectionFactory());
 9         template.setChannelTransacted(true);
10         return template;
11     }
12  
13     @RequestMapping("/send")
14     @Transactional(rollbackFor=Exception.class)
15     public void send(HttpServletResponse response) throws InterruptedException, IOException,TimeoutException{
16         ..........
17         ..........
18         ..........
19     }
20 }
複製代碼

4.2 利用 ConfirmCallback 回調確認消息是否成功發送到 Exchange

使用事務模式消耗的系統資源比較大,系統每每會處理長期等待的狀態,在併發量較高的時候也有可能形成死鎖的隱患。有見及此,系統提供了輕量級的回調函數方式進行異步處理。

當須要確認消息是否成功發送到 Exchange 的時候,可使用 ConfirmCallback 回調函數。使用該函數,系統推送消息後,該線程便會獲得釋放,等 Exchange 接收到消息後系統便會異步調用 ConfirmCallback 綁定的方法進行處理。ConfirmCallback 只包含一個方法 void confirm(CorrelationData correlationData, boolean ack, String cause),此方法會把每條數據發送到 Exchange 時候的 ack 狀態(成功/失敗),cause 成敗緣由,及對應的 correlationData(CorrelationData 只包含一個屬性 id,是綁定發送對象的惟一標識符) 返還到 Producer,讓Producer 進行相應處理。

注意:在綁定 ConfirmCallback 回調函數前,請先把 publisher-confirms 屬性設置爲 true

1 spring:
 2   application:
 3      name: rabbitmqproducer
 4   rabbitmq:
 5     host: 127.0.0.1 
 6     port: 5672
 7     username: admin
 8     password: 12345678
 9     virtual-host: /LeslieHost
複製代碼

例如:下面的例子,特地將 RabbitTemplate 發送時所綁定的 Exchange 名稱填寫爲錯誤名稱 「 ErrorExchange 」,形成發送失敗,而後在回調函數中檢查失敗的緣由。

Producer 端代碼:

1 @Configuration
 2 public class ConnectionConfig {
 3     @Value("${spring.rabbitmq.host}")
 4     public String host;
 5     
 6     @Value("${spring.rabbitmq.port}")
 7     public int port;
 8     
 9     @Value("${spring.rabbitmq.username}")
10     public String username;
11     
12     @Value("${spring.rabbitmq.password}")
13     public String password;
14     
15     @Value("${spring.rabbitmq.virtual-host}")
16     public String virtualHost;
17 
18     @Bean
19     public ConnectionFactory getConnectionFactory(){
20         CachingConnectionFactory factory=new CachingConnectionFactory();
21         System.out.println(host);
22         factory.setHost(host);
23         factory.setPort(port);
24         factory.setUsername(username);
25         factory.setPassword(password);
26         factory.setVirtualHost(virtualHost);
27         factory.setPublisherConfirms(true);
28         factory.setPublisherReturns(true);
29         return factory;
30     }
31 }
32 
33 @Configuration
34 public class BindingConfig {
35     public final static String first="direct.first";
36     public final static String Exchange_NAME="directExchange";
37     public final static String RoutingKey1="directKey1";
38     
39     @Bean
40     public Queue queueFirst(){
41         return new Queue(first);
42     }
43 
44     @Bean
45     public DirectExchange directExchange(){
46         return new DirectExchange(Exchange_NAME);
47     }
48     
49     @Bean
50     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
51         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
52     }  
53 }
54 
55 @Component
56 public class MyConfirmCallback implements ConfirmCallback {
57     
58     @Override
59     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
60         // TODO 自動生成的方法存根
61         // TODO 自動生成的方法存根
62         if(ack){
63             System.out.println(correlationData.getId()+" ack is: true! \ncause:"+cause);
64         }else
65             System.out.println(correlationData.getId()+" ack is: false! \ncause:"+cause);
66     }
67 }
68 
69 @Controller
70 @RequestMapping("/producer")
71 public class ProducerController {
72     @Autowired
73     private RabbitTemplate template;
74     @Autowired
75     private MyConfirmCallback confirmCallback;
76 
77     @RequestMapping("/send")
78     public void send() {
79         template.setConfirmCallback(confirmCallback);       
80         for(int n=0;n<2;n++){   
81             template.convertAndSend("ErrorExchange",
82                      BindingConfig.RoutingKey1,"I'm the first queue! "
83                      +String.valueOf(n),getCorrelationData());
84         }
85     }
86 
87      private CorrelationData getCorrelationData(){
88         return new CorrelationData(UUID.randomUUID().toString());
89     }
90 }    
複製代碼

Consumer端代碼

1 @Configuration
 2 public class ConnectionConfig {
 3     @Value("${spring.rabbitmq.host}")
 4     public String host;
 5     
 6     @Value("${spring.rabbitmq.port}")
 7     public int port;
 8     
 9     @Value("${spring.rabbitmq.username}")
10     public String username;
11     
12     @Value("${spring.rabbitmq.password}")
13     public String password;
14     
15     @Value("${spring.rabbitmq.virtual-host}")
16     public String virtualHost;
17 
18     @Bean
19     public ConnectionFactory getConnectionFactory(){
20         CachingConnectionFactory factory=new CachingConnectionFactory();
21         factory.setHost(host);
22         factory.setPort(port);
23         factory.setUsername(username);
24         factory.setPassword(password);
25         factory.setVirtualHost(virtualHost);
26         return factory;
27     }
28 }
29 
30 @Configuration
31 @RabbitListener(bindings=@QueueBinding(
32 exchange=@Exchange(value="directExchange"),
33 value=@Queue(value="direct.first"),
34 key="directKey1"))
35 public class RabbitMqListener {
36     
37     @RabbitHandler
38     public void handler(String message){
39         System.out.println(message);
40     }
41 }
42 
43 @SpringBootApplication
44 public class App {
45     
46     public static void main(String[] args){
47         SpringApplication.run(App.class, args);
48     }
49 }
複製代碼

運行結果:

4.3 綁定 CorrelationData 與發送對象的關係

上面的例子當中,CorrelationData 只是用一個隨機的 UUID 做爲 CorrelationID,而在現實的應用場景中,因爲 ConfirmCallback 只反回標識值 CorrelationData,而沒有把隊列裏的對象值也一同返回。因此,在推送隊列時能夠先用 Key-Value 保存 CorrelationID 與所發送信息的關係,這樣當 ConfirmCallback 回調時,就可根據 CorrelationID 找回對象,做進一步處理。

下面例子,咱們把要發送的對象放在虛擬數據 DataSource 類中,用 DataRelation 記錄 CorrelationID 與發送對象 OrderID 的關係,而後在回調函數 ConfirmCallback 中根據 CorrelationID 查找對應的 OrderEntity,若是發送成功,則刪除綁定。若是發送失敗,能夠從新發送或根據狀況再做處理。

Producer端代碼:

1 @Configuration
  2 public class ConnectionConfig {
  3     @Value("${spring.rabbitmq.host}")
  4     public String host;
  5     
  6     @Value("${spring.rabbitmq.port}")
  7     public int port;
  8     
  9     @Value("${spring.rabbitmq.username}")
 10     public String username;
 11     
 12     @Value("${spring.rabbitmq.password}")
 13     public String password;
 14     
 15     @Value("${spring.rabbitmq.virtual-host}")
 16     public String virtualHost;
 17 
 18     @Bean
 19     public ConnectionFactory getConnectionFactory(){
 20         CachingConnectionFactory factory=new CachingConnectionFactory();
 21         System.out.println(host);
 22         factory.setHost(host);
 23         factory.setPort(port);
 24         factory.setUsername(username);
 25         factory.setPassword(password);
 26         factory.setVirtualHost(virtualHost);
 27         factory.setPublisherConfirms(true);
 28         factory.setPublisherReturns(true);
 29         return factory;
 30     }
 31 }
 32 
 33 @Configuration
 34 public class BindingConfig {
 35     public final static String first="direct.first";
 36     //Exchange 使用 direct 模式     
 37     public final static String Exchange_NAME="directExchange";
 38     public final static String RoutingKey1="directKey1";
 39     
 40     @Bean
 41     public Queue queueFirst(){
 42         return new Queue(first);
 43     }
 44     
 45     @Bean
 46     public DirectExchange directExchange(){
 47         return new DirectExchange(Exchange_NAME);
 48     }
 49     
 50     @Bean
 51     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
 52         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
 53     }
 54 }
 55 
 56 @Data
 57 public class OrderEntity implements Serializable{
 58     private String id;
 59     private String goods;
 60     private Double price;
 61     private Integer count;
 62     
 63     public OrderEntity(String id,String goods,Double price,Integer count){
 64         this.id=id;
 65         this.goods=goods;
 66         this.price=price;
 67         this.count=count;
 68     }
 69     
 70     public OrderEntity(){}
 71     
 72     public String getId() {
 73         return id;
 74     }
 75     public void setId(String id) {
 76         this.id = id;
 77     }
 78 
 79     public String getGoods() {
 80         return goods;
 81     }
 82 
 83     public void setGoodsId(String goods) {
 84         this.goods = goods;
 85     }
 86 
 87     public Integer getCount() {
 88         return count;
 89     }
 90 
 91     public void setCount(Integer count) {
 92         this.count = count;
 93     }
 94 
 95     public Double getPrice() {
 96         return price;
 97     }
 98 
 99     public void setPrice(Double price) {
100         this.price = price;
101     }
102 }
103 
104 @Component
105 public class DataSource {
106     //加入虛擬數據
107     private static List<OrderEntity> list=new ArrayList<OrderEntity>(
108             Arrays.asList(new OrderEntity("001","Nikon D750",13990.00,1),
109                           new OrderEntity("002","Huwei P30 Plus",5400.00,1),
110                           ..........));
111     
112     public DataSource(){
113     }
114     
115     public List<OrderEntity> getOrderList(){
116         return list;
117     }
118     
119     //根據Id獲取對應order
120     public OrderEntity getOrder(String id){
121         for(OrderEntity order:list){
122             if(order.getId()==id)
123                 return order;
124         }
125         return null;
126     }
127 }
128 
129 public class DataRelation {
130     public static Map map=new HashMap();
131     
132     //綁定關係
133     public static void add(String key,String value){
134         if(!map.containsKey(key))
135             map.put(key,value);
136     }
137     
138     //返回orderId
139     public static Object get(String key){
140         if(map.containsKey(key))
141             return map.get(key);
142         else
143             return null;
144     }
145     
146     //根據 orderId 刪除綁定關係
147     public static void del(String key){
148         if(map.containsKey(key))
149            map.remove(key);
150     }
151 }
152 
153 @Component
154 public class MyConfirmCallback implements ConfirmCallback {
155     @Autowired
156     private DataSource datasource;
157     
158     @Override
159     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
160         String correlationId=correlationData.getId();
161         //根據 correclationId取回對應的orderId
162         String orderId=DataRelation.get(correlationId).toString();
163         //在datasource中找回對應的order
164         OrderEntity order=datasource.getOrder(orderId);
165         
166         if(ack){
167             System.out.println("--------------------ConfirmCallback-------------------\n"                 
168                 +" order's ack is true!\nId:"+order.getId()+" Goods:"+order.getGoods()
169                 +" Count:"+order.getCount().toString()+" Price:"+order.getPrice());
170             DataRelation.del(correlationId);    //操做完成刪除對應綁定
171         }else {
172             System.out.println(order.getId()+" order's ack is: false! \ncause:"+cause);
173             //可在記錄日誌後把Order推送到隊列進行從新發送
174             .......
175         }
176     }
177 }
178 
179 @Controller
180 @RequestMapping("/producer")
181 public class ProducerController {
182     @Autowired
183     private RabbitTemplate template;
184     @Autowired
185     private MyConfirmCallback confirmCallback;
186     @Autowired
187     private DataSource dataSource;
188 
189     @RequestMapping("/send")
190     public void send() throws InterruptedException, IOException{
191         //綁定 ConfirmCallback 回調函數
192         template.setConfirmCallback(confirmCallback);
193  
194         for(OrderEntity order:dataSource.getOrderList()){
195             CorrelationData correlationData=getCorrelationData();
196             //保存 CorrelationId 與 orderId關係
197             DataRelation.add(correlationData.getId(), order.getId());
198             //把 order 插入隊列
199             template.convertAndSend("directExchange",BindingConfig.RoutingKey1,order,correlationData);
200         }
201     }
202     
203     private CorrelationData getCorrelationData(){
204         return new CorrelationData(UUID.randomUUID().toString());
205     }
206 }
複製代碼

Consumer 端代碼

1 @Configuration
 2 public class ConnectionConfig {
 3     @Value("${spring.rabbitmq.host}")
 4     public String host;
 5     
 6     @Value("${spring.rabbitmq.port}")
 7     public int port;
 8     
 9     @Value("${spring.rabbitmq.username}")
10     public String username;
11     
12     @Value("${spring.rabbitmq.password}")
13     public String password;
14     
15     @Value("${spring.rabbitmq.virtual-host}")
16     public String virtualHost;
17 
18     @Bean
19     public ConnectionFactory getConnectionFactory(){
20         CachingConnectionFactory factory=new CachingConnectionFactory();
21         factory.setHost(host);
22         factory.setPort(port);
23         factory.setUsername(username);
24         factory.setPassword(password);
25         factory.setVirtualHost(virtualHost);
26         return factory;
27     }
28 }
29 
30 @Configuration
31 @RabbitListener(bindings=@QueueBinding(
32 exchange=@Exchange(value="directExchange"),
33 value=@Queue(value="direct.first"),
34 key="directKey1"))
35 public class RabbitMqListener {
36     
37     @RabbitHandler
38     public void handler(String message){
39         System.out.println(message);
40     }
41 }
42 
43 @SpringBootApplication
44 public class App {
45     
46     public static void main(String[] args){
47         SpringApplication.run(App.class, args);
48     }
49 }
複製代碼

運行結果

4.4 利用 ReturnCallback 處理隊列 Queue 錯誤

使用 ConfirmCallback 函數只能判斷消息是否成功發送到 Exchange,但並不能保證消息已經成功進行隊列 Queue。因此,系統預備了另外一個回調函數 ReturnCallback 來監聽 Queue 隊列處理的成敗。若是隊列錯誤綁定不存在的 queue,或者 Broken Server 瞬間出現問題末能找到對應的 queue,系統就會激發 Producer 端 ReturnCallback 的回調函數來進行錯誤處理。 ReturnCallback 回調接口只包含一個方法 void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey),它會把出錯的 replyCode,replyText,exchange,routingKey等值都一塊兒返還。與 ConfirmCallback 不一樣的是,returnedMessage 會把隊列中的對象保存到 Message 的 Body 屬性中並返還到回調函數。

注意:在綁定 ReturnCallback 回調函數前,請先把 publisher-returns 及 mandatory 屬性設置爲 true 。 mandatory 參數默認爲 false,用於判斷 broken server是否把錯誤的對象返還到 Producer。如末進行設置,系統將把錯誤的消息丟棄。

下面例子咱們在調用 convertAndSend 方法時特地把 routingKey 設置爲 ErrorKey,觸發 ReturnCallback 回調,而後在 ReturenCallback 的回調方法顯示 replyCode,replyText,exchange,routingKey 等值,並把隊列中對象屬性一併顯示。

Producer 端代碼

1 @Configuration
  2 public class ConnectionConfig {
  3     @Value("${spring.rabbitmq.host}")
  4     public String host;
  5     
  6     @Value("${spring.rabbitmq.port}")
  7     public int port;
  8     
  9     @Value("${spring.rabbitmq.username}")
 10     public String username;
 11     
 12     @Value("${spring.rabbitmq.password}")
 13     public String password;
 14     
 15     @Value("${spring.rabbitmq.virtual-host}")
 16     public String virtualHost;
 17 
 18     @Bean
 19     public ConnectionFactory getConnectionFactory(){
 20         CachingConnectionFactory factory=new CachingConnectionFactory();
 21         System.out.println(host);
 22         factory.setHost(host);
 23         factory.setPort(port);
 24         factory.setUsername(username);
 25         factory.setPassword(password);
 26         factory.setVirtualHost(virtualHost);
 27         factory.setPublisherConfirms(true);
 28         factory.setPublisherReturns(true);
 29         return factory;
 30     }
 31 }
 32 
 33 @Configuration
 34 public class BindingConfig {
 35     public final static String first="direct.first";
 36     public final static String Exchange_NAME="directExchange";
 37     public final static String RoutingKey1="directKey1";
 38     
 39     @Bean
 40     public Queue queueFirst(){
 41         return new Queue(first);
 42     }
 43 
 44     @Bean
 45     public DirectExchange directExchange(){
 46         return new DirectExchange(Exchange_NAME);
 47     }
 48     
 49     @Bean
 50     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
 51         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
 52     } 
 53 }
 54 
 55 @Data
 56 public class OrderEntity implements Serializable{
 57     private String id;
 58     private String goods;
 59     private Double price;
 60     private Integer count;
 61     
 62     public OrderEntity(String id,String goods,Double price,Integer count){
 63         this.id=id;
 64         this.goods=goods;
 65         this.price=price;
 66         this.count=count;
 67     }
 68     
 69     public OrderEntity(){}
 70     
 71     public String getId() {
 72         return id;
 73     }
 74     public void setId(String id) {
 75         this.id = id;
 76     }
 77 
 78     public String getGoods() {
 79         return goods;
 80     }
 81 
 82     public void setGoodsId(String goods) {
 83         this.goods = goods;
 84     }
 85 
 86     public Integer getCount() {
 87         return count;
 88     }
 89 
 90     public void setCount(Integer count) {
 91         this.count = count;
 92     }
 93 
 94     public Double getPrice() {
 95         return price;
 96     }
 97 
 98     public void setPrice(Double price) {
 99         this.price = price;
100     }
101 }
102 
103 @Component
104 public class DataSource {
105     //虛擬數據
106     private static List<OrderEntity> list=new ArrayList<OrderEntity>(
107             Arrays.asList(new OrderEntity("001","Nikon D750",13990.00,1),
108                           new OrderEntity("002","Huwei P30 Plus",5400.00,1),
109                           ......));
110     public DataSource(){
111     }
112     
113     public List<OrderEntity> getOrderList(){
114         return list;
115     }
116     
117     //根據Id獲取對應order
118     public OrderEntity getOrder(String id){
119         for(OrderEntity order:list){
120             if(order.getId()==id)
121                 return order;
122         }
123         return null;
124     }
125 }
126 
127 @Component
128 public class MyReturnCallback implements ReturnCallback {
129 
130     @Override
131     public void returnedMessage(Message message, int replyCode, 
132             String replyText, String exchange, String routingKey){
133         //把messageBody反序列化爲 OrderEntity對象
134         OrderEntity order=convertToOrder(message.getBody());
135         //顯示錯誤緣由
136         System.out.println("-------------ReturnCallback!------------\n"
137             +" exchange:"+exchange+" replyCode:"+String.valueOf(replyCode)
138             +" replyText:"+replyText+" key:"+routingKey+"\n OrderId:"+order.getId()
139             +" Goods:"+order.getGoods()+" Count:"+order.getCount().toString()
140             +" Price:"+order.getPrice()+" ");
141     }
142     
143     //把byte[]反序列化爲 OrderEntity對象
144     private OrderEntity convertToOrder(byte[] bytes){
145         OrderEntity order=null;
146         ByteArrayInputStream bis = new ByteArrayInputStream (bytes);        
147         ObjectInputStream ois;
148         try {
149             ois = new ObjectInputStream (bis);
150             Object obj = ois.readObject();
151             order=(OrderEntity)obj;
152             ois.close();   
153             bis.close(); 
154         } catch (IOException | ClassNotFoundException e) {
155             // TODO 自動生成的 catch 塊
156             e.printStackTrace();
157         }        
158         return order;
159     }
160 }
161 
162 @Controller
163 @RequestMapping("/producer")
164 public class ProducerController {
165     @Autowired
166     private RabbitTemplate template;
167     @Autowired
168     private MyReturnCallback returnCallback;
169     @Autowired
170     private DataSource dataSource;
171  
172     
173     @RequestMapping("/send")
174     public void send() throws InterruptedException, IOException{
175         //把 mandatory 屬性設定爲true
176         template.setMandatory(true);
177         //綁定 ReturnCallback 回調函數
178         template.setReturnCallback(returnCallback);
179  
180         for(OrderEntity order:dataSource.getOrderList()){
181             CorrelationData correlationData=getCorrelationData();
182             template.convertAndSend("directExchange","ErrorKey",order,correlationData);
183         }
184     }
185     
186     private CorrelationData getCorrelationData(){
187         return new CorrelationData(UUID.randomUUID().toString());
188     }
189 }
複製代碼

Consumer 代碼

1 @Configuration
 2 public class ConnectionConfig {
 3     @Value("${spring.rabbitmq.host}")
 4     public String host;
 5     
 6     @Value("${spring.rabbitmq.port}")
 7     public int port;
 8     
 9     @Value("${spring.rabbitmq.username}")
10     public String username;
11     
12     @Value("${spring.rabbitmq.password}")
13     public String password;
14     
15     @Value("${spring.rabbitmq.virtual-host}")
16     public String virtualHost;
17 
18     @Bean
19     public ConnectionFactory getConnectionFactory(){
20         CachingConnectionFactory factory=new CachingConnectionFactory();
21         factory.setHost(host);
22         factory.setPort(port);
23         factory.setUsername(username);
24         factory.setPassword(password);
25         factory.setVirtualHost(virtualHost);
26         return factory;
27     }
28 }
29 
30 @Configuration
31 @RabbitListener(bindings=@QueueBinding(
32 exchange=@Exchange(value="directExchange"),
33 value=@Queue(value="direct.first"),
34 key="directKey1"))
35 public class RabbitMqListener {
36     
37     @RabbitHandler
38     public void handler(String message){
39         System.out.println(message);
40     }
41 }
42 
43 @SpringBootApplication
44 public class App {
45     
46     public static void main(String[] args){
47         SpringApplication.run(App.class, args);
48     }
49 }
複製代碼

運行結果:

5、Consumer 消息接收管控

在第四節主要介紹了 Producer 端的隊列發送與監控,它只能管理 Producer 與 Broker Server 之間的通訊,但並不能確認 Consumer 是否能成功接收到隊列,在這節內容將介紹 Consumer 端的隊列接收與監聽。前面幾節裏,Consumer 端都是簡單地直接使用 RabbitListener 對隊列進行監聽,其實 RabbitMQ 已經爲用戶準備了功能更強大的 MessageListenerContainer 容器用於管理 Message ,下面將爲你們介紹。

5.1 AbstractMessageListenerContainer 介紹

AbstractMeessageListenerContainer 虛擬類是 RabbitMQ 封裝好的一個容器,自己並無對消息進行處理,而是把消息的處理方式交給了 MessageListener 。而它的主要功能是實現 MessageListener 的綁定,ApplicationContext 上下文的綁定,ErrorHandler 的錯誤處理方法的綁定、對消息消費的開始、結束等等默認參數進行配置,讓開發人員能夠在容器中對 Consumer 實現統一管理。SimpleMessageListenerContainer、DirectMessageLinstenerCoontainer 都是它的子類,分別應用於不一樣的場景,在下面會再做詳細介紹。

MessageListener 是監聽消息最經常使用 Listener,它只包含了一個方法 void onMessage(Message message),這是消息接收最經常使用的一個方法,開發者只須要實現此方法便可對接收到的 Message 進行處理。

ChannelAwareMessageListener 至關因而 MessageListener的一個擴展,包含了方法 void onMessage(Message message, Channel channel),除了對 Message 進行處理外,還能夠對接收此 Message 的 Channel 進行檢測。

5.2 SimpleMessageListenerContainer 經常使用方法

SimpleMessageListenerContainer 是最經常使用的 MessageListener 容器,它能夠經過下面的方法設置默認消費者數量與最大的消費者數量。下面例子中嘗試把 consurrentConsumers 設置爲3,把maxConcurrentConsumers 設置爲4,並同時監控 direct 模式交換器的 direct.first,direct.second 隊列。

經過截圖能夠看到,系統默認會爲每一個 queue 都建立 3 個 consumers,不一樣的 queue 中的 consumers 是共享相同的 3 個 channel 。



當 Producer 端發送消息時,consumers 的實際數量可根據 maxConcurrentConsumers 的配置限制進行擴展。

Producer 端代碼

1 @Configuration
 2 public class BindingConfig {
 3     public final static String first="direct.first";
 4     public final static String second="direct.second";
 5     public final static String Exchange_NAME="directExchange";
 6     public final static String RoutingKey1="directKey1";
 7     public final static String RoutingKey2="directKey2";
 8     
 9     @Bean
10     public Queue queueFirst(){
11         return new Queue(first);
12     }
13     
14     @Bean
15     public Queue queueSecond(){
16         return new Queue(second);
17     }
18     
19     @Bean
20     public DirectExchange directExchange(){
21         return new DirectExchange(Exchange_NAME);
22     }
23     
24     //利用BindingBuilder綁定Direct與queueFirst
25     @Bean
26     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
27         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
28     }
29     
30     //利用BindingBuilder綁定Direct與queueSecond
31     @Bean
32     public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
33         return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
34     }   
35 }
36 
37 @Configuration
38 public class ConnectionConfig {
39     @Value("${spring.rabbitmq.host}")
40     public String host;
41     
42     @Value("${spring.rabbitmq.port}")
43     public int port;
44     
45     @Value("${spring.rabbitmq.username}")
46     public String username;
47     
48     @Value("${spring.rabbitmq.password}")
49     public String password;
50     
51     @Value("${spring.rabbitmq.virtual-host}")
52     public String virtualHost;
53 
54     @Bean
55     public ConnectionFactory getConnectionFactory(){
56         CachingConnectionFactory factory=new CachingConnectionFactory();
57         factory.setHost(host);
58         factory.setPort(port);
59         factory.setUsername(username);
60         factory.setPassword(password);
61         factory.setVirtualHost(virtualHost);
62         return factory;
63     }
64 }
65 
66 @Controller
67 @RequestMapping("/producer")
68 public class ProducerController {
69     @Autowired
70     private RabbitTemplate template;
71  
72     @RequestMapping("/send")
73     public void send(HttpServletResponse response) throws InterruptedException, IOException{
74         for(Integer n=0;n<100;n++){
75                 CorrelationData correlationData=getCorrelationData();
76                 template.convertAndSend("directExchange","directKey1", 
77                          "queue1"+" "+n.toString(),correlationData);
78                 template.convertAndSend("directExchange","directKey2"," queue2"+" "+n.toString(),correlationData);            
79                 Thread.currentThread().sleep(30);
80         }
81     }
82 
83     private CorrelationData getCorrelationData(){
84         return new CorrelationData(UUID.randomUUID().toString());
85     }
86 }
複製代碼

Consumer 端代碼:

1 @Configuration
  2 public class ConnectionConfig {
  3     @Value("${spring.rabbitmq.host}")
  4     public String host;
  5     
  6     @Value("${spring.rabbitmq.port}")
  7     public int port;
  8     
  9     @Value("${spring.rabbitmq.username}")
 10     public String username;
 11     
 12     @Value("${spring.rabbitmq.password}")
 13     public String password;
 14     
 15     @Value("${spring.rabbitmq.virtual-host}")
 16     public String virtualHost;
 17 
 18     @Bean
 19     public ConnectionFactory getConnectionFactory(){
 20         CachingConnectionFactory factory=new CachingConnectionFactory();
 21         factory.setHost(host);
 22         factory.setPort(port);
 23         factory.setUsername(username);
 24         factory.setPassword(password);
 25         factory.setVirtualHost(virtualHost);
 26         return factory;
 27     }
 28 }
 29 
 30 @Configuration
 31 public class BindingConfig {
 32     public final static String first="direct.first";
 33     public final static String second="direct.second";
 34     public final static String Exchange_NAME="directExchange";
 35     public final static String RoutingKey1="directKey1";
 36     public final static String RoutingKey2="directKey2";
 37     
 38     @Bean
 39     public Queue queueFirst(){
 40         return new Queue(first);
 41     }
 42     
 43     @Bean
 44     public Queue queueSecond(){
 45         return new Queue(second);
 46     }
 47     
 48     @Bean
 49     public DirectExchange directExchange(){
 50         return new DirectExchange(Exchange_NAME);
 51     }
 52     
 53     //利用BindingBuilder綁定Direct與queueFirst
 54     @Bean
 55     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
 56         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
 57     }
 58     
 59     //利用BindingBuilder綁定Direct與queueSecond
 60     @Bean
 61     public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
 62         return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
 63     }   
 64 }
 65 @Configuration
 66 public class SimpleMessListener {
 67     @Autowired
 68     private RabbitTemplate template;
 69     private int index=0;
 70 
 71     @Bean
 72     public SimpleMessageListenerContainer messageContainer(){
 73         SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
 74         container.setConnectionFactory(connectionConfig.getConnectionFactory());
 75         // 綁定Queue1/Queue2
 76         container.setQueueNames("direct.first");      
 77         container.addQueueNames("direct.second");
 78         //設置默認 consumer 數爲3
 79         container.setConcurrentConsumers(3);
 80         //設置最大 consumer 數爲4
 81         container.setMaxConcurrentConsumers(4);
 82         //標記 consumerTag
 83         container.setConsumerTagStrategy(queue ->  "consumer"+(++index));
 84         //綁定MessageListener顯示接收信息
 85         container.setMessageListener(new MessageListener(){
 86             @Override
 87             public void onMessage(Message message) {
 88                 // TODO 自動生成的方法存根
 89                 Thread thread=Thread.currentThread();
 90                 MessageProperties messProp=message.getMessageProperties();
 91                 try {
 92                     System.out.println(" ConsumerTag:"+messProp.getConsumerTag()
 93                             +" ThreadId is:"+thread.getId()+" Queue:"+messProp.getConsumerQueue()
 94                             +" "+new String(message.getBody(),"UTF-8"));
 95                 } catch (UnsupportedEncodingException e) {
 96                     // TODO 自動生成的 catch 塊
 97                     e.printStackTrace();
 98                 }
 99             }
100             
101         });
102         return container;
103     }
104 }
複製代碼

運行結果

5.3 SimpleMessageListenerContainer 的運做原理

在 SimpleMessageListenerContainer 模式中,不管系統監聽多少個 queue 隊列,channel 都是共享的,相似上面的例子,4個 channel 會把接收到不一樣的隊列請求並分發到對應的 consumer 進行處理。這樣作的好處是系統能夠經過 concurrentConsumers、maxConcurrentConsumers 靈活設定當前隊列中消費者的數量,系統能夠跟據實際需求靈活處理。但因爲每一個 channel 都是在固定線程中運行的,一個 channel 要遊走於多個 consumer 當中,這無疑增長了系統在上下文切換中的開銷。下面用系統提供的 ChannelAwareMessageListener 接口,以更直觀的例子說明一下 SimpleMessageListenerContainer 當中 channel、queue、consumer 之間的關係。

Producer 端代碼

1 @Configuration
 2 public class BindingConfig {
 3     public final static String first="direct.first";
 4     public final static String second="direct.second";
 5     public final static String Exchange_NAME="directExchange";
 6     public final static String RoutingKey1="directKey1";
 7     public final static String RoutingKey2="directKey2";
 8     
 9     @Bean
10     public Queue queueFirst(){
11         return new Queue(first);
12     }
13     
14     @Bean
15     public Queue queueSecond(){
16         return new Queue(second);
17     }
18     
19     @Bean
20     public DirectExchange directExchange(){
21         return new DirectExchange(Exchange_NAME);
22     }
23     
24     //利用BindingBuilder綁定Direct與queueFirst
25     @Bean
26     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
27         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
28     }
29     
30     //利用BindingBuilder綁定Direct與queueSecond
31     @Bean
32     public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
33         return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
34     }   
35 }
36 
37 @Configuration
38 public class ConnectionConfig {
39     @Value("${spring.rabbitmq.host}")
40     public String host;
41     
42     @Value("${spring.rabbitmq.port}")
43     public int port;
44     
45     @Value("${spring.rabbitmq.username}")
46     public String username;
47     
48     @Value("${spring.rabbitmq.password}")
49     public String password;
50     
51     @Value("${spring.rabbitmq.virtual-host}")
52     public String virtualHost;
53 
54     @Bean
55     public ConnectionFactory getConnectionFactory(){
56         CachingConnectionFactory factory=new CachingConnectionFactory();
57         factory.setHost(host);
58         factory.setPort(port);
59         factory.setUsername(username);
60         factory.setPassword(password);
61         factory.setVirtualHost(virtualHost);
62         return factory;
63     }
64 }
65 
66 @Controller
67 @RequestMapping("/producer")
68 public class ProducerController {
69     @Autowired
70     private RabbitTemplate template;
71  
72     @RequestMapping("/send")
73     public void send(HttpServletResponse response) throws InterruptedException, IOException{
74         for(Integer n=0;n<100;n++){
75                 CorrelationData correlationData=getCorrelationData();
76                 template.convertAndSend("directExchange","directKey1",
77                              " queue1"+" "+n.toString(),correlationData);
78                 template.convertAndSend("directExchange","directKey2",
79                              "queue2"+" "+n.toString(),correlationData);            
80                 Thread.currentThread().sleep(30);
81         }
82     }
83 
84     private CorrelationData getCorrelationData(){
85         return new CorrelationData(UUID.randomUUID().toString());
86     }
87 }
複製代碼

Consumer 端代碼

1 @Configuration
  2 public class ConnectionConfig {
  3     @Value("${spring.rabbitmq.host}")
  4     public String host;
  5     
  6     @Value("${spring.rabbitmq.port}")
  7     public int port;
  8     
  9     @Value("${spring.rabbitmq.username}")
 10     public String username;
 11     
 12     @Value("${spring.rabbitmq.password}")
 13     public String password;
 14     
 15     @Value("${spring.rabbitmq.virtual-host}")
 16     public String virtualHost;
 17 
 18     @Bean
 19     public ConnectionFactory getConnectionFactory(){
 20         CachingConnectionFactory factory=new CachingConnectionFactory();
 21         factory.setHost(host);
 22         factory.setPort(port);
 23         factory.setUsername(username);
 24         factory.setPassword(password);
 25         factory.setVirtualHost(virtualHost);
 26         return factory;
 27     }
 28 }
 29 
 30 @Configuration
 31 public class BindingConfig {
 32     public final static String first="direct.first";
 33     public final static String second="direct.second";
 34     public final static String Exchange_NAME="directExchange";
 35     public final static String RoutingKey1="directKey1";
 36     public final static String RoutingKey2="directKey2";
 37     
 38     @Bean
 39     public Queue queueFirst(){
 40         return new Queue(first);
 41     }
 42     
 43     @Bean
 44     public Queue queueSecond(){
 45         return new Queue(second);
 46     }
 47     
 48     @Bean
 49     public DirectExchange directExchange(){
 50         return new DirectExchange(Exchange_NAME);
 51     }
 52     
 53     //利用BindingBuilder綁定Direct與queueFirst
 54     @Bean
 55     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
 56         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
 57     }
 58     
 59     //利用BindingBuilder綁定Direct與queueSecond
 60     @Bean
 61     public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
 62         return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
 63     }   
 64 }
 65 @Configuration
 66 public class SimpleMessListener {
 67     @Autowired
 68     private RabbitTemplate template;
 69     @Autowired
 70     private ConnectionConfig connectionConfig;
 71     private int index=0;
 72 
 73     @Bean
 74     public SimpleMessageListenerContainer messageContainer(){
 75         SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
 76         container.setConnectionFactory(connectionConfig.getConnectionFactory());
 77         // 綁定Queue1/Queue2
 78         container.setQueueNames("direct.first");      
 79         container.addQueueNames("direct.second");
 80         //設置默認 consumer 數爲3
 81         container.setConcurrentConsumers(3);
 82         //設置最大 consumer 數爲4
 83         container.setMaxConcurrentConsumers(4);
 84         //標記 consumerTag
 85         container.setConsumerTagStrategy(queue ->  "consumer"+(++index));
 86         //綁定ChannelAwareMessageListener顯示接收信息
 87         container.setChannelAwareMessageListener(new ChannelAwareMessageListener(){
 88             @Override
 89             public void onMessage(Message message, com.rabbitmq.client.Channel channel) 
 90                     throws Exception {
 91                 // TODO 自動生成的方法存根
 92                 // TODO 自動生成的方法存根
 93                 Thread thread=Thread.currentThread();
 94                 System.out.println("Channel:"+channel.getChannelNumber()                            
 95                         +" ThreadId is:"+thread.getId()
 96                         +" ConsumerTag:"+message.getMessageProperties().getConsumerTag()
 97                         +" Queue:"+message.getMessageProperties().getConsumerQueue());
 98                         
 99             }
100             
101         });
102         return container;
103     }
104 }            
複製代碼

運行結果:

觀察運行結果能夠看到:每一個 channel 都在固定的線程中運行,一個 channel 會向不一樣的 consumer 發送隊列信息。瞭解 channel、thread、queue、consumer 之間的關係,會對 SimpleMessageListenerContainer 有更深刻認識。

5.4 DirectMessageListenerContainer

SimpleMessageListenerContainer 是經典的容器,使用 channel 共享,一旦某個 channel 關閉或重啓,意味着每一個隊列 queue 中使用當前 channel 的 consumer 都會受到影響。 有見及此,在 RabbitMQ 2.0 後,系統引入了 DirectMessageListenerContainer ,它容許每一個 consumer 都有各自的對應的 channel 的,channel 只管理負責管理當前 consumer 的通道。這樣令 consumer 運用更靈活,同時線程並無跟 channel 綁定,而是由獨立的線程池進行管理,這是更好地解決了 SimpleMessageListenerContainer 中上下文切換所帶來的資源消耗問題。

下面的例子,咱們嘗試使用把 consumersPerQueue 設置爲 4,並同時監控 direct 模式 exchange 的 direct.first,direct.second 隊列。

從管理界面能夠看到,系統會爲每一個 consumer 都生成一個獨立的 channel 進行管理。


Producer 端代碼

1 @Configuration
 2 public class BindingConfig {
 3     public final static String first="direct.first";
 4     public final static String second="direct.second";
 5     public final static String Exchange_NAME="directExchange";
 6     public final static String RoutingKey1="directKey1";
 7     public final static String RoutingKey2="directKey2";
 8     
 9     @Bean
10     public Queue queueFirst(){
11         return new Queue(first);
12     }
13     
14     @Bean
15     public Queue queueSecond(){
16         return new Queue(second);
17     }
18     
19     @Bean
20     public DirectExchange directExchange(){
21         return new DirectExchange(Exchange_NAME);
22     }
23     
24     //利用BindingBuilder綁定Direct與queueFirst
25     @Bean
26     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
27         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
28     }
29     
30     //利用BindingBuilder綁定Direct與queueSecond
31     @Bean
32     public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
33         return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
34     }   
35 }
36 
37 @Configuration
38 public class ConnectionConfig {
39     @Value("${spring.rabbitmq.host}")
40     public String host;
41     
42     @Value("${spring.rabbitmq.port}")
43     public int port;
44     
45     @Value("${spring.rabbitmq.username}")
46     public String username;
47     
48     @Value("${spring.rabbitmq.password}")
49     public String password;
50     
51     @Value("${spring.rabbitmq.virtual-host}")
52     public String virtualHost;
53 
54     @Bean
55     public ConnectionFactory getConnectionFactory(){
56         CachingConnectionFactory factory=new CachingConnectionFactory();
57         factory.setHost(host);
58         factory.setPort(port);
59         factory.setUsername(username);
60         factory.setPassword(password);
61         factory.setVirtualHost(virtualHost);
62         return factory;
63     }
64 }
65 
66 @Controller
67 @RequestMapping("/producer")
68 public class ProducerController {
69     @Autowired
70     private RabbitTemplate template;
71  
72     @RequestMapping("/send")
73     public void send(HttpServletResponse response) throws InterruptedException, IOException{
74         for(Integer n=0;n<100;n++){
75                 CorrelationData correlationData=getCorrelationData();
76                 template.convertAndSend("directExchange","directKey1",
77                              " queue1"+" "+n.toString(),correlationData);
78                 template.convertAndSend("directExchange","directKey2",
79                              "queue2"+" "+n.toString(),correlationData);            
80                 Thread.currentThread().sleep(30);
81         }
82     }
83 
84     private CorrelationData getCorrelationData(){
85         return new CorrelationData(UUID.randomUUID().toString());
86     }
87 }
複製代碼

Consumer 端代碼

1 @Configuration
 2 public class ConnectionConfig {
 3     @Value("${spring.rabbitmq.host}")
 4     public String host;
 5     
 6     @Value("${spring.rabbitmq.port}")
 7     public int port;
 8     
 9     @Value("${spring.rabbitmq.username}")
10     public String username;
11     
12     @Value("${spring.rabbitmq.password}")
13     public String password;
14     
15     @Value("${spring.rabbitmq.virtual-host}")
16     public String virtualHost;
17 
18     @Bean
19     public ConnectionFactory getConnectionFactory(){
20         CachingConnectionFactory factory=new CachingConnectionFactory();
21         factory.setHost(host);
22         factory.setPort(port);
23         factory.setUsername(username);
24         factory.setPassword(password);
25         factory.setVirtualHost(virtualHost);
26         return factory;
27     }
28 }
29 
30 @Configuration
31 public class BindingConfig {
32     public final static String first="direct.first";
33     public final static String second="direct.second";
34     public final static String Exchange_NAME="directExchange";
35     public final static String RoutingKey1="directKey1";
36     public final static String RoutingKey2="directKey2";
37     
38     @Bean
39     public Queue queueFirst(){
40         return new Queue(first);
41     }
42     
43     @Bean
44     public Queue queueSecond(){
45         return new Queue(second);
46     }
47     
48     @Bean
49     public DirectExchange directExchange(){
50         return new DirectExchange(Exchange_NAME);
51     }
52     
53     //利用BindingBuilder綁定Direct與queueFirst
54     @Bean
55     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
56         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
57     }
58     
59     //利用BindingBuilder綁定Direct與queueSecond
60     @Bean
61     public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
62         return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
63     }   
64 }
65 
66 @Configuration
67 public class DirectMessListener {
68     @Autowired
69     private ConnectionConfig connectionConfig;
70     @Autowired
71     private RabbitTemplate template;
72     private int index=0;
73     
74     @Bean
75     public DirectMessageListenerContainer messageContainer(){
76         DirectMessageListenerContainer container=new DirectMessageListenerContainer();
77         container.setConnectionFactory(connectionConfig.getConnectionFactory());
78         // 設置每一個隊列的 consumer 數量
79         container.setConsumersPerQueue(4);
80         container.addQueueNames("direct.first");
81         container.addQueueNames("direct.second");
82         container.setConsumerTagStrategy(queue -> "consumer"+(++index));
83         container.setMessageListener(new ChannelAwareMessageListener(){
84             @Override
85             public void onMessage(Message message, com.rabbitmq.client.Channel channel) 
86                     throws Exception {
87                 // TODO 自動生成的方法存根
88                 // TODO 自動生成的方法存根
89                 Thread thread=Thread.currentThread();
90                 
91                 System.out.println("Channel:"+channel.getChannelNumber()                            
92                         +" ThreadId is:"+thread.getId()
93                         +" ConsumerTag:"+message.getMessageProperties().getConsumerTag()
94                         +" Queue:"+message.getMessageProperties().getConsumerQueue());
95               }    
96         });
97         return container;
98     }
99 }
複製代碼

經過運行結果進一步能夠證明,consumer 信息接收是由獨立的線程池進行管理的,並無與 channel 綁定,每一個 consumer 都有本身單獨的 channel,即便 channel 發生問題時,也不會對其餘的 consumer 發生影響,這正是 DirectMessageListenerContainer 的優勝之處。

5.5 Consumer 的信息接收確認方式

在第四節曾經介紹過在 Producer 端利用 ConfirmCallback / ReturnCallback 監控信息的發送,在這節將爲你們在 Consumer 端監控信息的接收。

Consumer 的信息接收確認模式能夠經過 AcknowledgeMode 設定,一共有三種模式:NONE、MANUAL、AUTO,默認是 AUTO 模式。其中 NONE 爲系統確認,MANUAL 是手動確認。

而 AUTO 爲自動模式,系統能夠根據執行狀況自動發送 ack / nack。若是方法未拋出異常,則發送 ack。若是拋出異常 AmqpRejectAndDontRequeueException 顧名思義消息被拒絕且不會從新加入隊列。若是方法拋出非 AmqpRejectAndDontRequeueException 異常,則系統發送 nack 消息重歸隊列。

Channel 消息接收的經常使用方法

AcknowledgeMode 配置爲 MANUAL 後,用戶可經過 Channel 類的 void basicAck(long deliveryTag, boolean multiple) 方法手動確認消息接收是否成功。

若檢測到有異常,可經過void basicReject(long deliveryTag, boolean requeue) 或 void basicNack(long deliveryTag, boolean multiple, boolean requeue) 確認是否從新把消息推送。

經過配置 prefetchCount 可設置 consumer 每次接收到的信息數量,系統默認值爲 250,這表示當 consumer 隊列接收到 250 請求其狀態皆爲 unacked 時,broker server 將暫停向 consumer 發送消息,待消息處理後再繼續。

下面例子中咱們嘗試把 prefetchCount 設置爲 10,即每一個 consumer 單次最多接收到的消息爲 10 條,並把 consumersPerQueue 設置爲 4,而後把 AcknowledgeMode 設置爲 MANUAL,經過手動確認消息接收,一旦發生錯誤,消息從新加入隊列。

Producer 端代碼

1 @Configuration
 2 public class BindingConfig {
 3     public final static String first="direct.first";
 4     public final static String second="direct.second";
 5     public final static String Exchange_NAME="directExchange";
 6     public final static String RoutingKey1="directKey1";
 7     public final static String RoutingKey2="directKey2";
 8     
 9     @Bean
10     public Queue queueFirst(){
11         return new Queue(first);
12     }
13     
14     @Bean
15     public Queue queueSecond(){
16         return new Queue(second);
17     }
18     
19     @Bean
20     public DirectExchange directExchange(){
21         return new DirectExchange(Exchange_NAME);
22     }
23     
24     //利用BindingBuilder綁定Direct與queueFirst
25     @Bean
26     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
27         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
28     }
29     
30     //利用BindingBuilder綁定Direct與queueSecond
31     @Bean
32     public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
33         return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
34     }   
35 }
36 
37 @Configuration
38 public class ConnectionConfig {
39     @Value("${spring.rabbitmq.host}")
40     public String host;
41     
42     @Value("${spring.rabbitmq.port}")
43     public int port;
44     
45     @Value("${spring.rabbitmq.username}")
46     public String username;
47     
48     @Value("${spring.rabbitmq.password}")
49     public String password;
50     
51     @Value("${spring.rabbitmq.virtual-host}")
52     public String virtualHost;
53 
54     @Bean
55     public ConnectionFactory getConnectionFactory(){
56         CachingConnectionFactory factory=new CachingConnectionFactory();
57         factory.setHost(host);
58         factory.setPort(port);
59         factory.setUsername(username);
60         factory.setPassword(password);
61         factory.setVirtualHost(virtualHost);
62         return factory;
63     }
64 }
65 
66 @Controller
67 @RequestMapping("/producer")
68 public class ProducerController {
69     @Autowired
70     private RabbitTemplate template;
71  
72     @RequestMapping("/send")
73     public void send(HttpServletResponse response) throws InterruptedException, IOException{
74         for(Integer n=0;n<100;n++){
75                 CorrelationData correlationData=getCorrelationData();
76                 template.convertAndSend("directExchange","directKey1",
77                              " queue1"+" "+n.toString(),correlationData);
78                 template.convertAndSend("directExchange","directKey2",
79                              "queue2"+" "+n.toString(),correlationData);            
80         }
81     }
82 
83     private CorrelationData getCorrelationData(){
84         return new CorrelationData(UUID.randomUUID().toString());
85     }
86 }
複製代碼

運行後可看到 Broker Server 每條 queue 會有 100 條數據處於待處理狀態


Consumer 端代碼

1 @Configuration
  2 public class ConnectionConfig {
  3     @Value("${spring.rabbitmq.host}")
  4     public String host;
  5     
  6     @Value("${spring.rabbitmq.port}")
  7     public int port;
  8     
  9     @Value("${spring.rabbitmq.username}")
 10     public String username;
 11     
 12     @Value("${spring.rabbitmq.password}")
 13     public String password;
 14     
 15     @Value("${spring.rabbitmq.virtual-host}")
 16     public String virtualHost;
 17 
 18     @Bean
 19     public ConnectionFactory getConnectionFactory(){
 20         CachingConnectionFactory factory=new CachingConnectionFactory();
 21         factory.setHost(host);
 22         factory.setPort(port);
 23         factory.setUsername(username);
 24         factory.setPassword(password);
 25         factory.setVirtualHost(virtualHost);
 26         return factory;
 27     }
 28 }
 29 
 30 @Configuration
 31 public class BindingConfig {
 32     public final static String first="direct.first";
 33     public final static String second="direct.second";
 34     public final static String Exchange_NAME="directExchange";
 35     public final static String RoutingKey1="directKey1";
 36     public final static String RoutingKey2="directKey2";
 37     
 38     @Bean
 39     public Queue queueFirst(){
 40         return new Queue(first);
 41     }
 42     
 43     @Bean
 44     public Queue queueSecond(){
 45         return new Queue(second);
 46     }
 47     
 48     @Bean
 49     public DirectExchange directExchange(){
 50         return new DirectExchange(Exchange_NAME);
 51     }
 52     
 53     //利用BindingBuilder綁定Direct與queueFirst
 54     @Bean
 55     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
 56         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
 57     }
 58     
 59     //利用BindingBuilder綁定Direct與queueSecond
 60     @Bean
 61     public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
 62         return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
 63     }   
 64 }
 65 
 66 @Configuration
 67 public class DirectMessListener {
 68     @Autowired
 69     private ConnectionConfig connectionConfig;
 70     @Autowired
 71     private RabbitTemplate template;
 72     private int index=0;
 73     
 74     @Bean
 75     public DirectMessageListenerContainer messageContainer(){
 76         DirectMessageListenerContainer container=new DirectMessageListenerContainer();
 77         container.setConnectionFactory(connectionConfig.getConnectionFactory());
 78         // 設置每一個隊列的 consumer 數量
 79         container.setConsumersPerQueue(4);
 80         // 設置每一個 consumer 每次的接收的消息數量爲10個
 81         container.setPrefetchCount(10);
 82         // 使用MANUAL進行手動確認      
 83         container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 84         container.addQueueNames("direct.first");
 85         container.addQueueNames("direct.second");
 86         container.setConsumerTagStrategy(queue -> "consumer"+(++index));
 87         container.setMessageListener(new ChannelAwareMessageListener(){
 88             @Override
 89             public void onMessage(Message message, com.rabbitmq.client.Channel channel) 
 90                     throws Exception {
 91                 Thread thread=Thread.currentThread();
 92                 MessageProperties prop=message.getMessageProperties();
 93                 try{
 94                     System.out.println("Channel:"+channel.getChannelNumber()                            
 95                             +" ThreadId is:"+thread.getId()
 96                             +" ConsumerTag:"+prop.getConsumerTag()
 97                             +" Queue:"+prop.getConsumerQueue());
 98                     //經過Tag單個確認
 99                     channel.basicAck(prop.getDeliveryTag(), false);
100                 }catch(Exception ex){
101                     //斷定單個接收失敗,從新加入consumer隊列
102                     channel.basicReject(prop.getDeliveryTag(), true);
103                 }
104                 thread.sleep(1000);
105             }    
106         });
107         return container;
108     }
109 }
複製代碼

觀察信息接收狀況,每一個 consumer 一次可處理10條信息,對隊列進行分批處理。


6、死信隊列

死信隊列(Dead-Letter-Exchange) 可被看做是死信交換器。當消息在一個隊列中變成死信後,它能被從新被髮送到特定的交換器中,這個交換器就是DLX ,綁定DLX 的隊列就稱之爲死信隊列。消息變成死信通常是因爲如下幾種狀況:

  • 消息被拒絕,requeue 被設置爲 false, 可經過上一介紹的 void basicReject (deliveryTag, requeue) 或 void basicNack(deliveryTag,multiple, requeue) 完成設置 ;
  • 消息過時;
  • 隊列超出最大長度。

其實死信隊列 DLX 也是一個正常的交換器,和通常的交換器沒有什麼區別,咱們能夠用通常創建隊列的方法,創建一個死信隊列。而後創建一個正常的隊列,在正常隊列中加入參數 x-dead-letter-exchange、x-dead-letter-routing-key 與死信隊列進行綁定,完成綁定後在管理界面 Features 選項中 direct.queue.first 會顯示 DLX DLK。這時當被綁定的隊列出現超時,超長,或被拒絕時(注意requeue被設置爲false時,對會激發死信),信息就會流入死信隊列被處理。

具體的例子Producer端:

1 @Configuration 
 2 public class BindingConfig {
 3     public final static String Queue_First="direct.queue.first";
 4     public final static String Exchange_Name="directExchange";
 5     public final static String Routing_Key_First="directKey1";
 6     
 7     @Bean
 8     public Queue queueFirst(){
 9         return new Queue(this.Queue_First);
10     }
11     
12     @Bean
13     public DirectExchange directExchange(){
14         return new DirectExchange(this.Exchange_Name);
15     }
16     
17     @Bean
18     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
19         return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First);
20     }
21 }
22 
23 @Configuration
24 public class ConnectionConfig {
25     @Value("${spring.rabbitmq.host}")
26     public String host;
27     
28     @Value("${spring.rabbitmq.port}")
29     public int port;
30     
31     @Value("${spring.rabbitmq.username}")
32     public String username;
33     
34     @Value("${spring.rabbitmq.password}")
35     public String password;
36     
37     @Value("${spring.rabbitmq.virtual-host}")
38     public String virtualHost;
39 
40     @Bean
41     public ConnectionFactory getConnectionFactory(){
42         CachingConnectionFactory factory=new CachingConnectionFactory();
43         System.out.println(host);
44         factory.setHost(host);
45         factory.setPort(port);
46         factory.setUsername(username);
47         factory.setPassword(password);
48         factory.setVirtualHost(virtualHost);
49         return factory;
50     }
51 }
52 
53 @Controller
54 @RequestMapping("/producer")
55 public class ProducerController {
56     @Autowired
57     private RabbitTemplate template;
58     
59     @RequestMapping("/send")
60     public void send() {
61         for(int n=0;n<10;n++){            
62             template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"Hello World! "
63                    +String.valueOf(n),getCorrelationData());
64         }
65     }
66 
67      private CorrelationData getCorrelationData(){
68         return new CorrelationData(UUID.randomUUID().toString());
69     }
70 }
複製代碼

Customer 端

1 @Configuration
  2 public class BindingConfig {
  3     //普通隊列參數
  4     public final static String Queue_First="direct.queue.first";
  5     public final static String Exchange_Name="directExchange";
  6     public final static String Routing_Key_First="directKey1";
  7     //死信隊列參數
  8     public final static String Queue_Dead="direct.queue.dead";
  9     public final static String Exchange_Dead="directDead";
 10     public final static String Routing_Key_Dead="directDeadKey";
 11     
 12     @Bean
 13     public Queue queueFirst(){
 14         Map<String, Object> args=new HashMap<String,Object>();
 15         //聲明當前死信的 Exchange
 16         args.put("x-dead-letter-exchange", this.Exchange_Dead);
 17          //聲明當前隊列的死信路由key
 18         args.put("x-dead-letter-routing-key", this.Routing_Key_Dead);
 19         //把死信隊列的參數綁定到當前隊列中
 20         return QueueBuilder.durable(Queue_First).withArguments(args).build();
 21     }
 22     
 23     @Bean
 24     public DirectExchange directExchange(){
 25         return new DirectExchange(this.Exchange_Name);
 26     }
 27     
 28     @Bean
 29     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
 30         return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First);
 31     }
 32     
 33     @Bean
 34     public Queue queueDead(){
 35         return new Queue(this.Queue_Dead);
 36     }
 37     
 38     @Bean
 39     public DirectExchange directExchangeDead(){
 40         return new DirectExchange(this.Exchange_Dead);
 41     }
 42     
 43     @Bean
 44     public Binding bindingExchangeDead(Queue queueDead,DirectExchange directExchangeDead){
 45         return BindingBuilder.bind(queueDead).to(directExchangeDead).with(this.Routing_Key_Dead);
 46     }
 47 }
 48 
 49 @Configuration
 50 public class ConnectionConfig {
 51     @Value("${spring.rabbitmq.host}")
 52     public String host;
 53     
 54     @Value("${spring.rabbitmq.port}")
 55     public int port;
 56     
 57     @Value("${spring.rabbitmq.username}")
 58     public String username;
 59     
 60     @Value("${spring.rabbitmq.password}")
 61     public String password;
 62     
 63     @Value("${spring.rabbitmq.virtual-host}")
 64     public String virtualHost;
 65 
 66     @Bean
 67     public ConnectionFactory getConnectionFactory(){
 68         CachingConnectionFactory factory=new CachingConnectionFactory();
 69         factory.setHost(host);
 70         factory.setPort(port);
 71         factory.setUsername(username);
 72         factory.setPassword(password);
 73         factory.setVirtualHost(virtualHost);
 74         return factory;
 75     }
 76 }
 77 
 78 @Configuration
 79 public class DirectMessListener {
 80     @Autowired
 81     private ConnectionConfig connectionConfig;
 82     @Autowired
 83     private RabbitTemplate template;
 84     private int index=0,normalIndex=0,deadIndex=0;    
 85     
 86     @Bean
 87     public DirectMessageListenerContainer messageContainer(){
 88         DirectMessageListenerContainer container=new DirectMessageListenerContainer();
 89         container.setConnectionFactory(connectionConfig.getConnectionFactory());
 90         // 設置每一個隊列的 consumer 數量
 91         container.setConsumersPerQueue(4);
 92         // 設置每一個 consumer 每次的接收的消息數量
 93         container.setPrefetchCount(10);
 94         // 使用MANUAL手動確認
 95         container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 96         // 監聽隊列
 97         container.addQueueNames(BindingConfig.Queue_First);
 98         container.addQueueNames(BindingConfig.Queue_Dead);
 99         container.setConsumerTagStrategy(queue -> "consumer"+(++index));
100          
101         container.setMessageListener(new ChannelAwareMessageListener(){
102             @Override
103             public void onMessage(Message message, com.rabbitmq.client.Channel channel) 
104                     throws Exception {
105                 MessageProperties prop=message.getMessageProperties();
106                 if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_First)){
107                     System.out.println("This is a normal queue! "+(++normalIndex));
108                     //把當前的隊列轉送到死信隊列中
109                     channel.basicReject(prop.getDeliveryTag(), false);                
110                 }
111                 if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_Dead)){
112                     System.out.println("This is a dead queue! "+(++deadIndex));
113                     //模擬對死信隊列處理
114                     Thread.currentThread().sleep(5000);
115                     .......
116                     //處理完畢
117                     channel.basicAck(prop.getDeliveryTag(), false);
118                 }
119                 
120             }
121         });
122         return container;
123     }
124 }
複製代碼

經過管理界面能夠看,信息會先發送到 direct.queue.first,而後被放進死信隊列做處理。



運行結果


死信隊列最經常使用的場景能夠在訂單支付,流程審批等環節。例如在 京*、淘* 等平臺,當下單成功後,客戶要在必定的時間內完成支付操做,不然訂單被視做無效,這些業務流程就可使用死信隊列來處理。

7、持久化操做

RabbitMq 的持久化操做包含有 Queue 持久化、Message 持久化和 Exchange 持久化三類。

7.1 Queue 的持久化

隊列持久化只須要在 Queue 的構造函數 public Queue(String name, boolean durable) 把 durable 參數置爲 true 就可實現。若是隊列不設置持久化( (durable 默認爲 false), 那麼在RabbitMQ 服務重啓以後,相關隊列的元數據會丟失,此時數據也會丟失。

7.2 Message 持久化

設置了Queue 持久化之後,當 RabbitMQ 服務重啓以後,隊列依然存在,但消息已經消失,可見單單設置隊列的持久化而不設置消息持久化顯得毫無心義,因此一般列隊持久化會與消息持久化共同使用。

在 RabbitMQ 原生態的框架下,須要把信息屬性設置爲 MessageProperties.PERSISTENT TEXT PLAIN 纔會實現消息的持久化。

而在 Spring 框架下,因爲在使用回調函數時須要把 Message 從新返回隊列再進行處理,因此 Message 默認已是持久化的。


7.3 Exchage 的持久化

交換器持久化可經過構造函數 public DirectExchange(String name, boolean durable, boolean autoDelete) 把 durable 參數置爲 true 就可實現,而 autoDelete 則是指在所在消費者都解除訂閱的狀況下自動刪除。若是交換器不設置持久化,那麼在 RabbitMQ 服務重啓以後,相關的交換器元數據會丟失,不過消息不會丟失,只是消息再也不發送到該 Exchange 。對一個長期使用的交換器來講,持久化仍是有其必要性的。

總結

RabbitMQ 發展至今,被愈來愈多的人承認,這和它在易用性、擴展性、可靠性和高可用性等方面的卓著表現是密不可分的。

相比於傳統的 ActiveMQ 和分佈式 Kafka,它具備本身獨有的特色。

但願文章有幫於你們對 RabbitMQ 消息隊列方面有更深刻的瞭解,在不一樣的開發環境中靈活運用。

因爲時間倉促,文章當中有不明確的地方或有錯漏敬請點明。

相關文章
相關標籤/搜索