本文主要討論 RabbitMQ,從3月底接觸一個項目使用了 RabbitMQ,就開始着手學習,主要經過視頻和博客學習了一個月,基本明白了 RabbitMQ 的應用,其它的 MQ 隊列還不清楚,其底層技術還有待學習,如下是我目前的學習心得。java
1.安裝 Erlang
RabbitMQ 是基於 Erlang 語言寫的,因此首先安裝 Erlang,本例是在 Windows 上安裝,也能夠選擇在 Linux 上安裝,機器上沒有虛擬機,直接在 Windows 上操做,建議在 Linux 上安裝。官方下載 Erlang 軟件,我下載最新版本 21.3。安裝過程很簡單,直接 Next 到底。 Linux 安裝自行谷歌。以下圖:web
安裝結束後,設置環境變量,以下圖
測試是否安裝成功
2.安裝 RabbitMQ
在官方下載,選擇最新版本 3.7。安裝過程很簡單,直接 Next 到底。以下圖:spring
測試安裝是否成功,進入安裝目錄 sbin,執行 rabbitmq-plugins enable rabbitmq_management 命令,出現下面界面,證實安裝成功(建議以管理員方式打開 dos)。
執行 rabbitmq-server start 命令,啓動服務。本地登錄並建立用戶,以下圖:數據庫
關於tags標籤的解釋:
一、 超級管理員(administrator)apache
可登錄管理控制檯,可查看全部的信息,而且能夠對用戶,策略(policy)進行操做。json
二、 監控者(monitoring)windows
可登錄管理控制檯,同時能夠查看rabbitmq節點的相關信息(進程數,內存使用狀況,磁盤使用狀況等)bash
三、 策略制定者(policymaker)服務器
可登錄管理控制檯, 同時能夠對policy進行管理。但沒法查看節點的相關信息(上圖紅框標識的部分)。
四、 普通管理者(management)
僅可登錄管理控制檯,沒法看到節點信息,也沒法對策略進行管理。
五、 其餘
沒法登錄管理控制檯,一般就是普通的生產者和消費者。
4.JAVA 操做RabbitMQ
參考 RabbitMQ 官網,一共分爲6個模式
RabbitMQ 是一個消息代理,實際上,它接收生產者產生的消息,而後將消息傳遞給消費者。在這個過程當中,它能夠路由、緩衝、持久化等,在傳輸過程當中,主要又三部分組成。
生產者:發送消息的一端
通常狀況下,消息生產者、消費者和隊列不在同一臺服務器上,本地作測試,放在一臺服務器上。 測試項目直接建立一個 maven 格式的項目,不必建立網絡格式。新建一個項目,以下圖:
首先準備操做 MQ 的環境
(1): 準備必要的 Pom 文件,導入相應的 jar 包,
1 <?xml version="1.0" encoding="UTF-8"?>
2
3 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5 <modelVersion>4.0.0</modelVersion>
6
7 <groupId>com.edu</groupId>
8 <artifactId>rabbitmqdemo</artifactId>
9 <version>1.0</version>
10
11 <name>rabbitmqdemo</name>
12 <!-- FIXME change it to the project's website -->
13 <url>http://www.example.com</url>
14
15 <properties>
16 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
17 <maven.compiler.source>1.7</maven.compiler.source>
18 <maven.compiler.target>1.7</maven.compiler.target>
19 </properties>
20
21 <dependencies>
22 <!--測試包-->
23 <dependency>
24 <groupId>junit</groupId>
25 <artifactId>junit</artifactId>
26 <version>4.11</version>
27 <scope>test</scope>
28 </dependency>
29 <!--mq客戶端-->
30 <dependency>
31 <groupId>com.rabbitmq</groupId>
32 <artifactId>amqp-client</artifactId>
33 <version>4.5.0</version>
34 </dependency>
35 <!--日誌-->
36 <dependency>
37 <groupId>org.slf4j</groupId>
38 <artifactId>slf4j-log4j12</artifactId>
39 <version>1.7.25</version>
40 </dependency>
41 <!--工具包-->
42 <dependency>
43 <groupId>org.apache.commons</groupId>
44 <artifactId>commons-lang3</artifactId>
45 <version>3.3.2</version>
46 </dependency>
47 <!--spring集成-->
48 <dependency>
49 <groupId>org.springframework.amqp</groupId>
50 <artifactId>spring-rabbit</artifactId>
51 <version>1.7.6.RELEASE</version>
52 </dependency>
53 <dependency>
54 <groupId>org.springframework</groupId>
55 <artifactId>spring-test</artifactId>
56 <version>4.3.7.RELEASE</version>
57 </dependency>
58 <dependency>
59 <groupId>junit</groupId>
60 <artifactId>junit</artifactId>
61 <version>RELEASE</version>
62 <scope>compile</scope>
63 </dependency>
64 </dependencies>
65
66 <build>
67 <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
68 <plugins>
69 <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
70 <plugin>
71 <artifactId>maven-clean-plugin</artifactId>
72 <version>3.1.0</version>
73 </plugin>
74 <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
75 <plugin>
76 <artifactId>maven-resources-plugin</artifactId>
77 <version>3.0.2</version>
78 </plugin>
79 <plugin>
80 <artifactId>maven-compiler-plugin</artifactId>
81 <version>3.8.0</version>
82 </plugin>
83 <plugin>
84 <artifactId>maven-surefire-plugin</artifactId>
85 <version>2.22.1</version>
86 </plugin>
87 <plugin>
88 <artifactId>maven-jar-plugin</artifactId>
89 <version>3.0.2</version>
90 </plugin>
91 <plugin>
92 <artifactId>maven-install-plugin</artifactId>
93 <version>2.5.2</version>
94 </plugin>
95 <plugin>
96 <artifactId>maven-deploy-plugin</artifactId>
97 <version>2.8.2</version>
98 </plugin>
99 <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
100 <plugin>
101 <artifactId>maven-site-plugin</artifactId>
102 <version>3.7.1</version>
103 </plugin>
104 <plugin>
105 <artifactId>maven-project-info-reports-plugin</artifactId>
106 <version>3.0.0</version>
107 </plugin>
108 </plugins>
109 </pluginManagement>
110 </build>
111 </project>
(2): 創建日誌配置文件,在 resources 下創建 log4j.properties,便於打印精確的日誌信息
1 log4j.rootLogger=DEBUG,A1
2 log4j.logger.com.edu=DEBUG
3 log4j.logger.org.mybatis=DEBUG
4 log4j.appender.A1=org.apache.log4j.ConsoleAppender
5 log4j.appender.A1.layout=org.apache.log4j.PatternLayout
6 log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-%m%n
(3): 編寫一個工具類,主要用於鏈接 RabbitMQ
1 package com.edu.util;
2
3
4 import com.rabbitmq.client.Connection;
5 import com.rabbitmq.client.ConnectionFactory;
6
7 /**
8 * @ClassName ConnectionUtil
9 * @Deccription 穿件鏈接的工具類
10 * @Author DZ
11 * @Date 2019/5/4 12:27
12 **/
13 public class ConnectionUtil {
14 /**
15 * 建立鏈接工具
16 *
17 * @return
18 * @throws Exception
19 */
20 public static Connection getConnection() throws Exception {
21 ConnectionFactory connectionFactory = new ConnectionFactory();
22 connectionFactory.setHost("127.0.0.1");//MQ的服務器
23 connectionFactory.setPort(5672);//默認端口號
24 connectionFactory.setUsername("test");
25 connectionFactory.setPassword("test");
26 connectionFactory.setVirtualHost("/test");
27 return connectionFactory.newConnection();
28 }
29 }
項目整體圖以下:
4.1.Hello World模式
此模式很是簡單,一個生產者對應一個消費者
首先咱們製造一個消息生產者,併發送消息:
1 package com.edu.hello;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.Channel;
5 import com.rabbitmq.client.Connection;
6
7 /**
8 * @ClassName Sender
9 * @Deccription 建立發送者
10 * @Author DZ
11 * @Date 2019/5/4 12:45
12 **/
13 public class Sender {
14 private final static String QUEUE = "testhello"; //隊列的名字
15
16 public static void main(String[] srgs) throws Exception {
17 //獲取鏈接
18 Connection connection = ConnectionUtil.getConnection();
19 //建立鏈接
20 Channel channel = connection.createChannel();
21 //聲明隊列
22 //參數1:隊列的名字
23 //參數2:是否持久化隊列,咱們的隊列存在內存中,若是mq重啓則丟失。若是爲ture,則保存在erlang的數據庫中,重啓,依舊保存
24 //參數3:是否排外,咱們鏈接關閉後是否自動刪除隊列,是否私有當前隊列,若是私有,其餘隊列不能訪問
25 //參數4:是否自動刪除
26 //參數5:咱們傳入的其餘參數
27 channel.queueDeclare(QUEUE, false, false, false, null);
28 //發送內容
29 channel.basicPublish("", QUEUE, null, "要發送的消息".getBytes());
30 //關閉鏈接
31 channel.close();
32 connection.close();
33 }
34 }
定義一個消息接受者
1 package com.edu.hello;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.Channel;
5 import com.rabbitmq.client.Connection;
6 import com.rabbitmq.client.QueueingConsumer;
7
8 /**
9 * @ClassName Recver
10 * @Deccription 消息接受者
11 * @Author DZ
12 * @Date 2019/5/4 12:58
13 **/
14 public class Recver {
15 private final static String QUEUE = "testhello";//消息隊列的名稱
16
17 public static void main(String[] args) throws Exception {
18 Connection connection = ConnectionUtil.getConnection();
19 Channel channel = connection.createChannel();
20 channel.queueDeclare(QUEUE, false, false, false, null);
21 QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
22 //接受消息,參數2表示自動確認消息
23 channel.basicConsume(QUEUE, true, queueingConsumer);
24 while (true) {
25 //獲取消息
26 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();//若是沒有消息就等待,有消息就獲取消息,並銷燬,是一次性的
27 String message = new String(delivery.getBody());
28 System.out.println(message);
29 }
30 }
31 }
此種模式屬於「點對點」模式,一個生產者、一個隊列、一個消費者,能夠運用在聊天室(實際上真實的聊天室比這複雜不少,雖然是「點對點」模式,可是並非一個生產者,一個隊列,一個消費者)
4.2.work queues
一個生產者對應多個消費者,可是隻有一個消費者得到消息
定義消息製造者:
1 package com.edu.work;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.Channel;
5 import com.rabbitmq.client.Connection;
6
7 /**
8 * @ClassName Sender
9 * @Deccription 建立發送者
10 * @Author DZ
11 * @Date 2019/5/4 12:45
12 **/
13 public class Sender {
14 private final static String QUEUE = "testhellowork"; //隊列的名字
15
16 public static void main(String[] srgs) throws Exception {
17 //獲取鏈接
18 Connection connection = ConnectionUtil.getConnection();
19 //建立鏈接
20 Channel channel = connection.createChannel();
21 //聲明隊列
22 //參數1:隊列的名字
23 //參數2:是否持久化隊列,咱們的隊列存在內存中,若是mq重啓則丟失。若是爲ture,則保存在erlang的數據庫中,重啓,依舊保存
24 //參數3:是否排外,咱們鏈接關閉後是否自動刪除隊列,是否私有當前隊列,若是私有,其餘隊列不能訪問
25 //參數4:是否自動刪除
26 //參數5:咱們傳入的其餘參數
27 channel.queueDeclare(QUEUE, false, false, false, null);
28 //發送內容
29 for (int i = 0; i < 100; i++) {
30 channel.basicPublish("", QUEUE, null, ("要發送的消息" + i).getBytes());
31 }
32 //關閉鏈接
33 channel.close();
34 connection.close();
35 }
36 }
定義2個消息消費者
1 package com.edu.work;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.*;
5
6 import java.io.IOException;
7 import java.util.Queue;
8
9 /**
10 * @ClassName Recver1
11 * @Deccription 消息接受者
12 * @Author DZ
13 * @Date 2019/5/4 12:58
14 **/
15 public class Recver1 {
16 private final static String QUEUE = "testhellowork";//消息隊列的名稱
17
18 public static void main(String[] args) throws Exception {
19 Connection connection = ConnectionUtil.getConnection();
20 final Channel channel = connection.createChannel();
21 channel.queueDeclare(QUEUE, false, false, false, null);
22 //channel.basicQos(1);//告訴服務器,當前消息沒有確認以前,不要發送新消息,合理自動分配資源
23 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
24 @Override
25 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
26 //收到消息時候調用
27 System.out.println("消費者1收到的消息:" + new String(body));
28 /*super.handleDelivery(consumerTag, envelope, properties, body);*/
29 //確認消息
30 //參數2:false爲確認收到消息,ture爲拒絕收到消息
31 channel.basicAck(envelope.getDeliveryTag(), false);
32 }
33 };
34 //註冊消費者
35 // 參數2:手動確認,咱們收到消息後,須要手動確認,告訴服務器,咱們收到消息了
36 channel.basicConsume(QUEUE, false, defaultConsumer);
37 }
38 }
1 package com.edu.work;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.*;
5
6 import java.io.IOException;
7
8 /**
9 * @ClassName Recver1
10 * @Deccription 消息接受者
11 * @Author DZ
12 * @Date 2019/5/4 12:58
13 **/
14 public class Recver2 {
15 private final static String QUEUE = "testhellowork";//消息隊列的名稱
16
17 public static void main(String[] args) throws Exception {
18 Connection connection = ConnectionUtil.getConnection();
19 final Channel channel = connection.createChannel();
20 channel.queueDeclare(QUEUE, false, false, false, null);
21 //channel.basicQos(1);
22 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
23 @Override
24 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
25 //收到消息時候調用
26 System.out.println("消費者2收到的消息:" + new String(body));
27 /*super.handleDelivery(consumerTag, envelope, properties, body);*/
28 //確認消息
29 //參數2:false爲確認收到消息,ture爲拒絕收到消息
30 channel.basicAck(envelope.getDeliveryTag(), false);
31 }
32 };
33 //註冊消費者
34 // 參數2:手動確認,咱們收到消息後,須要手動確認,告訴服務器,咱們收到消息了
35 channel.basicConsume(QUEUE, false, defaultConsumer);
36 }
37 }
這種模式是最簡單的 work 模式,消息發送者,循環發送了100次消息,打印結果以下:
能夠看出,消息消費者消費到的消息是替換的,即一個消息只被消費了一次,且兩個消費者各消費了50條消息。這裏有個弊端,消息消費者發佈消息的時候,不管消費者的消費能力如何(電腦的內存等硬件),消息只會均勻分佈給各個消費者(能夠給2個消費者 sleep 下,結果仍是這樣)。有沒有什麼方式可讓消息自動分配(按照電腦的硬件,能者多勞),答案是能夠的,只須要增長 channel.basicQos(1);
此方案能夠用來進行負載均衡,搶紅包等場景
4.3.public模式
一個消費者將消息首先發送到交換器,交換器綁定到多個隊列,而後被監聽該隊列的消費者所接收並消費。X 表示交換器,在 RabbitMQ 中,交換器主要有四種類型: direct、fanout、topic、headers,這裏的交換器是 fanout,其它類型的交換機自行谷歌,主要區別是交換機的匹配方式發生了變化。
定義消息發佈者
1 package com.edu.publish;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.Channel;
5 import com.rabbitmq.client.Connection;
6
7 /**
8 * @ClassName Sender
9 * @Deccription TODO
10 * @Author DZ
11 * @Date 2019/5/4 14:43
12 **/
13 public class Sender {
14 private final static String EXCHANGE_NAME = "testexchange";//定義交換機名字
15
16 public static void main(String[] args) throws Exception {
17 Connection connection = ConnectionUtil.getConnection();
18 Channel channel = connection.createChannel();
19 //聲明交換機
20 //定義一個交換機,類型爲fanout,也就是發佈訂閱者模式
21 channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
22 //發佈訂閱模式,由於消息是先發布到交換機中,而交換機是沒有保存功能的,因此若是沒有消費者,消息會丟失
23 channel.basicPublish(EXCHANGE_NAME, "", null, "發佈訂閱模式的消息".getBytes());
24 channel.close();
25 connection.close();
26 }
27 }
定義2個消息消費者
1 package com.edu.publish;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.*;
5
6 import java.io.IOException;
7
8 /**
9 * @ClassName Recver1
10 * @Deccription TODO
11 * @Author DZ
12 * @Date 2019/5/4 14:49
13 **/
14 public class Recver1 {
15 //定義交換機
16 private final static String EXCHANGE_NAME = "testexchange";
17 private final static String QUEUE = "testpubqueue1";
18
19 public static void main(String[] args) throws Exception {
20 Connection connection = ConnectionUtil.getConnection();
21 final Channel channel = connection.createChannel();
22 channel.queueDeclare(QUEUE, false, false, false, null);
23 //綁定隊列到交換機
24 channel.queueBind(QUEUE, EXCHANGE_NAME, "");
25 channel.basicQos(1);
26 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
27 @Override
28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
29 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
30 System.out.println("消費者1:" + new String(body));
31 channel.basicAck(envelope.getDeliveryTag(), false);
32 }
33 };
34 channel.basicConsume(QUEUE, false, defaultConsumer);
35 }
36 }
1 package com.edu.publish;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.*;
5
6 import java.io.IOException;
7
8 /**
9 * @ClassName Recver1
10 * @Deccription TODO
11 * @Author DZ
12 * @Date 2019/5/4 14:49
13 **/
14 public class Recver2 {
15 //定義交換機
16 private final static String EXCHANGE_NAME = "testexchange";
17 private final static String QUEUE = "testpubqueue2";
18
19 public static void main(String[] args) throws Exception {
20 Connection connection = ConnectionUtil.getConnection();
21 final Channel channel = connection.createChannel();
22 channel.queueDeclare(QUEUE, false, false, false, null);
23 //綁定隊列到交換機
24 channel.queueBind(QUEUE, EXCHANGE_NAME, "");
25 channel.basicQos(1);
26 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
27 @Override
28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
29 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
30 System.out.println("消費者2:" + new String(body));
31 channel.basicAck(envelope.getDeliveryTag(), false);
32 }
33 };
34 channel.basicConsume(QUEUE, false, defaultConsumer);
35 }
36 }
消費者1 和消費者2 都監聽了被同一個交換器綁定的隊列,所以消息被同時消費到了。若是消息發送到沒有隊列綁定的交換器時,消息將丟失,由於交換器沒有存儲消息的能力,消息只能存儲在隊列中。
應用場景:好比一個商城系統須要在管理員上傳商品新的圖片時,前臺系統必須更新圖片,日誌系統必須記錄相應的日誌,那麼就能夠將兩個隊列綁定到圖片上傳交換器上,一個用於前臺系統更新圖片,另外一個用於日誌系統記錄日誌。
4.4.routing
生產者將消息發送到 direct 交換器,在綁定隊列和交換器的時候有一個路由 key,生產者發送的消息會指定一個路由 key,那麼消息只會發送到相應 key 相同的隊列,接着監聽該隊列的消費者消費消息。
定義消息發佈者
1 package com.edu.route;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.Channel;
5 import com.rabbitmq.client.Connection;
6
7 /**
8 * @ClassName Sender
9 * @Deccription TODO
10 * @Author DZ
11 * @Date 2019/5/4 15:05
12 **/
13 public class Sender {
14 private final static String EXCANGE_NAME = "testroute";
15
16 public static void main(String[] args) throws Exception {
17 Connection connection = ConnectionUtil.getConnection();
18 Channel channel = connection.createChannel();
19 //定義路由格式的交換機
20 channel.exchangeDeclare(EXCANGE_NAME, "direct");
21 channel.basicPublish(EXCANGE_NAME, "key2", null, "路由模式的消息".getBytes());
22 channel.close();
23 connection.close();
24 }
25 }
1 package com.edu.route;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.*;
5
6 import java.io.IOException;
7
8 /**
9 * @ClassName Recver1
10 * @Deccription TODO
11 * @Author DZ
12 * @Date 2019/5/4 14:49
13 **/
14 public class Recver1 {
15 //定義交換機
16 private final static String EXCHANGE_NAME = "testroute";
17 private final static String QUEUE = "testroute1queue";
18
19 public static void main(String[] args) throws Exception {
20 Connection connection = ConnectionUtil.getConnection();
21 final Channel channel = connection.createChannel();
22 channel.queueDeclare(QUEUE, false, false, false, null);
23 //綁定隊列到交換機
24 //參數3:綁定到交換機指定的路由的名字
25 channel.queueBind(QUEUE, EXCHANGE_NAME, "key1");
26 //若是須要綁定多個路由,再綁定一次便可
27 channel.queueBind(QUEUE, EXCHANGE_NAME, "key2");
28 channel.basicQos(1);
29 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
30 @Override
31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
32 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
33 System.out.println("消費者1:" + new String(body));
34 channel.basicAck(envelope.getDeliveryTag(), false);
35 }
36 };
37 channel.basicConsume(QUEUE, false, defaultConsumer);
38 }
39 }
1 package com.edu.route;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.*;
5
6 import java.io.IOException;
7
8 /**
9 * @ClassName Recver1
10 * @Deccription TODO
11 * @Author DZ
12 * @Date 2019/5/4 14:49
13 **/
14 public class Recver2 {
15 //定義交換機
16 private final static String EXCHANGE_NAME = "testroute";
17 private final static String QUEUE = "testroute2queue";
18
19 public static void main(String[] args) throws Exception {
20 Connection connection = ConnectionUtil.getConnection();
21 final Channel channel = connection.createChannel();
22 channel.queueDeclare(QUEUE, false, false, false, null);
23 //綁定隊列到交換機
24 //參數3:綁定到交換機指定的路由的名字
25 channel.queueBind(QUEUE, EXCHANGE_NAME, "key1");
26 //若是須要綁定多個路由,再綁定一次便可
27 channel.queueBind(QUEUE, EXCHANGE_NAME, "key3");
28 channel.basicQos(1);
29 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
30 @Override
31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
32 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
33 System.out.println("消費者2:" + new String(body));
34 channel.basicAck(envelope.getDeliveryTag(), false);
35 }
36 };
37 channel.basicConsume(QUEUE, false, defaultConsumer);
38 }
39 }
應用場景:利用消費者可以有選擇性的接收消息的特性,好比咱們商城系統的後臺管理系統對於商品進行修改、刪除、新增操做都須要更新前臺系統的界面展現,而查詢操做確不須要,那麼這兩個隊列分開接收消息就比較好。
4.5.Topic
上面的路由模式是根據路由key進行完整的匹配(徹底相等才發送消息),這裏的通配符模式通俗的來說就是模糊匹配。符號 「#」 表示匹配一個或多個詞,符號 「*」 表示匹配一個詞。實際上 Topic 模式是 routing 模式的擴展
1 package com.edu.topic;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.Channel;
5 import com.rabbitmq.client.Connection;
6
7 /**
8 * @ClassName Sender
9 * @Deccription TODO
10 * @Author DZ
11 * @Date 2019/5/4 15:19
12 **/
13 public class Sender {
14 private final static String EXCANGE_NAME = "testtopexchange";
15
16 public static void main(String[] args) throws Exception {
17 Connection connection = ConnectionUtil.getConnection();
18 Channel channel = connection.createChannel();
19 channel.exchangeDeclare(EXCANGE_NAME, "topic");
20 channel.basicPublish(EXCANGE_NAME, "abc.adb.1", null, "topic模式消息發送者:".getBytes());
21 channel.close();
22 connection.close();
23 }
24 }
1 package com.edu.topic;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.*;
5
6 import java.io.IOException;
7
8 /**
9 * @ClassName Recver1
10 * @Deccription TODO
11 * @Author DZ
12 * @Date 2019/5/4 14:49
13 **/
14 public class Recver1 {
15 //定義交換機
16 private final static String EXCHANGE_NAME = "testtopexchange";
17 private final static String QUEUE = "testtopic1queue";
18
19 public static void main(String[] args) throws Exception {
20 Connection connection = ConnectionUtil.getConnection();
21 final Channel channel = connection.createChannel();
22 channel.queueDeclare(QUEUE, false, false, false, null);
23 //綁定隊列到交換機
24 //參數3:綁定到交換機指定的路由的名字
25 channel.queueBind(QUEUE, EXCHANGE_NAME, "key.*");
26 //若是須要綁定多個路由,再綁定一次便可
27 channel.queueBind(QUEUE, EXCHANGE_NAME, "abc.*");
28 channel.basicQos(1);
29 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
30 @Override
31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
32 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
33 System.out.println("消費者1:" + new String(body));
34 channel.basicAck(envelope.getDeliveryTag(), false);
35 }
36 };
37 channel.basicConsume(QUEUE, false, defaultConsumer);
38 }
39 }
1 package com.edu.topic;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.*;
5
6 import java.io.IOException;
7
8 /**
9 * @ClassName Recver1
10 * @Deccription TODO
11 * @Author DZ
12 * @Date 2019/5/4 14:49
13 **/
14 public class Recver2 {
15 //定義交換機
16 private final static String EXCHANGE_NAME = "testtopexchange";
17 private final static String QUEUE = "testtopic2queue";
18
19 public static void main(String[] args) throws Exception {
20 Connection connection = ConnectionUtil.getConnection();
21 final Channel channel = connection.createChannel();
22 channel.queueDeclare(QUEUE, false, false, false, null);
23 //綁定隊列到交換機
24 //參數3:綁定到交換機指定的路由的名字
25 channel.queueBind(QUEUE, EXCHANGE_NAME, "key.*");
26 //若是須要綁定多個路由,再綁定一次便可
27 channel.queueBind(QUEUE, EXCHANGE_NAME, "abc.#");
28 channel.basicQos(1);
29 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
30 @Override
31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
32 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
33 System.out.println("消費者2:" + new String(body));
34 channel.basicAck(envelope.getDeliveryTag(), false);
35 }
36 };
37 channel.basicConsume(QUEUE, false, defaultConsumer);
38 }
39 }
第六種模式是將上述的模式集成其它的框架,進行遠程訪問,這裏咱們將集成 Spring 實現 RCP 遠程模式的使用
5.Spring 集成 RabbitMQ
5.1.自動集成 Spring
編寫spring的配置,此配置文件的目的是將 Spring 與 RabbitMQ 進行整合,實際上就是將 MQ 的相關信息(鏈接,隊列,交換機……)經過XML配置的方式實現
1 <beans xmlns="http://www.springframework.org/schema/beans"
2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xmlns:rabbit="http://www.springframework.org/schema/rabbit"
4 xsi:schemaLocation="http://www.springframework.org/schema/rabbit
5 http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd
6 http://www.springframework.org/schema/beans
7 http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
8 <!--定義鏈接工廠-->
9 <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="test" password="test"
10 virtual-host="/test"/>
11 <!--
12 定義模板
13 第三個參數,決定消息發送到哪裏,若是爲exchange,則發送到交換機;若是爲queue,則發送到隊列
14 -->
15 <rabbit:template id="template" connection-factory="connectionFactory" exchange="fanoutExchange"/>
16 <rabbit:admin connection-factory="connectionFactory"/>
17 <!--定義隊列-->
18 <rabbit:queue name="myQueue" auto-declare="true"/>
19 <!--定義交換機-->
20 <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
21 <!--將消息綁定到交換機-->
22 <rabbit:bindings>
23 <rabbit:binding queue="myQueue">
24
25 </rabbit:binding>
26 </rabbit:bindings>
27 </rabbit:fanout-exchange>
28 <!--定義監聽器,收到消息會執行-->
29 <rabbit:listener-container connection-factory="connectionFactory">
30 <!-- 定義監聽的類和方法-->
31 <rabbit:listener ref="consumer" method="test" queue-names="myQueue"/>
32 </rabbit:listener-container>
33 <!--定義消費者-->
34 <bean id="consumer" class="com.edu.spring.MyConsumer"/>
35
36 </beans>
生產者:
1 package com.edu.spring;
2
3 import org.springframework.amqp.rabbit.core.RabbitTemplate;
4 import org.springframework.context.ApplicationContext;
5 import org.springframework.context.support.ClassPathXmlApplicationContext;
6
7 /**
8 * @ClassName SpringTest
9 * @Deccription TODO
10 * @Author DZ
11 * @Date 2019/5/4 18:40
12 **/
13 public class SpringTest {
14 public static void main(String[] args) throws Exception {
15 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
16 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
17 rabbitTemplate.convertAndSend("Spring的消息");
18 ((ClassPathXmlApplicationContext) applicationContext).destroy();
19 }
20 }
消費者
1 package com.edu.spring;
2
3 /**
4 * @ClassName MyConsumer
5 * @Deccription TODO
6 * @Author DZ
7 * @Date 2019/5/4 18:35
8 **/
9 public class MyConsumer {
10 /*用於接收消息*/
11 public void test(String message) {
12 System.err.println(message);
13 }
14 }
集成Spring主要是在xml中實現了隊列和交換機的建立。
最好能理解上面的圖。理解後,之後寫相關的代碼,直接去網上 copy 一份配置文件,而後根據本身項目的狀況進行修改。若是不能理解,就不知道如何修改出現錯誤後不知道錯誤出如今什麼地方。
5.2.手動模式
手動模式,主要增長MQ的回調操做,MQ消息失敗或者成功就有相應的回調信息,加強系統的健壯性,一旦產生異常,很快就能定位到異常的位置,因此在實際開發中,通常都這種方式
建立xml配置文件
1 <beans xmlns="http://www.springframework.org/schema/beans"
2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xmlns:rabbit="http://www.springframework.org/schema/rabbit"
4 xmlns:context="http://www.springframework.org/schema/context"
5 xsi:schemaLocation="http://www.springframework.org/schema/rabbit
6 http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd
7 http://www.springframework.org/schema/beans
8 http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
9 http://www.springframework.org/schema/context
10 http://www.springframework.org/schema/context/spring-context-4.3.xsd">
11 <context:component-scan base-package="com.edu.spring2"/>
12 <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
13
14 <!--
15 定義鏈接工廠
16 publisher-confirms爲ture,確認失敗等回調纔會執行
17 -->
18 <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="test" password="test"
19 virtual-host="/test" publisher-confirms="true"/>
20
21 <rabbit:admin connection-factory="connectionFactory"/>
22 <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" confirm-callback="confirmCallBackListener"
23 return-callback="returnCallBackListener"
24 mandatory="true"/>
25 <!--定義隊列-->
26 <rabbit:queue name="myQueue" auto-declare="true"/>
27 <!--定義交換機-->
28 <rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX">
29 <!--將消息綁定到交換機-->
30 <rabbit:bindings>
31 <rabbit:binding queue="myQueue">
32
33 </rabbit:binding>
34 </rabbit:bindings>
35 </rabbit:direct-exchange>
36 <!--定義監聽器,收到消息會執行-->
37 <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
38 <!-- 定義監聽的類和方法-->
39 <rabbit:listener queues="myQueue" ref="receiveConfirmTestListener"/>
40 </rabbit:listener-container>
41
42 </beans>
建立回調監聽函數
1 package com.edu.spring2;
2
3 import org.springframework.amqp.rabbit.core.RabbitTemplate;
4 import org.springframework.amqp.rabbit.support.CorrelationData;
5 import org.springframework.stereotype.Component;
6
7 /**
8 * @ClassName ConfirmCallBackListener
9 * @Deccription TODO
10 * @Author DZ
11 * @Date 2019/5/4 22:26
12 **/
13 @Component("confirmCallBackListener")
14 public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {
15
16 @Override
17 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
18 System.out.println("確認回調 ack==" + ack + "回調緣由==" + cause);
19 }
20 }
1 package com.edu.spring2;
2
3 import com.rabbitmq.client.Channel;
4 import org.springframework.amqp.core.Message;
5 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
6 import org.springframework.stereotype.Component;
7
8 /**
9 * @ClassName ReceiveConfirmTestListener
10 * @Deccription TODO
11 * @Author DZ
12 * @Date 2019/5/4 22:24
13 **/
14 @Component("receiveConfirmTestListener")
15 public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {
16 /**
17 * 收到消息時,執行的監聽
18 *
19 * @param message
20 * @param channel
21 * @throws Exception
22 */
23 @Override
24 public void onMessage(Message message, Channel channel) throws Exception {
25 System.out.println(("消費者收到了消息" + message));
26 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
27 }
28 }
1 package com.edu.spring2;
2
3 import org.springframework.amqp.core.Message;
4 import org.springframework.amqp.rabbit.core.RabbitTemplate;
5 import org.springframework.stereotype.Component;
6
7 /**
8 * @ClassName ReturnCallBackListener
9 * @Deccription TODO
10 * @Author DZ
11 * @Date 2019/5/4 22:28
12 **/
13 @Component("returnCallBackListener")
14 public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {
15 @Override
16 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
17 System.out.println("失敗回調" + message);
18 }
19 }
回調函數的配置來自 XML
建立發送消息的工具類
1 package com.edu.spring2;
2
3 import org.springframework.amqp.core.AmqpTemplate;
4 import org.springframework.beans.factory.annotation.Autowired;
5 import org.springframework.stereotype.Component;
6
7 /**
8 * @ClassName PublicUtil
9 * @Deccription TODO
10 * @Author DZ
11 * @Date 2019/5/4 22:30
12 **/
13 @Component("publicUtil")
14 public class PublicUtil {
15 @Autowired
16 private AmqpTemplate amqpTemplate;
17
18 public void send(String excange, String routingkey, Object message) {
19 amqpTemplate.convertAndSend(excange, routingkey, message);
20 }
21 }
建立測試類
1 package com.edu.spring2;
2
3 import org.junit.Test;
4 import org.junit.runner.RunWith;
5 import org.springframework.beans.factory.annotation.Autowired;
6 import org.springframework.test.context.ContextConfiguration;
7 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
8
9 /**
10 * @ClassName TestMain
11 * @Deccription TODO
12 * @Author DZ
13 * @Date 2019/5/4 22:32
14 **/
15 @RunWith(SpringJUnit4ClassRunner.class)
16 @ContextConfiguration(locations = {"classpath:applicationContext2.xml"})
17 public class TestMain {
18 @Autowired
19 private PublicUtil publicUtil;
20 private static String exChange = "DIRECT_EX";//交換機
21 private static String queue = "myQueue";
22
23 /**
24 * exChange和queue均正確
25 * confirm會執行,ack = ture
26 * 消息正常接收(接收消息確認方法正常執行)
27 */
28 @Test
29 public void test1() throws Exception {
30 publicUtil.send(exChange, queue, "測試1,隊列和交換機均正確");
31 }
32 /**
33 * exChange錯誤,queue正確
34 * confirm執行,ack=false
35 * 消息沒法接收(接收消息確認方法不能執行)
36 */
37 @Test
38 public void test2() throws Exception {
39 publicUtil.send(exChange + "1", queue, "測試2,隊列正確,交換機錯誤");
40 }
41 /**
42 * exChange正常,queue錯誤
43 * return執行
44 * confirm執行,ack=ture
45 */
46 @Test
47 public void test3() throws Exception {
48 publicUtil.send(exChange, queue + "1", "測試2,隊列錯誤,交換機正確");
49 }
50 /**
51 * exChange錯誤,queue錯誤
52 * confirm執行,ack=false
53 */
54 @Test
55 public void test4() throws Exception {
56 publicUtil.send(exChange + "1", queue + "1", "測試2,隊列錯誤,交換機錯誤");
57 }
58 }
測試結果以下:
-
test1:exChange和queue均正確
- confirm會執行,ack=ture;能正常收到消息(接收消息的方法正常執行)
-
test2:exChange錯誤,queue正確
confirm執行,ack=false;不能正常接收到消息
confirm執行,ack=ture;return執行;不能接收到消息
confirm執行,ack=false;不能接收消息
上述結論及代碼以下圖:
根據上述的測試結果,咱們能夠根據回調函數的返回結果,查看MQ的錯誤出如今那裏。根據上述結論,咱們能夠對3個回調函數作以下處理:
-
類 ReceiveConfirmTestListener 中的onMessage方法主要用於接收從 RabbitMQ 推送過來的消息,並對消息作相應的邏輯處理
-
類 ConfirmCallBackListener 中的 confirm 方法主要用於檢查交換機(exChange),當 ack=false,交換機可能錯誤
-
類 ReturnCallBackListener 中的 returnedMessage 方法用於檢查隊列(queue),當此方法執行時,隊列可能錯誤
因此3個相應的方法能夠作以下調整:
實際上,在真實項目中,上面3個方法也是按照這3個邏輯進行設計的。固然這3個方法中還能夠加入更多的日誌消息,和邏輯處理業務。
6.參考
https://blog.csdn.net/liu911025/article/details/80460182
https://blog.csdn.net/lyhkmm/article/details/78775369
https://blog.csdn.net/vbirdbest/article/details/78670550
https://blog.csdn.net/vbirdbest/article/details/78670550
https://www.rabbitmq.com/getstarted.html