1.什麼是MQhtml
消息隊列(Message Queue,簡稱MQ),從字面意思上看,本質是個隊列,FIFO先入先出,只不過隊列中存放的內容是message而已。
其主要用途:不一樣進程Process/線程Thread之間通訊。
爲何會產生消息隊列?有幾個緣由:java
不一樣進程(process)之間傳遞消息時,兩個進程之間耦合程度太高,改動一個進程,引起必須修改另外一個進程,爲了隔離這兩個進程,在兩進程間抽離出一層(一個模塊),全部兩進程之間傳遞的消息,都必須經過消息隊列來傳遞,單獨修改某一個進程,不會影響另外一個;linux
不一樣進程(process)之間傳遞消息時,爲了實現標準化,將消息的格式規範化了,而且,某一個進程接受的消息太多,一會兒沒法處理完,而且也有前後順序,必須對收到的消息進行排隊,所以誕生了事實上的消息隊列;web
關於消息隊列的詳細介紹請參閱:
《Java帝國之消息隊列》
《一個故事告訴你什麼是消息隊列》
《到底何時該使用MQ》spring
MQ框架很是之多,比較流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里開源的RocketMQ。本文主要介紹RabbitMq。apache
本教程pdf及代碼下載地址:
代碼:https://download.csdn.net/download/zpcandzhj/10585077
教程:https://download.csdn.net/download/zpcandzhj/10585092編程
2.RabbitMQjson
2.1.RabbitMQ的簡介windows
開發語言:Erlang – 面向併發的編程語言。瀏覽器
2.1.1.AMQP
AMQP是消息隊列的一個協議。
2.2.官網
2.3.MQ的其餘產品
2.4.學習5種隊列
2.5.安裝文檔
3.搭建RabbitMQ環境
3.1.下載
下載地址:http://www.rabbitmq.com/download.html
3.2.windows下安裝
3.2.1.安裝Erlang
下載:http://www.erlang.org/download/otp_win64_17.3.exe
安裝:
安裝完成。
3.2.2.安裝RabbitMQ
安裝完成。
開始菜單裏出現以下選項:
啓動、中止、從新安裝等。
3.2.3.啓用管理工具
一、雙擊
二、進入C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.4.1\sbin輸入命令:
rabbitmq-plugins enable rabbitmq_management
這樣就啓動了管理工具,能夠試一下命令:
中止:net stop RabbitMQ
啓動:net start RabbitMQ
三、在瀏覽器中輸入地址查看:http://127.0.0.1:15672/
四、使用默認帳號登陸:guest/ guest
3.3.Linux下安裝
3.3.1.安裝Erlang
3.3.2.添加yum支持
cd /usr/local/src/
mkdir rabbitmq
cd rabbitmq
wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc
使用yum安裝:
sudo yum install erlang
3.3.3.安裝RabbitMQ
上傳rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/
安裝:
rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm
3.3.4.啓動、中止
service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart
3.3.5.設置開機啓動
chkconfig rabbitmq-server on
3.3.6.設置配置文件
cd /etc/rabbitmq
cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/
mv rabbitmq.config.example rabbitmq.config
3.3.7.開啓用戶遠程訪問
vi /etc/rabbitmq/rabbitmq.config
注意要去掉後面的逗號。
3.3.8.開啓web界面管理工具
rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart
3.3.9.防火牆開放15672端口
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/etc/rc.d/init.d/iptables save
3.4.安裝的注意事項
一、推薦使用默認的安裝路徑
二、系統用戶名必須是英文
Win10更名字很是麻煩,具體方法百度
三、計算機名必須是英文
四、系統的用戶必須是管理員
若是安裝失敗應該如何解決:
一、重裝系統 – 不推薦
二、將RabbitMQ安裝到linux虛擬機中
a)推薦
三、使用別人安裝好的RabbitMQ服務
a)只要給你開通一個帳戶便可。
b)使用公用的RabbitMQ服務,在192.168.50.22
c)推薦
常見錯誤:
3.5.安裝完成後操做
一、系統服務中有RabbitMQ服務,中止、啓動、重啓
二、打開命令行工具
若是找不到命令行工具,直接cd到相應目錄:
輸入命令rabbitmq-plugins enable rabbitmq_management啓用管理插件
查看管理頁面
經過默認帳戶 guest/guest 登陸
若是可以登陸,說明安裝成功。
4.添加用戶
4.1.添加admin用戶
4.2.用戶角色
一、超級管理員(administrator)
可登錄管理控制檯,可查看全部的信息,而且能夠對用戶,策略(policy)進行操做。
二、監控者(monitoring)
可登錄管理控制檯,同時能夠查看rabbitmq節點的相關信息(進程數,內存使用狀況,磁盤使用狀況等)
三、策略制定者(policymaker)
可登錄管理控制檯, 同時能夠對policy進行管理。但沒法查看節點的相關信息(上圖紅框標識的部分)。
四、普通管理者(management)
僅可登錄管理控制檯,沒法看到節點信息,也沒法對策略進行管理。
五、其餘
沒法登錄管理控制檯,一般就是普通的生產者和消費者。
4.3.建立Virtual Hosts
選中Admin用戶,設置權限:
看到權限已加:
4.4.管理界面中的功能
5.學習五種隊列
5.1.導入my-rabbitmq項目
項目下載地址:
https://download.csdn.net/download/zpcandzhj/10585077
5.2.簡單隊列
5.2.1.圖示
P:消息的生產者
C:消息的消費者
紅色:隊列
生產者將消息發送到隊列,消費者從隊列中獲取消息。
5.2.2.導入RabbitMQ的客戶端依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency>
5.2.3.獲取MQ的鏈接
package com.zpc.rabbitmq.util; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; public class ConnectionUtil { public static Connection getConnection() throws Exception { //定義鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務地址 factory.setHost("localhost"); //端口 factory.setPort(5672); //設置帳號信息,用戶名、密碼、vhost factory.setVirtualHost("testhost"); factory.setUsername("admin"); factory.setPassword("admin"); // 經過工程獲取鏈接 Connection connection = factory.newConnection(); return connection; } }
5.2.4.生產者發送消息到隊列
package com.zpc.rabbitmq.simple; import com.zpc.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String QUEUE_NAME = "q_test_01"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 從鏈接中建立通道 Channel channel = connection.createChannel(); // 聲明(建立)隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息內容 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //關閉通道和鏈接 channel.close(); connection.close(); } }
5.2.5.管理工具中查看消息
點擊上面的隊列名稱,查詢具體的隊列中的信息:
5.2.6.消費者從隊列中獲取消息
package com.zpc.rabbitmq.simple; import com.zpc.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "q_test_01"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 從鏈接中建立通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列 channel.basicConsume(QUEUE_NAME, true, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
5.3.Work模式
5.3.1.圖示
一個生產者、2個消費者。
一個消息只能被一個消費者獲取。
5.3.2.消費者1
package com.zpc.rabbitmq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zpc.rabbitmq.util.ConnectionUtil; public class Recv { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一時刻服務器只會發一條消息給消費者 //channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,false表示手動返回完成狀態,true表示自動 channel.basicConsume(QUEUE_NAME, true, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [y] Received '" + message + "'"); //休眠 Thread.sleep(10); // 返回確認狀態,註釋掉表示使用自動確認模式 //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
5.3.3.消費者2
package com.zpc.rabbitmq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zpc.rabbitmq.util.ConnectionUtil; public class Recv2 { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一時刻服務器只會發一條消息給消費者 //channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,false表示手動返回完成狀態,true表示自動 channel.basicConsume(QUEUE_NAME, true, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); // 休眠1秒 Thread.sleep(1000); //下面這行註釋掉表示使用自動確認模式 //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
5.3.4.生產者
向隊列中發送100條消息。
package com.zpc.rabbitmq.work; import com.zpc.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 100; i++) { // 消息內容 String message = "" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); Thread.sleep(i * 10); } channel.close(); connection.close(); } }
5.3.5.測試
測試結果:
一、消費者1和消費者2獲取到的消息內容是不一樣的,同一個消息只能被一個消費者獲取。
二、消費者1和消費者2獲取到的消息的數量是相同的,一個是消費奇數號消息,一個是偶數。
其實,這樣是不合理的,由於消費者1線程停頓的時間短。應該是消費者1要比消費者2獲取到的消息多才對。
RabbitMQ 默認將消息順序發送給下一個消費者,這樣,每一個消費者會獲得相同數量的消息。即輪詢(round-robin)分發消息。
怎樣才能作到按照每一個消費者的能力分配消息呢?聯合使用 Qos 和 Acknowledge 就能夠作到。
basicQos 方法設置了當前信道最大預獲取(prefetch)消息數量爲1。消息從隊列異步推送給消費者,消費者的 ack 也是異步發送給隊列,從隊列的視角去看,老是會有一批消息已推送但還沒有得到 ack 確認,Qos 的 prefetchCount 參數就是用來限制這批未確認消息數量的。設爲1時,隊列只有在收到消費者發回的上一條消息 ack 確認後,纔會向該消費者發送下一條消息。prefetchCount 的默認值爲0,即沒有限制,隊列會將全部消息儘快發給消費者。
2個概念
輪詢分發 :使用任務隊列的優勢之一就是能夠輕易的並行工做。若是咱們積壓了好多工做,咱們能夠經過增長工做者(消費者)來解決這一問題,使得系統的伸縮性更加容易。在默認狀況下,RabbitMQ將逐個發送消息到在序列中的下一個消費者(而不考慮每一個任務的時長等等,且是提早一次性分配,並不是一個一個分配)。平均每一個消費者得到相同數量的消息。這種方式分發消息機制稱爲Round-Robin(輪詢)。
公平分發 :雖然上面的分配法方式也還行,可是有個問題就是:好比:如今有2個消費者,全部的奇數的消息都是繁忙的,而偶數則是輕鬆的。按照輪詢的方式,奇數的任務交給了第一個消費者,因此一直在忙個不停。偶數的任務交給另外一個消費者,則當即完成任務,而後閒得不行。而RabbitMQ則是不瞭解這些的。這是由於當消息進入隊列,RabbitMQ就會分派消息。它不看消費者爲應答的數目,只是盲目的將消息發給輪詢指定的消費者。
爲了解決這個問題,咱們使用basicQos( prefetchCount = 1)方法,來限制RabbitMQ只發不超過1條的消息給同一個消費者。當消息處理完畢後,有了反饋,纔會進行第二次發送。
還有一點須要注意,使用公平分發,必須關閉自動應答,改成手動應答。
5.4.Work模式的「能者多勞」
打開上述代碼的註釋:
// 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1);
//開啓這行 表示使用手動確認模式 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
同時改成手動確認:
// 監聽隊列,false表示手動返回完成狀態,true表示自動 channel.basicConsume(QUEUE_NAME, false, consumer);
測試:
消費者1比消費者2獲取的消息更多。
5.5.消息的確認模式
消費者從隊列中獲取消息,服務端如何知道消息已經被消費呢?
模式1:自動確認
只要消息從隊列中獲取,不管消費者獲取到消息後是否成功消息,都認爲是消息已經成功消費。
模式2:手動確認
消費者從隊列中獲取消息後,服務器會將該消息標記爲不可用狀態,等待消費者的反饋,若是消費者一直沒有反饋,那麼該消息將一直處於不可用狀態。
手動模式:
自動模式:
5.6.訂閱模式
5.6.1.圖示
解讀:
一、1個生產者,多個消費者
二、每個消費者都有本身的一個隊列
三、生產者沒有將消息直接發送到隊列,而是發送到了交換機
四、每一個隊列都要綁定到交換機
五、生產者發送的消息,通過交換機,到達隊列,實現,一個消息被多個消費者獲取的目的
注意:一個消費者隊列能夠有多個消費者實例,只有其中一個消費者實例會消費
5.6.2.消息的生產者(看做是後臺系統)
向交換機中發送消息。
package com.zpc.rabbitmq.subscribe; import com.zpc.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 消息內容 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
注意:消息發送到沒有隊列綁定的交換機時,消息將丟失,由於,交換機沒有存儲消息的能力,消息只能存在在隊列中。
5.6.3.消費者1(看做是前臺系統)
package com.zpc.rabbitmq.subscribe; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zpc.rabbitmq.util.ConnectionUtil; public class Recv { private final static String QUEUE_NAME = "test_queue_work1"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [Recv] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
5.6.4.消費者2(看做是搜索系統)
package com.zpc.rabbitmq.subscribe; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zpc.rabbitmq.util.ConnectionUtil; public class Recv2 { private final static String QUEUE_NAME = "test_queue_work2"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [Recv2] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
5.6.5.測試
測試結果:
同一個消息被多個消費者獲取。一個消費者隊列能夠有多個消費者實例,只有其中一個消費者實例會消費到消息。
在管理工具中查看隊列和交換機的綁定關係:
5.7.路由模式
5.7.1.圖示
5.7.2.生產者
5.7.3.消費者1(假設是前臺系統)
5.7.4.消費2(假設是搜索系統)
5.8.主題模式(通配符模式)
5.8.1.圖示
同一個消息被多個消費者獲取。一個消費者隊列能夠有多個消費者實例,只有其中一個消費者實例會消費到消息。
5.8.2.生產者
package com.zpc.rabbitmq.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.zpc.rabbitmq.util.ConnectionUtil; public class Send { private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息內容 String message = "Hello World!!"; channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
5.8.3.消費者1(前臺系統)
package com.zpc.rabbitmq.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zpc.rabbitmq.util.ConnectionUtil; public class Recv { private final static String QUEUE_NAME = "test_queue_topic_work_1"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*"); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [Recv_x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
5.8.4.消費者2(搜索系統)
package com.zpc.rabbitmq.topic; import com.zpc.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv2 { private final static String QUEUE_NAME = "test_queue_topic_work_2"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*"); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [Recv2_x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
6.Spring-Rabbit
6.1.Spring項目
http://spring.io/projects
6.2.簡介
6.3.使用
6.3.1.消費者
package com.zpc.rabbitmq.spring; /** * 消費者 * * @author Evan */ public class Foo { //具體執行業務的方法 public void listen(String foo) { System.out.println("\n消費者: " + foo + "\n"); } }
6.3.2.生產者
package com.zpc.rabbitmq.spring; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class SpringMain { public static void main(final String... args) throws Exception { AbstractApplicationContext ctx = new ClassPathXmlApplicationContext( "classpath:spring/rabbitmq-context.xml"); //RabbitMQ模板 RabbitTemplate template = ctx.getBean(RabbitTemplate.class); //發送消息 template.convertAndSend("Hello, 鳥鵬!"); Thread.sleep(1000);// 休眠1秒 ctx.destroy(); //容器銷燬 } }
6.3.3.配置文件
一、定義鏈接工廠
<!-- 定義RabbitMQ的鏈接工廠 --> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="admin" password="admin" virtual-host="testhost" />
二、定義模板(能夠指定交換機或隊列)
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
三、定義隊列、交換機、以及完成隊列和交換機的綁定
<!-- 定義隊列,自動聲明 --> <rabbit:queue name="zpcQueue" auto-declare="true"/> <!-- 定義交換器,把Q綁定到交換機,自動聲明 --> <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding queue="zpcQueue"/> </rabbit:bindings> </rabbit:fanout-exchange>
四、定義監聽
<rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="foo" method="listen" queue-names="zpcQueue" /> </rabbit:listener-container> <bean id="foo" class="com.zpc.rabbitmq.spring.Foo" />
五、定義管理,用於管理隊列、交換機等:
<!-- MQ的管理,包括隊列、交換器等 --> <rabbit:admin connection-factory="connectionFactory" />
完整配置文件rabbitmq-context.xml
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> <!-- 定義RabbitMQ的鏈接工廠 --> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="admin" password="admin" virtual-host="testhost" /> <!-- 定義Rabbit模板,指定鏈接工廠以及定義exchange --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" /> <!-- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" routing-key="foo.bar" /> --> <!-- MQ的管理,包括隊列、交換器等 --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 定義隊列,自動聲明 --> <rabbit:queue name="zpcQueue" auto-declare="true"/> <!-- 定義交換器,把Q綁定到交換機,自動聲明 --> <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding queue="zpcQueue"/> </rabbit:bindings> </rabbit:fanout-exchange> <!-- <rabbit:topic-exchange name="myExchange"> <rabbit:bindings> <rabbit:binding queue="myQueue" pattern="foo.*" /> </rabbit:bindings> </rabbit:topic-exchange> --> <!-- 隊列監聽 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="foo" method="listen" queue-names="zpcQueue" /> </rabbit:listener-container> <bean id="foo" class="com.zpc.rabbitmq.spring.Foo" /> </beans>
6.4.持久化交換機和隊列
持久化:將交換機或隊列的數據保存到磁盤,服務器宕機或重啓以後依然存在。
非持久化:將交換機或隊列的數據保存到內存,服務器宕機或重啓以後將不存在。
非持久化的性能高於持久化。
如何選擇持久化?非持久化? – 看需求。
7.Spring集成RabbitMQ一個完整案例
建立三個系統A,B,C
A做爲生產者,B、C做爲消費者(B,C做爲web項目啓動)
項目下載地址:https://download.csdn.net/download/zpcandzhj/10585077
7.1.在A系統中發送消息到交換機
7.1.1.導入依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.zpc</groupId> <artifactId>myrabbitA</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>myrabbit</name> <dependencies> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.0.RELEASE</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> </dependencies> </project>
7.1.2.隊列和交換機的綁定關係
實現:
一、在配置文件中將隊列和交換機完成綁定
二、能夠在管理界面中完成綁定
a)綁定關係若是發生變化,須要修改配置文件,而且服務須要重啓
b)管理更加靈活
c)更容易對綁定關係的權限管理,流程管理
本例選擇第2種方式
7.1.3.配置
rabbitmq-context.xml
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> <!-- 定義RabbitMQ的鏈接工廠 --> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="admin" password="admin" virtual-host="testhost" /> <!-- MQ的管理,包括隊列、交換器等 --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 定義交換器,暫時不把Q綁定到交換機,在管理界面去綁定 --> <!--<rabbit:topic-exchange name="topicExchange" auto-declare="true" ></rabbit:topic-exchange>--> <rabbit:direct-exchange name="directExchange" auto-declare="true" ></rabbit:direct-exchange> <!--<rabbit:fanout-exchange name="fanoutExchange" auto-declare="true" ></rabbit:fanout-exchange>--> <!-- 定義Rabbit模板,指定鏈接工廠以及定義exchange(exchange要和上面的一致) --> <!--<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="topicExchange" />--> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="directExchange" /> <!--<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />--> </beans>
7.1.4.消息內容
方案:
一、消息內容使用對象作json序列化發送
a)數據大
b)有些數據其餘人是可能用不到的
二、發送特定的業務字段,如id、操做類型
7.1.5.實現
生產者MsgSender.java:
package com.zpc.myrabbit; import com.alibaba.fastjson.JSON; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; /** * 消息生產者 */ public class MsgSender { public static void main(String[] args) throws Exception { AbstractApplicationContext ctx = new ClassPathXmlApplicationContext( "classpath:spring/rabbitmq-context.xml"); //RabbitMQ模板 RabbitTemplate template = ctx.getBean(RabbitTemplate.class); String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小時制 //發送消息 Map<String, Object> msg = new HashMap<String, Object>(); msg.put("type", "1"); msg.put("date", date); template.convertAndSend("type2", JSON.toJSONString(msg)); Thread.sleep(1000);// 休眠1秒 ctx.destroy(); //容器銷燬 } }
7.2.在B系統接收消息
7.2.1.導入依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.zpc</groupId> <artifactId>myrabbitB</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>war</packaging> <name>myrabbit</name> <properties> <spring.version>4.1.3.RELEASE</spring.version> <fastjson.version>1.2.46</fastjson.version> </properties> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.0.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> </dependencies> <build> <finalName>${project.artifactId}</finalName> <plugins> <!-- web層須要配置Tomcat插件 --> <plugin> <groupId>org.apache.tomcat.maven</groupId> <artifactId>tomcat7-maven-plugin</artifactId> <configuration> <path>/testRabbit</path> <uriEncoding>UTF-8</uriEncoding> <port>8081</port> </configuration> </plugin> </plugins> </build> </project>
7.2.2.配置
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> <!-- 定義RabbitMQ的鏈接工廠 --> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="admin" password="admin" virtual-host="testhost" /> <!-- MQ的管理,包括隊列、交換器等 --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 定義B系統須要監聽的隊列,自動聲明 --> <rabbit:queue name="q_topic_testB" auto-declare="true"/> <!-- 隊列監聽 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="myMQlistener" method="listen" queue-names="q_topic_testB" /> </rabbit:listener-container> <bean id="myMQlistener" class="com.zpc.myrabbit.listener.Listener" /> </beans>
7.2.3.具體處理邏輯
public class Listener { //具體執行業務的方法 public void listen(String msg) { System.out.println("\n消費者B開始處理消息: " + msg + "\n"); } }
7.2.4.在界面管理工具中完成綁定關係
選中定義好的交換機(exchange)
1)direct
2)fanout
3)topic
7.3.在C系統中接收消息
(和B系統配置差很少,無非是Q名和Q對應的處理邏輯變了)
7.3.1.配置
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> <!-- 定義RabbitMQ的鏈接工廠 --> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="admin" password="admin" virtual-host="testhost" /> <!-- MQ的管理,包括隊列、交換器等 --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 定義C系統須要監聽的隊列,自動聲明 --> <rabbit:queue name="q_topic_testC" auto-declare="true"/> <!-- 隊列監聽 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="myMQlistener" method="listen" queue-names="q_topic_testC" /> </rabbit:listener-container> <bean id="myMQlistener" class="com.zpc.myrabbit.listener.Listener" /> </beans>
7.3.2.處理業務邏輯
public class Listener { //具體執行業務的方法 public void listen(String msg) { System.out.println("\n消費者C開始處理消息: " + msg + "\n"); } }
7.3.3.在管理工具中綁定隊列和交換機
見7.2.4
7.3.4.測試
分別啓動B,C兩個web應用,而後運行A的MsgSender主方法發送消息,分別測試fanout、direct、topic三種類型
8.Springboot集成RabbitMQ
springboot集成RabbitMQ很是簡單,若是隻是簡單的使用配置很是少,springboot提供了spring-boot-starter-amqp對消息各類支持。
代碼下載地址:https://download.csdn.net/download/zpcandzhj/10585077
8.1.簡單隊列
一、配置pom文件,主要是添加spring-boot-starter-amqp的支持
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
二、配置application.properties文件
配置rabbitmq的安裝地址、端口以及帳戶信息
spring.application.name=spirng-boot-rabbitmq spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin
三、配置隊列
package com.zpc.rabbitmq; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue queue() { return new Queue("q_hello"); } }
四、發送者
package com.zpc.rabbitmq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; @Component public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小時制 String context = "hello " + date; System.out.println("Sender : " + context); //簡單對列的狀況下routingKey即爲Q名 this.rabbitTemplate.convertAndSend("q_hello", context); } }
五、接收者
package com.zpc.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "q_hello") public class HelloReceiver { @RabbitHandler public void process(String hello) { System.out.println("Receiver : " + hello); } }
六、測試
package com.zpc.rabbitmq; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitMqHelloTest { @Autowired private HelloSender helloSender; @Test public void hello() throws Exception { helloSender.send(); } }
8.2.多對多使用(Work模式)
註冊兩個Receiver:
package com.zpc.rabbitmq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "q_hello") public class HelloReceiver2 { @RabbitHandler public void process(String hello) { System.out.println("Receiver2 : " + hello); } }
@Test public void oneToMany() throws Exception { for (int i=0;i<100;i++){ helloSender.send(i); Thread.sleep(300); } }
public void send(int i) { String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小時制 String context = "hello " + i + " " + date; System.out.println("Sender : " + context); //簡單對列的狀況下routingKey即爲Q名 this.rabbitTemplate.convertAndSend("q_hello", context); }
8.3.Topic Exchange(主題模式)
topic 是RabbitMQ中最靈活的一種方式,能夠根據routing_key自由的綁定不一樣的隊列
首先對topic規則配置,這裏使用兩個隊列(消費者)來演示。
1)配置隊列,綁定交換機
package com.zpc.rabbitmq.topic; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TopicRabbitConfig { final static String message = "q_topic_message"; final static String messages = "q_topic_messages"; @Bean public Queue queueMessage() { return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages() { return new Queue(TopicRabbitConfig.messages); } /** * 聲明一個Topic類型的交換機 * @return */ @Bean TopicExchange exchange() { return new TopicExchange("mybootexchange"); } /** * 綁定Q到交換機,而且指定routingKey * @param queueMessage * @param exchange * @return */ @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
2)建立2個消費者
q_topic_message 和q_topic_messages
package com.zpc.rabbitmq.topic; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "q_topic_message") public class Receiver1 { @RabbitHandler public void process(String hello) { System.out.println("Receiver1 : " + hello); } }
package com.zpc.rabbitmq.topic; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "q_topic_messages") public class Receiver2 { @RabbitHandler public void process(String hello) { System.out.println("Receiver2 : " + hello); } }
3)消息發送者(生產者)
package com.zpc.rabbitmq.topic; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MsgSender { @Autowired private AmqpTemplate rabbitTemplate; public void send1() { String context = "hi, i am message 1"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("mybootexchange", "topic.message", context); } public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("mybootexchange", "topic.messages", context); } }
send1方法會匹配到topic.#和topic.message,兩個Receiver均可以收到消息,發送send2只有topic.#能夠匹配全部只有Receiver2監聽到消息。
4)測試
package com.zpc.rabbitmq.topic; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitTopicTest { @Autowired private MsgSender msgSender; @Test public void send1() throws Exception { msgSender.send1(); } @Test public void send2() throws Exception { msgSender.send2(); } }
8.4.Fanout Exchange(訂閱模式)
Fanout 就是咱們熟悉的廣播模式或者訂閱模式,給Fanout交換機發送消息,綁定了這個交換機的全部隊列都收到這個消息。
1)配置隊列,綁定交換機
package com.zpc.rabbitmq.fanout; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutRabbitConfig { @Bean public Queue aMessage() { return new Queue("q_fanout_A"); } @Bean public Queue bMessage() { return new Queue("q_fanout_B"); } @Bean public Queue cMessage() { return new Queue("q_fanout_C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("mybootfanoutExchange"); } @Bean Binding bindingExchangeA(Queue aMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(aMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(bMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(cMessage).to(fanoutExchange); } }
2)建立3個消費者
package com.zpc.rabbitmq.fanout; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "q_fanout_A") public class ReceiverA { @RabbitHandler public void process(String hello) { System.out.println("AReceiver : " + hello + "/n"); } }
package com.zpc.rabbitmq.fanout; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "q_fanout_B") public class ReceiverB { @RabbitHandler public void process(String hello) { System.out.println("BReceiver : " + hello + "/n"); } }
package com.zpc.rabbitmq.fanout; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "q_fanout_C") public class ReceiverC { @RabbitHandler public void process(String hello) { System.out.println("CReceiver : " + hello + "/n"); } }
3)生產者
package com.zpc.rabbitmq.fanout; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MsgSenderFanout { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hi, fanout msg "; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("mybootfanoutExchange","", context); } }
4)測試
package com.zpc.rabbitmq.fanout; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitFanoutTest { @Autowired private MsgSenderFanout msgSender; @Test public void send1() throws Exception { msgSender.send(); } }
結果以下,三個消費者都收到消息:
AReceiver : hi, fanout msg
CReceiver : hi, fanout msg
BReceiver : hi, fanout msg
9.總結
使用MQ實現商品數據的同步優點:
一、下降系統間耦合度
二、便於管理數據的同步(數據一致性)
推薦閱讀《RabbitMQ詳解》《大型網站技術架構:核心原理與案例分析》