全名爲:Message Queuejava
消息隊列是典型的:生產者、消費者模型。生產者不斷向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。就是一個先進先出的隊列,只是隊列中存放的是message而已,由於消息的生產和消費都是異步的,並且只關心消息的發送和接收,沒有業務邏輯的侵入,這樣就實現了生產者和消費者的解耦。git
ActiveMQ:基於JMSspring
RabbitMQ:基於AMQP協議,erlang語言開發,穩定性好apache
RocketMQ:基於JMS,阿里巴巴產品,目前交由Apache基金會vim
Kafka:分佈式消息系統,高吞吐量centos
如上所說的JMS和AMQP:數組
MQ是消息通訊的模型,併發具體實現。如今實現MQ的有兩種主流方式:AMQP、JMS,具體百度瀏覽器
JMS是定義了統一的接口,來對消息操做進行統一;AMQP是經過規定協議來統一數據交互的格式緩存
JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,所以是跨語言的。併發
JMS規定了兩種消息模型;而AMQP的消息模型更加豐富
yum install esl-erlang_17.3-1~centos~6_amd64.rpm
yum install esl-erlang-compat-R14B-1.el6.noarch.rpm
首先安裝包下載並上傳:連接:https://pan.baidu.com/s/1XM24RprcaXMAFHPdctkEIw 提取碼:1490
我是上傳到 /usr/local/rabbitmq/ ,大家隨意;
進入到安裝包上傳目錄:
rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm
#將默認的配置文件模版 複製到 etc目錄下
cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
#編輯配置問價
vim /etc/rabbitmq/rabbitmq.config
注意:打開註解,刪掉末尾的逗號,保存退出便可;
chkconfig rabbitmq-server on #設置爲開機啓動
service rabbitmq-server start #啓動服務
service rabbitmq-server stop #關閉服務
service rabbitmq-server restart #服務重啓
rabbitmq-plugins enable rabbitmq_management #經過命令開啓
service rabbitmq-server restart # 服務重啓,配置生效
端口是15672,自行開放,我是直接關閉了防火牆的;
下面咱們咱們既能夠王文Web管理頁面:帳號密碼默認爲:guest
瀏覽器沒有彈出翻譯頁面,咱們自翻譯
connections:不管生產者仍是消費者,都須要與RabbitMQ創建鏈接後才能夠完成消息的生產和消費,在這裏能夠查看鏈接狀況
channels:通道,創建鏈接後,會造成通道,消息的投遞獲取依賴通道。
Exchanges:交換機,用來實現消息的路由
Queues:隊列,即消息隊列,消息存放在隊列中,等待消費,消費後被移除隊列。
用戶的角色指定,對應不一樣權限:
可登錄管理控制檯,可查看全部的信息,而且能夠對用戶,策略(policy)進行操做。
可登錄管理控制檯,同時能夠查看rabbitmq節點的相關信息(進程數,內存使用狀況,磁盤使用狀況等)
可登錄管理控制檯, 同時能夠對policy進行管理。但沒法查看節點的相關信息(上圖紅框標識的部分)。
僅可登錄管理控制檯,沒法看到節點信息,也沒法對策略進行管理。
沒法登錄管理控制檯,一般就是普通的生產者和消費者。
RabbitMQ爲了實現每一個用戶互不干擾,經過虛擬主機的方式,不一樣用戶使用不一樣的路徑,各自有各自的隊列、交換機
虛擬機就建立好了,而後咱們能夠給用戶分配權限:
RabbitMQ提供了6種消息模型,可是第6種實際上是RPC,並非MQ,咱們就說說前面五種消息模型
基本的消息模型:
P:消息生產者
C:消息消費者
queue:消息隊列,消費者投遞消息,消費者取出消息並消費
<!--RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.1.4.RELEASE</version> </dependency>
java的鏈接MQ工具類:
public class ConnectionUtil { //創建與RabbitMQ的鏈接 public static Connection getConnection() throws Exception { //定義鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務地址 factory.setHost("192.168.159.159"); //端口 factory.setPort(5672); //設置帳號信息,用戶名、密碼、vhost factory.setVirtualHost("/new1"); factory.setUsername("/admin"); factory.setPassword("admin"); // 經過工程獲取鏈接 Connection connection = factory.newConnection(); return connection; } }
生產者發送消息
package com.mq.start; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /**生產者**/ public class send { //肯定隊列的標識 private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 獲取到鏈接 Connection connection = connectionUtils.getConnection(); // 從鏈接中建立通道,使用通道才能完成消息相關的操做 Channel channel = connection.createChannel(); // 聲明(建立)隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息內容 String message = "Hello World!"; // 向指定的隊列中發送消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [服務提供者] Send '" + message + "'"); //關閉通道和鏈接 channel.close(); connection.close(); } }
這個時候咱們切換到剛剛建立的用戶上 /admin 上查看信息:
消費者獲取消息:
package com.mq.start; import com.rabbitmq.client.*; import java.io.IOException; public class get { //隊列name 要達成通訊 必須和發送的隊列name 一致 private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] argv) throws Exception { // 獲取到鏈接 Connection connection = connectionUtils.getConnection(); // 建立通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [服務消費者] get : " + msg + "!"); } }; // 監聽隊列,第二個參數:是否自動進行消息確認。 channel.basicConsume(QUEUE_NAME, true, consumer); } }
控制檯打印:由於我發送了兩次
再次查看Web管理頁面,沒有消息了:
通過剛剛的小Demo,我能發現一旦消息從隊列中被消費者拉取消費後,隊列中的消息就會刪除,
這裏就涉及到一個MQ是經過消息確認機制知道消息什麼時候被消費,當消費者獲取到信息後,回想MQ返回一個ACK回執告知已被接受,能夠刪除。不過ACK回執分問兩種狀況:
手動ACK:消息接收後,通常在消費者消費掉該消息後手動發送ACK
自動ACK:消息接受後當即就會自動發送ACK
至於如何選擇:根據信息的重要程度區分
消息不過重要,即便丟失影響也不大,自動ACK比較巴適
消息很重要,不容許丟失,那就等咱們消費者消費完這個信息後手動發送回執
java實現:部分實現
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { String msg = new String(body); System.out.println(" [服務消費者] get1 : " + msg + "!"); //在消息消費完後,手動發送ACK回執給MQ channel.basicAck(envelope.getDeliveryTag(), false); } }; // 監聽隊列,第二個參數:是否自動進行消息確認。 channel.basicConsume(QUEUE_NAME, false, consumer); }
當消息處理比較耗時的時候,可呢個生產消息的速度回遠遠大於消息的消費速度,隨着時間的推移,隊列中的消息就會堆積如山沒法及時的處理,此時work模型橫空出世,讓多個消費者綁定到一個隊列上,共同消費同一個隊列中的消息
一個生產者,一個隊列,兩個或者更多的消費者
消息的生產者:連續發送50個消息去隊列
package com.mq.start.work模型; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class send {
private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 獲取到鏈接 Connection connection = connectionUtils.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 循環發佈任務 for (int i = 0; i < 50; i++) { // 消息內容 String message = "task .. " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); Thread.sleep(i * 2); } // 關閉通道和鏈接 channel.close(); connection.close(); } }
兩個消費者:一次只能處理接收一個消息處理:
package com.mq.start.work模型; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; /** * get1消費者有Thread.sleep(1000),模擬更耗時 */ public class get1 { private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 獲取到鏈接 Connection connection = connectionUtils.getConnection(); // 獲取通道 final Channel channe1 = connection.createChannel(); // 聲明隊列 channe1.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channe1) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者1] get : " + msg + "!"); // 手動ACK channe1.basicAck(envelope.getDeliveryTag(), false); try { //模擬這個消費者消費一個消息很耗時 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }; // 監聽隊列。 channe1.basicConsume(QUEUE_NAME, false, consumer); }
package com.mq.start.work模型; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; /** * get2處理比價快 */ public class get2 { private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 獲取到鏈接 Connection connection = connectionUtils.getConnection(); // 獲取通道 final Channel channe2 = connection.createChannel(); // 聲明隊列 channe2.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channe2) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者2] get : " + msg + "!"); // 手動ACK channe2.basicAck(envelope.getDeliveryTag(), false); } }; // 監聽隊列。 channe2.basicConsume(QUEUE_NAME, false, consumer); } }
舒適提示:優先啓動兩個消費者,隨後再啓動消息發佈者
而後咱們看下面的控制檯:get1慢吞吞的在消費,get2快速的消費完便在休息了,一人消費一半
在上面這種狀況下,消費者get1的消費效率是要比消費者get2的效率要低的
但是兩個消費者最終的消費消費的信息數量確實同樣的,是任務均分的;
消費者get1一直在忙碌於消費,消費者get2處理完分配的一半後便處於空閒狀態
消費者同一時間只會接受一條消息,在處理完以前不會接新的消息,讓處理快的人接受更多的消息:
兩個消費者都修改設置以下:
// 設置每一個消費者同時只能處理一條消息
channel.basicQos(1);
讓咱們看看效果如何:
示意圖:
P:生產者,發送消息給X(交換機)
Exchange:交換機,圖中的X。接收生產者發送的消息。知道如何處理消息,例如遞交給某個特別隊列、遞交給全部隊列、或是將消息丟棄。到底如何操做,取決於Exchange的類型。Exchange有如下3種類型:
Fanout:廣播,將消息交給全部綁定到交換機的隊列
Direct:定向,把消息交給符合指定routing key 的隊列
Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
Queue:消息隊列,接收消息、緩存消息。
C:消費者,消息的消費者,會一直等待消息到來。
注意:交換機只負責轉發消息,不具有消息儲存的能力,若是沒有隊列與其進行對接,消息會丟失;
流程圖:
在廣播模式下,消息發送流程是這樣的:
1) 能夠有多個消費者
2) 每一個消費者有本身對接的queue(隊列)
3) 每一個隊列都要對接到Exchange(交換機)
4) 生產者發送的消息,只能發送到交換機,交換機來決定要發給哪一個隊列,生產者沒法決定。
5) 交換機把消息發送給綁定過的全部隊列
6) 隊列的消費者都能拿到消息。實現一條消息被多個消費者消費
生產者:
package com.mq.start.訂閱模型_廣播; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class send { private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到鏈接 Connection connection = connectionUtils.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明exchange,指定類型爲fanout channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 消息內容 String message = "四川新聞廣播電視臺爲你播報:今天..."; // 發佈消息到Exchange channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [生產者] Send '" + message + "'"); channel.close(); connection.close(); } }
消費者1:
package com.mq.start.訂閱模型_廣播; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; public class get1 { private final static String QUEUE_NAME = "fanout_queue_1"; private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到鏈接 Connection connection = connectionUtils.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者1] received : " + msg + "!"); } }; // 監聽隊列,自動返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); } }
消費者2:
package com.mq.start.訂閱模型_廣播; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; public class get2 { private final static String QUEUE_NAME = "fanout_queue_2"; private final static String EXCHANGE_NAME = "fanout_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到鏈接
Connection connection = connectionUtils.getConnection(); // 獲取通道
Channel channel = connection.createChannel(); // 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定義隊列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體
String msg = new String(body); System.out.println(" [消費者2] received : " + msg + "!"); } }; // 監聽隊列,手動返回完成
channel.basicConsume(QUEUE_NAME, true, consumer); } }
而後查看控制檯:一條消息被全部訂閱的隊列都消費
廣播是一條消息被全部與交換機對接的隊列都消費,但有時候,咱們想不一樣的信息被不一樣的隊列說消費,這是就要用到Direct類型的交換機
P:生產者,向Exchange發送消息,發送消息時,會指定一個routing key。
X:Exchange(交換機),接收生產者的消息,而後把消息遞交給 與routing key徹底匹配的隊列
C1:消費者,其所在隊列指定了須要routing key 爲 error 的消息
C2:消費者,其所在隊列指定了須要routing key 爲 info、error、warning 的消息
在Direct模式下:
隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey
(路由key)
消息的發送方在 向 Exchange發送消息時,也必須指定消息的 RoutingKey
。
Exchange再也不把消息交給每個綁定的隊列,而是根據消息的Routing Key
進行判斷,只有隊列的Routingkey
與消息的 Routing key
徹底一致,纔會接收到消息
消息生產者:
package com.mq.start.訂閱模型_Direct; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 咱們模擬商品的增刪改,發送消息的RoutingKey分別是:insert、update、delete */
public class send { private final static String EXCHANGE_NAME = "direct_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到鏈接
Connection connection = connectionUtils.getConnection(); // 獲取通道
Channel channel = connection.createChannel(); // 聲明exchange,指定類型爲direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 消息內容
String message = "商品新增, id = 1001"; // 發送消息,而且指定routing key 爲:insert ,表明新增商品
channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes()); System.out.println(" [商品服務:] Send '" + message + "'"); channel.close(); connection.close(); } }
記住咱們的roting key 是insert噢!
消息消費者1:get1 ,他能接受routing key 爲 "update"、"delete"的消息
package com.mq.start.訂閱模型_Direct; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; public class get1 {
private final static String QUEUE_NAME = "direct_exchange_queue_1"; private final static String EXCHANGE_NAME = "direct_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到鏈接
Connection connection = connectionUtils.getConnection(); // 獲取通道
Channel channel = connection.createChannel(); // 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機,同時指定須要訂閱的routing key。假設此處須要update和delete消息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); // 定義隊列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體
String msg = new String(body); System.out.println(" [消費者1] git : " + msg + "!"); } }; // 監聽隊列,自動ACK
channel.basicConsume(QUEUE_NAME, true, consumer); } }
消息消費者2:get2,他能接受routing key 爲 "insert"、"update"、"delete" 的消息
package com.mq.start.訂閱模型_Direct; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; public class get2 { private final static String QUEUE_NAME = "direct_exchange_queue_2"; private final static String EXCHANGE_NAME = "direct_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到鏈接
Connection connection = connectionUtils.getConnection(); // 獲取通道
Channel channel = connection.createChannel(); // 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機,同時指定須要訂閱的routing key。訂閱 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); // 定義隊列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體
String msg = new String(body); System.out.println(" [消費者2] get : " + msg + "!"); } }; // 監聽隊列,自動ACK
channel.basicConsume(QUEUE_NAME, true, consumer); } }
咱們分別設置routing key 爲 insert、update、delete,逐一測試:
其餘咱們就執行測試吧,我就測了routing key爲insert,最終被get2所消費
Topic類型yuDirect相比,其實差很少的,都是根據rounting key 把消息路由到不一樣的隊列,就是Topic類型的交換機支在匹配的時候支持rounting key的通配符
通配符規則:
#:匹配一個或多個詞
*:匹配很少很多剛好1個詞
舉例:
audit.#:可以匹配audit.irs.corporate或者 audit.irs
audit.*:只能匹配audit.irs
消息生產者:Rounting key 爲: item.insert / update / delete
package com.mq.start.訂閱模型_Topic; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class send {
private final static String EXCHANGE_NAME = "topic_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到鏈接
Connection connection = connectionUtils.getConnection(); // 獲取通道
Channel channel = connection.createChannel(); // 聲明exchange,指定類型爲topic
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息內容
String message = "新增商品 : id = 1001"; // 發送消息,而且指定routing key 爲:insert ,表明新增商品
channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes()); System.out.println(" [商品服務:] Send '" + message + "'"); channel.close(); connection.close(); } }
消息消費者1:get1,匹配的Rounting Key爲 item.update / delete
package com.mq.start.訂閱模型_Topic; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; public class get1 { private final static String QUEUE_NAME = "topic_exchange_queue_1"; private final static String EXCHANGE_NAME = "topic_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到鏈接
Connection connection = connectionUtils.getConnection(); // 獲取通道
Channel channel = connection.createChannel(); // 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機,同時指定須要訂閱的routing key。須要 update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete"); // 定義隊列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體
String msg = new String(body); System.out.println(" [Get1] get : " + msg + "!"); } }; // 監聽隊列,自動ACK
channel.basicConsume(QUEUE_NAME, true, consumer); } }
消息消費者2:get2 ,經過通配符的方式,消費全部item打頭,後拼一個單詞的全部消息
package com.mq.start.訂閱模型_Topic; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; public class get2 {
private final static String QUEUE_NAME = "topic_exchange_queue_2"; private final static String EXCHANGE_NAME = "topic_exchange_test"; public static void main(String[] argv) throws Exception { // 獲取到鏈接
Connection connection = connectionUtils.getConnection(); // 獲取通道
Channel channel = connection.createChannel(); // 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機,同時指定須要訂閱的routing key。訂閱 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*"); // 定義隊列的消費者
DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體
String msg = new String(body); System.out.println(" [get2] get : " + msg + "!"); } }; // 監聽隊列,自動ACK
channel.basicConsume(QUEUE_NAME, true, consumer); } }
自測,效果一模一樣
消費者的手動ACK機制,可有效的避免消息的丟失
若想支持消息持久化,隊列和交換機都得持久化
交換機的持久化:
// 聲明exchange,指定類型爲topic,其後跟一個true參數目標是開啓交換機的持久化 channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);
隊列的持久化:
// 聲明隊列,第二個參數表示是否開啓隊列持久化 channe1.queueDeclare(QUEUE_NAME, false, false, false, null);
消息持久化:
// 發送消息,而且指定routing key 爲:insert ,第三個參數表示開啓信息持久化 channel.basicPublish(EXCHANGE_NAME, "item.update",MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
Spring-amqp是對AMQP協議的抽象實現,而spring-rabbit 是對協議的具體實現,也是目前的惟一實現。底層使用的就是RabbitMQ。
依賴和配置:
pom.xml
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
application.yml
spring: rabbitmq: host: 192.168.159.159 username: /admin password: admin virtual-host: /new1 template: #有關Template的配置 retry: #失敗重試 enabled: true #失敗重試_開啓失敗重試 initial-interval: 10000ms #失敗重試_第一次重試的間隔時長 max-interval: 300000ms #失敗重試_最長重試間隔 multiplier: 2 #失敗重試_下次重試間隔的倍速 exchange: spring.test.exchange #指定交換機,發送消息若不指定交換機就使用配置的交換機 publisher-confirms: true #生產者確認機制,確保消息正確發送,發送失敗會有錯誤回執
服務的監聽者:在SpringAMQP中,普通方法 + 註解,就能夠成爲一個消費者。
package com.mq.start.SpringAMQP; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 這是一個消費者 / 監聽者 */ @Component @RabbitListener(queues = "spring.test.queue" ) public class Listener { /** * - @Componet`:類上的註解,註冊到Spring容器 * - `@RabbitListener`:方法上的註解,聲明這個方法是一個消費者方法,須要指定下面的屬性: * - `bindings`:指定綁定關係,能夠有多個。值是`@QueueBinding`的數組。`@QueueBinding`包含下面屬性: * - `value`:這個消費者關聯的隊列。值是`@Queue`,表明一個隊列 * - `exchange`:隊列所綁定的交換機,值是`@Exchange`類型 * - `key`:隊列和交換機綁定的`RoutingKey` */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "spring.test.queue", durable = "true"), exchange = @Exchange( value = "spring.test.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC ), key = {"#.#"})) public void listen(String msg){ System.out.println("接收到消息:" + msg); } }
消息的發送者:AmqpTemplate
Spring爲AMQP提供了統一的消息處理模板:AmqpTemplate,很是方便的發送消息,其發送方法:
package com.mq.start.SpringAMQP; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class mqSend { @Autowired private AmqpTemplate amqpTemplate; @Test public void testSend() throws InterruptedException { String msg = "hello, Spring boot amqp"; this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg); // 等待10秒後再結束 Thread.sleep(10000); } }
外加一個SpringBoot項目的啓動類:
package com.mq.start; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Run { public static void main(String[] args) { SpringApplication.run(Run.class, args); } }