建議先了解爲何項目要使用 MQ 消息隊列,MQ 消息隊列有什麼優勢,若是在業務邏輯上沒有此種需求,建議不要使用中間件。中間件對系統的性能作優化的同時,同時增長了系統的複雜性也維護難易度;其次,須要瞭解各類常見的 MQ 消息隊列有什麼區別,以便在相同的成本下選擇一種最合適本系統的技術。html
本文主要討論 RabbitMQ,從3月底接觸一個項目使用了 RabbitMQ,就開始着手學習,主要經過視頻和博客學習了一個月,基本明白了 RabbitMQ 的應用,其它的 MQ 隊列還不清楚,其底層技術還有待學習,如下是我目前的學習心得。java
RabbitMQ 是基於 Erlang 語言寫的,因此首先安裝 Erlang,本例是在 Windows 上安裝,也能夠選擇在 Linux 上安裝,機器上沒有虛擬機,直接在 Windows 上操做,建議在 Linux 上安裝。官方下載 Erlang 軟件,我下載最新版本 21.3。安裝過程很簡單,直接 Next 到底。 Linux 安裝自行谷歌。以下圖:spring
安裝結束後,設置環境變量,以下圖 測試是否安裝成功在官方下載,選擇最新版本 3.7。安裝過程很簡單,直接 Next 到底。以下圖:數據庫
測試安裝是否成功,進入安裝目錄 sbin,執行 rabbitmq-plugins enable rabbitmq_management 命令,出現下面界面,證實安裝成功(建議以管理員方式打開 dos)。執行 rabbitmq-server start 命令,啓動服務。本地登錄並建立用戶,以下圖:apache
關於tags標籤的解釋:一、 超級管理員(administrator)json
可登錄管理控制檯,可查看全部的信息,而且能夠對用戶,策略(policy)進行操做。windows
二、 監控者(monitoring)bash
可登錄管理控制檯,同時能夠查看rabbitmq節點的相關信息(進程數,內存使用狀況,磁盤使用狀況等)服務器
三、 策略制定者(policymaker)網絡
可登錄管理控制檯, 同時能夠對policy進行管理。但沒法查看節點的相關信息(上圖紅框標識的部分)。
四、 普通管理者(management)
僅可登錄管理控制檯,沒法看到節點信息,也沒法對策略進行管理。
五、 其餘
沒法登錄管理控制檯,一般就是普通的生產者和消費者。
參考 RabbitMQ 官網,一共分爲6個模式
RabbitMQ 是一個消息代理,實際上,它接收生產者產生的消息,而後將消息傳遞給消費者。在這個過程當中,它能夠路由、緩衝、持久化等,在傳輸過程當中,主要又三部分組成。生產者:發送消息的一端
隊列:它活動在 RabbitMQ 服務器中,消息存儲的地方,隊列本質上是一個緩衝對象,因此存儲的消息不受限制 消費者:消息接收端 通常狀況下,消息生產者、消費者和隊列不在同一臺服務器上,本地作測試,放在一臺服務器上。 測試項目直接建立一個 maven 格式的項目,不必建立網絡格式。新建一個項目,以下圖: 首先準備操做 MQ 的環境(1): 準備必要的 Pom 文件,導入相應的 jar 包,
<!--mq客戶端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.5.0</version>
</dependency>
<!--日誌-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<!--工具包-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<!--spring集成-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.3.7.RELEASE</version>
</dependency>
複製代碼
(2): 創建日誌配置文件,在 resources 下創建 log4j.properties,便於打印精確的日誌信息
log4j.rootLogger=DEBUG,A1
log4j.logger.org.mybatis=DEBUG
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-%m%n
複製代碼
(3): 編寫一個工具類,主要用於鏈接 RabbitMQ
package com.edu.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @ClassName ConnectionUtil
* @Deccription 穿件鏈接的工具類
* @Author DZ
* @Date 2019/5/4 12:27
**/
public class ConnectionUtil {
/**
* 建立鏈接工具
*
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");//MQ的服務器
connectionFactory.setPort(5672);//默認端口號
connectionFactory.setUsername("test");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/test");
return connectionFactory.newConnection();
}
}
複製代碼
項目整體圖以下:
此模式很是簡單,一個生產者對應一個消費者
首先咱們製造一個消息生產者,併發送消息:package com.edu.hello;
import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @ClassName Sender
* @Deccription 建立發送者
* @Author DZ
* @Date 2019/5/4 12:45
**/
public class Sender {
private final static String QUEUE = "testhello"; //隊列的名字
public static void main(String[] srgs) throws Exception {
//獲取鏈接
Connection connection = ConnectionUtil.getConnection();
//建立鏈接
Channel channel = connection.createChannel();
//聲明隊列
//參數1:隊列的名字
//參數2:是否持久化隊列,咱們的隊列存在內存中,若是mq重啓則丟失。若是爲ture,則保存在erlang的數據庫中,重啓,依舊保存
//參數3:是否排外,咱們鏈接關閉後是否自動刪除隊列,是否私有當前隊列,若是私有,其餘隊列不能訪問
//參數4:是否自動刪除
//參數5:咱們傳入的其餘參數
channel.queueDeclare(QUEUE, false, false, false, null);
//發送內容
channel.basicPublish("", QUEUE, null, "要發送的消息".getBytes());
//關閉鏈接
channel.close();
connection.close();
}
}
複製代碼
定義一個消息接受者
package com.edu.hello;
import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* @ClassName Recver
* @Deccription 消息接受者
* @Author DZ
* @Date 2019/5/4 12:58
**/
public class Recver {
private final static String QUEUE = "testhello";//消息隊列的名稱
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE, false, false, false, null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//接受消息,參數2表示自動確認消息
channel.basicConsume(QUEUE, true, queueingConsumer);
while (true) {
//獲取消息
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();//若是沒有消息就等待,有消息就獲取消息,並銷燬,是一次性的
String message = new String(delivery.getBody());
System.out.println(message);
}
}
}
複製代碼
此種模式屬於「點對點」模式,一個生產者、一個隊列、一個消費者,能夠運用在聊天室(實際上真實的聊天室比這複雜不少,雖然是「點對點」模式,可是並非一個生產者,一個隊列,一個消費者)
定義消息製造者:
package com.edu.work;
import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @ClassName Sender
* @Deccription 建立發送者
* @Author DZ
* @Date 2019/5/4 12:45
**/
public class Sender {
private final static String QUEUE = "testhellowork"; //隊列的名字
public static void main(String[] srgs) throws Exception {
//獲取鏈接
Connection connection = ConnectionUtil.getConnection();
//建立鏈接
Channel channel = connection.createChannel();
//聲明隊列
//參數1:隊列的名字
//參數2:是否持久化隊列,咱們的隊列存在內存中,若是mq重啓則丟失。若是爲ture,則保存在erlang的數據庫中,重啓,依舊保存
//參數3:是否排外,咱們鏈接關閉後是否自動刪除隊列,是否私有當前隊列,若是私有,其餘隊列不能訪問
//參數4:是否自動刪除
//參數5:咱們傳入的其餘參數
channel.queueDeclare(QUEUE, false, false, false, null);
//發送內容
for (int i = 0; i < 100; i++) {
channel.basicPublish("", QUEUE, null, ("要發送的消息" + i).getBytes());
}
//關閉鏈接
channel.close();
connection.close();
}
}
複製代碼
定義2個消息消費者
package com.edu.work;
import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Queue;
/**
* @ClassName Recver1
* @Deccription 消息接受者
* @Author DZ
* @Date 2019/5/4 12:58
**/
public class Recver1 {
private final static String QUEUE = "testhellowork";//消息隊列的名稱
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE, false, false, false, null);
//channel.basicQos(1);//告訴服務器,當前消息沒有確認以前,不要發送新消息,合理自動分配資源
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//收到消息時候調用
System.out.println("消費者1收到的消息:" + new String(body));
/*super.handleDelivery(consumerTag, envelope, properties, body);*/
//確認消息
//參數2:false爲確認收到消息,ture爲拒絕收到消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//註冊消費者
// 參數2:手動確認,咱們收到消息後,須要手動確認,告訴服務器,咱們收到消息了
channel.basicConsume(QUEUE, false, defaultConsumer);
}
}
複製代碼
package com.edu.work;
import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Recver1
* @Deccription 消息接受者
* @Author DZ
* @Date 2019/5/4 12:58
**/
public class Recver2 {
private final static String QUEUE = "testhellowork";//消息隊列的名稱
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE, false, false, false, null);
//channel.basicQos(1);//告訴服務器,當前消息沒有確認以前,不要發送新消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//收到消息時候調用
System.out.println("消費者2收到的消息:" + new String(body));
/*super.handleDelivery(consumerTag, envelope, properties, body);*/
//確認消息
//參數2:false爲確認收到消息,ture爲拒絕收到消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//註冊消費者
// 參數2:手動確認,咱們收到消息後,須要手動確認,告訴服務器,咱們收到消息了
channel.basicConsume(QUEUE, false, defaultConsumer);
}
}
複製代碼
這種模式是最簡單的 work 模式,消息發送者,循環發送了100次消息,打印結果以下:
能夠看出,消息消費者消費到的消息是替換的,即 一個消息只被消費了一次,且兩個消費者各消費了50條消息。這裏有個弊端,消息消費者發佈消息的時候, 不管消費者的消費能力如何(電腦的內存等硬件),消息只會均勻分佈給各個消費者(能夠給2個消費者 sleep 下,結果仍是這樣)。有沒有什麼方式可讓消息自動分配(按照電腦的硬件,能者多勞),答案是能夠的,只須要增長 channel.basicQos(1); 此方案能夠用來進行負載均衡,搶紅包等場景定義消息發佈者
package com.edu.publish;
import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @ClassName Sender
* @Deccription TODO
* @Author DZ
* @Date 2019/5/4 14:43
**/
public class Sender {
private final static String EXCHANGE_NAME = "testexchange";//定義交換機名字
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//聲明交換機
//定義一個交換機,類型爲fanout,也就是發佈訂閱者模式
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//發佈訂閱模式,由於消息是先發布到交換機中,而交換機是沒有保存功能的,因此若是沒有消費者,消息會丟失
channel.basicPublish(EXCHANGE_NAME, "", null, "發佈訂閱模式的消息".getBytes());
channel.close();
connection.close();
}
}
複製代碼
定義2個消息消費者
package com.edu.publish;
import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Recver1
* @Deccription TODO
* @Author DZ
* @Date 2019/5/4 14:49
**/
public class Recver1 {
//定義交換機
private final static String EXCHANGE_NAME = "testexchange";
private final static String QUEUE = "testpubqueue1";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE, false, false, false, null);
//綁定隊列到交換機
channel.queueBind(QUEUE, EXCHANGE_NAME, "");
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* super.handleDelivery(consumerTag, envelope, properties, body);*/
System.out.println("消費者1:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE, false, defaultConsumer);
}
}
複製代碼
package com.edu.publish;
import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Recver1
* @Deccription TODO
* @Author DZ
* @Date 2019/5/4 14:49
**/
public class Recver2 {
//定義交換機
private final static String EXCHANGE_NAME = "testexchange";
private final static String QUEUE = "testpubqueue2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE, false, false, false, null);
//綁定隊列到交換機
channel.queueBind(QUEUE, EXCHANGE_NAME, "");
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* super.handleDelivery(consumerTag, envelope, properties, body);*/
System.out.println("消費者2:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE, false, defaultConsumer);
}
}
複製代碼
消費者1 和消費者2 都監聽了被同一個交換器綁定的隊列,所以消息被同時消費到了。若是消息發送到沒有隊列綁定的交換器時,消息將丟失,由於交換器沒有存儲消息的能力,消息只能存儲在隊列中。
應用場景:好比一個商城系統須要在管理員上傳商品新的圖片時,前臺系統必須更新圖片,日誌系統必須記錄相應的日誌,那麼就能夠將兩個隊列綁定到圖片上傳交換器上,一個用於前臺系統更新圖片,另外一個用於日誌系統記錄日誌。
定義消息發佈者
package com.edu.route;
import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @ClassName Sender
* @Deccription TODO
* @Author DZ
* @Date 2019/5/4 15:05
**/
public class Sender {
private final static String EXCANGE_NAME = "testroute";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//定義路由格式的交換機
channel.exchangeDeclare(EXCANGE_NAME, "direct");
channel.basicPublish(EXCANGE_NAME, "key2", null, "路由模式的消息".getBytes());
channel.close();
connection.close();
}
}
複製代碼
package com.edu.route;
import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Recver1
* @Deccription TODO
* @Author DZ
* @Date 2019/5/4 14:49
**/
public class Recver1 {
//定義交換機
private final static String EXCHANGE_NAME = "testroute";
private final static String QUEUE = "testroute1queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE, false, false, false, null);
//綁定隊列到交換機
//參數3:綁定到交換機指定的路由的名字
channel.queueBind(QUEUE, EXCHANGE_NAME, "key1");
//若是須要綁定多個路由,再綁定一次便可
channel.queueBind(QUEUE, EXCHANGE_NAME, "key2");
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* super.handleDelivery(consumerTag, envelope, properties, body);*/
System.out.println("消費者1:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE, false, defaultConsumer);
}
}
複製代碼
package com.edu.route;
import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Recver1
* @Deccription TODO
* @Author DZ
* @Date 2019/5/4 14:49
**/
public class Recver2 {
//定義交換機
private final static String EXCHANGE_NAME = "testroute";
private final static String QUEUE = "testroute2queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE, false, false, false, null);
//綁定隊列到交換機
//參數3:綁定到交換機指定的路由的名字
channel.queueBind(QUEUE, EXCHANGE_NAME, "key1");
//若是須要綁定多個路由,再綁定一次便可
channel.queueBind(QUEUE, EXCHANGE_NAME, "key3");
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* super.handleDelivery(consumerTag, envelope, properties, body);*/
System.out.println("消費者2:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE, false, defaultConsumer);
}
}
複製代碼
應用場景:利用消費者可以有選擇性的接收消息的特性,好比咱們商城系統的後臺管理系統對於商品進行修改、刪除、新增操做都須要更新前臺系統的界面展現,而查詢操做確不須要,那麼這兩個隊列分開接收消息就比較好。
package com.edu.topic;
import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @ClassName Sender
* @Deccription TODO
* @Author DZ
* @Date 2019/5/4 15:19
**/
public class Sender {
private final static String EXCANGE_NAME = "testtopexchange";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCANGE_NAME, "topic");
channel.basicPublish(EXCANGE_NAME, "abc.adb.1", null, "topic模式消息發送者:".getBytes());
channel.close();
connection.close();
}
}
複製代碼
package com.edu.topic;
import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Recver1
* @Deccription TODO
* @Author DZ
* @Date 2019/5/4 14:49
**/
public class Recver1 {
//定義交換機
private final static String EXCHANGE_NAME = "testtopexchange";
private final static String QUEUE = "testtopic1queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE, false, false, false, null);
//綁定隊列到交換機
//參數3:綁定到交換機指定的路由的名字
channel.queueBind(QUEUE, EXCHANGE_NAME, "key.*");
//若是須要綁定多個路由,再綁定一次便可
channel.queueBind(QUEUE, EXCHANGE_NAME, "abc.*");
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* super.handleDelivery(consumerTag, envelope, properties, body);*/
System.out.println("消費者1:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE, false, defaultConsumer);
}
}
複製代碼
package com.edu.topic;
import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Recver1
* @Deccription TODO
* @Author DZ
* @Date 2019/5/4 14:49
**/
public class Recver2 {
//定義交換機
private final static String EXCHANGE_NAME = "testtopexchange";
private final static String QUEUE = "testtopic2queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE, false, false, false, null);
//綁定隊列到交換機
//參數3:綁定到交換機指定的路由的名字
channel.queueBind(QUEUE, EXCHANGE_NAME, "key.*");
//若是須要綁定多個路由,再綁定一次便可
channel.queueBind(QUEUE, EXCHANGE_NAME, "abc.#");
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* super.handleDelivery(consumerTag, envelope, properties, body);*/
System.out.println("消費者2:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE, false, defaultConsumer);
}
}
複製代碼
第六種模式是將上述的模式集成其它的框架,進行遠程訪問,這裏咱們將集成 Spring 實現 RCP 遠程模式的使用
編寫spring的配置,此配置文件的目的是將 Spring 與 RabbitMQ 進行整合,實際上就是將 MQ 的相關信息(鏈接,隊列,交換機……)經過XML配置的方式實現
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
<!--定義鏈接工廠-->
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="test" password="test"
virtual-host="/test"/>
<!--
定義模板
第三個參數,決定消息發送到哪裏,若是爲exchange,則發送到交換機;若是爲queue,則發送到隊列
-->
<rabbit:template id="template" connection-factory="connectionFactory" exchange="fanoutExchange"/>
<rabbit:admin connection-factory="connectionFactory"/>
<!--定義隊列-->
<rabbit:queue name="myQueue" auto-declare="true"/>
<!--定義交換機-->
<rabbit:fanout-exchange name="fanoutExange" auto-declare="true">
<!--將消息綁定到交換機-->
<rabbit:bindings>
<rabbit:binding queue="myQueue">
</rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!--定義監聽器,收到消息會執行-->
<rabbit:listener-container connection-factory="connectionFactory">
<!-- 定義監聽的類和方法-->
<rabbit:listener ref="consumer" method="test" queue-names="myQueue"/>
</rabbit:listener-container>
<!--定義消費者-->
<bean id="consumer" class="com.edu.spring.MyConsumer"/>
</beans>
複製代碼
生產者:
package com.edu.spring;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @ClassName SpringTest
* @Deccription TODO
* @Author DZ
* @Date 2019/5/4 18:40
**/
public class SpringTest {
public static void main(String[] args) throws Exception {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.convertAndSend("Spring的消息");
((ClassPathXmlApplicationContext) applicationContext).destroy();
}
}
複製代碼
消費者
package com.edu.spring;
/**
* @ClassName MyConsumer
* @Deccription TODO
* @Author DZ
* @Date 2019/5/4 18:35
**/
public class MyConsumer {
/*用於接收消息*/
public void test(String message) {
System.err.println(message);
}
}
複製代碼
集成Spring主要是在xml中實現了隊列和交換機的建立。
最好能理解上面的圖。理解後,之後寫相關的代碼,直接去網上 copy 一份配置文件,而後根據本身項目的狀況進行修改。若是不能理解,就不知道如何修改出現錯誤後不知道錯誤出如今什麼地方。手動模式,主要增長MQ的回調操做,MQ消息失敗或者成功就有相應的回調信息,加強系統的健壯性,一旦產生異常,很快就能定位到異常的位置,因此在實際開發中,通常都這種方式
建立xml配置文件
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd">
<context:component-scan base-package="com.edu"/>
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
<!--
定義鏈接工廠
publisher-confirms爲ture,確認失敗等回調纔會執行
-->
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="test" password="test"
virtual-host="/test" publisher-confirms="true"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" confirm-callback="confirmCallBackListener"
return-callback="returnCallBackListener"
mandatory="true"/>
<!--定義隊列-->
<rabbit:queue name="myQueue" auto-declare="true"/>
<!--定義交換機-->
<rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX">
<!--將消息綁定到交換機-->
<rabbit:bindings>
<rabbit:binding queue="myQueue">
</rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定義監聽器,收到消息會執行-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<!-- 定義監聽的類和方法-->
<rabbit:listener queues="myQueue" ref="receiveConfirmTestListener"/>
</rabbit:listener-container>
</beans>
複製代碼
建立回調監聽函數
package com.edu.spring2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Component;
/**
* @ClassName ConfirmCallBackListener
* @Deccription TODO
* @Author DZ
* @Date 2019/5/4 22:26
**/
@Component("confirmCallBackListener")
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("確認回調 ack==" + ack + "回調緣由==" + cause);
}
}
複製代碼
package com.edu.spring2;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* @ClassName ReceiveConfirmTestListener
* @Deccription TODO
* @Author DZ
* @Date 2019/5/4 22:24
**/
@Component("receiveConfirmTestListener")
public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {
/**
* 收到消息時,執行的監聽
*
* @param message
* @param channel
* @throws Exception
*/
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println(("消費者收到了消息" + message));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
複製代碼
package com.edu.spring2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
* @ClassName ReturnCallBackListener
* @Deccription TODO
* @Author DZ
* @Date 2019/5/4 22:28
**/
@Component("returnCallBackListener")
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("失敗回調" + message);
}
}
複製代碼
回調函數的配置來自 XML
建立發送消息的工具類package com.edu.spring2;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @ClassName PublicUtil
* @Deccription TODO
* @Author DZ
* @Date 2019/5/4 22:30
**/
@Component("publicUtil")
public class PublicUtil {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String excange, String routingkey, Object message) {
amqpTemplate.convertAndSend(excange, routingkey, message);
}
}
複製代碼
建立測試類
package com.edu.spring2;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @ClassName TestMain
* @Deccription TODO
* @Author DZ
* @Date 2019/5/4 22:32
**/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:applicationContext2.xml"})
public class TestMain {
@Autowired
private PublicUtil publicUtil;
private static String exChange = "DIRECT_EX";//交換機
private static String queue = "myQueue";
/**
* exChange和queue均正確
* confirm會執行,ack = ture
* 消息正常接收(接收消息確認方法正常執行)
* @throws Exception
*/
@Test
public void test1() throws Exception {
publicUtil.send(exChange, queue, "測試1,隊列和交換機均正確");
}
/**
* exChange錯誤,queue正確
* confirm執行,ack=false
* 消息沒法接收(接收消息確認方法不能執行)
* @throws Exception
*/
@Test
public void test2() throws Exception {
publicUtil.send(exChange + "1", queue, "測試2,隊列正確,交換機錯誤");
}
/**
* exChange正常,queue錯誤
* return執行
* confirm執行,ack=ture
* @throws Exception
*/
@Test
public void test3() throws Exception {
publicUtil.send(exChange, queue + "1", "測試2,隊列錯誤,交換機正確");
}
/**
* exChange錯誤,queue錯誤
* confirm執行,ack=false
* @throws Exception
*/
@Test
public void test4() throws Exception {
publicUtil.send(exChange + "1", queue + "1", "測試2,隊列錯誤,交換機錯誤");
}
}
複製代碼
測試結果以下:
test1:exChange和queue均正確
confirm會執行,ack=ture;能正常收到消息(接收消息的方法正常執行)test2:exChange錯誤,queue正確
上述結論及代碼以下圖:
根據上述的測試結果,咱們能夠根據回調函數的返回結果,查看MQ的錯誤出如今那裏。根據上述結論,咱們能夠對3個回調函數作以下處理:
類 ReceiveConfirmTestListener 中的onMessage方法主要用於接收從 RabbitMQ 推送過來的消息,並對消息作相應的邏輯處理
類 ConfirmCallBackListener 中的 confirm 方法主要用於檢查交換機(exChange),當 ack=false,交換機可能錯誤
類 ReturnCallBackListener 中的 returnedMessage 方法用於檢查隊列(queue),當此方法執行時,隊列可能錯誤
實際上,在真實項目中,上面3個方法也是按照這3個邏輯進行設計的。固然這3個方法中還能夠加入更多的日誌消息,和邏輯處理業務。