RabbitMQ詳解

消息隊列:RabbitMQ

全名爲:Message Queuejava

消息隊列是典型的:生產者、消費者模型。生產者不斷向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。就是一個先進先出的隊列,只是隊列中存放的是message而已,由於消息的生產和消費都是異步的,並且只關心消息的發送和接收,沒有業務邏輯的侵入,這樣就實現了生產者和消費者的解耦。git

常見的MQ產品

ActiveMQ:基於JMSspring

RabbitMQ:基於AMQP協議,erlang語言開發,穩定性好apache

RocketMQ:基於JMS,阿里巴巴產品,目前交由Apache基金會vim

Kafka:分佈式消息系統,高吞吐量centos

  • 如上所說的JMS和AMQP:數組

    • MQ是消息通訊的模型,併發具體實現。如今實現MQ的有兩種主流方式:AMQP、JMS,具體百度瀏覽器

    • JMS是定義了統一的接口,來對消息操做進行統一;AMQP是經過規定協議來統一數據交互的格式緩存

    • JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,所以是跨語言的。併發

    • JMS規定了兩種消息模型;而AMQP的消息模型更加豐富

RabbitMQ環境搭建及相關設置

安裝Erlang

yum install esl-erlang_17.3-1~centos~6_amd64.rpm
yum install esl-erlang-compat-R14B-1.el6.noarch.rpm

安裝RabbitMQ

首先安裝包下載並上傳:連接:https://pan.baidu.com/s/1XM24RprcaXMAFHPdctkEIw 提取碼:1490

我是上傳到 /usr/local/rabbitmq/ ,大家隨意;

進入到安裝包上傳目錄:

rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm

修改配置文件

#將默認的配置文件模版 複製到 etc目錄下
cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
#編輯配置問價
vim /etc/rabbitmq/rabbitmq.config

  

注意:打開註解,刪掉末尾的逗號,保存退出便可;

chkconfig rabbitmq-server on   #設置爲開機啓動
service rabbitmq-server start   #啓動服務
service rabbitmq-server stop    #關閉服務
service rabbitmq-server restart #服務重啓

開啓Web管理頁面

rabbitmq-plugins enable rabbitmq_management   #經過命令開啓
service rabbitmq-server restart              # 服務重啓,配置生效

端口是15672,自行開放,我是直接關閉了防火牆的;

下面咱們咱們既能夠王文Web管理頁面:帳號密碼默認爲:guest

瀏覽器沒有彈出翻譯頁面,咱們自翻譯

  

  • connections:不管生產者仍是消費者,都須要與RabbitMQ創建鏈接後才能夠完成消息的生產和消費,在這裏能夠查看鏈接狀況

  • channels:通道,創建鏈接後,會造成通道,消息的投遞獲取依賴通道。

  • Exchanges:交換機,用來實現消息的路由

  • Queues:隊列,即消息隊列,消息存放在隊列中,等待消費,消費後被移除隊列。

  

用戶的添加

  

用戶的角色指定,對應不一樣權限:

  • 超級管理員(administrator)

    可登錄管理控制檯,可查看全部的信息,而且能夠對用戶,策略(policy)進行操做。

  • 監控者(monitoring)

    可登錄管理控制檯,同時能夠查看rabbitmq節點的相關信息(進程數,內存使用狀況,磁盤使用狀況等)

  • 策略制定者(policymaker)

    可登錄管理控制檯, 同時能夠對policy進行管理。但沒法查看節點的相關信息(上圖紅框標識的部分)。

  • 普通管理者(management)

    僅可登錄管理控制檯,沒法看到節點信息,也沒法對策略進行管理。

  • 其餘

    沒法登錄管理控制檯,一般就是普通的生產者和消費者。

建立虛擬主機

RabbitMQ爲了實現每一個用戶互不干擾,經過虛擬主機的方式,不一樣用戶使用不一樣的路徑,各自有各自的隊列、交換機

  

虛擬機就建立好了,而後咱們能夠給用戶分配權限:

消息模型—基本模型

RabbitMQ提供了6種消息模型,可是第6種實際上是RPC,並非MQ,咱們就說說前面五種消息模型

基本的消息模型:

 

P:消息生產者

C:消息消費者

queue:消息隊列,消費者投遞消息,消費者取出消息並消費

  •  <!--RabbitMQ-->
     <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-amqp</artifactId>
           <version>2.1.4.RELEASE</version>
     </dependency>
  • java的鏈接MQ工具類:

  • public class ConnectionUtil { //創建與RabbitMQ的鏈接
     public static Connection getConnection() throws Exception { //定義鏈接工廠
         ConnectionFactory factory = new ConnectionFactory(); //設置服務地址
         factory.setHost("192.168.159.159"); //端口
         factory.setPort(5672); //設置帳號信息,用戶名、密碼、vhost
         factory.setVirtualHost("/new1"); factory.setUsername("/admin"); factory.setPassword("admin"); // 經過工程獲取鏈接
         Connection connection = factory.newConnection(); return connection; } }

生產者發送消息

package com.mq.start; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /**生產者**/
public class send { //肯定隊列的標識
    private final static String QUEUE_NAME = "simple_queue"; ​ public static void main(String[] argv) throws Exception { // 獲取到鏈接
        Connection connection = connectionUtils.getConnection(); // 從鏈接中建立通道,使用通道才能完成消息相關的操做
        Channel channel = connection.createChannel(); // 聲明(建立)隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息內容
        String message = "Hello World!"; // 向指定的隊列中發送消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); ​ System.out.println(" [服務提供者] Send '" + message + "'"); ​ //關閉通道和鏈接
 channel.close(); connection.close(); } }

這個時候咱們切換到剛剛建立的用戶上 /admin 上查看信息:

  

消費者獲取消息:

package com.mq.start; ​ import com.rabbitmq.client.*; import java.io.IOException; ​ public class get { //隊列name 要達成通訊 必須和發送的隊列name 一致
    private final static String QUEUE_NAME = "simple_queue"; ​ public static void main(String[] argv) throws Exception { // 獲取到鏈接
        Connection connection = connectionUtils.getConnection(); // 建立通道
        Channel channel = connection.createChannel(); // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用
 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { // body 即消息體
                String msg = new String(body); System.out.println(" [服務消費者] get : " + msg + "!"); } }; // 監聽隊列,第二個參數:是否自動進行消息確認。
        channel.basicConsume(QUEUE_NAME, true, consumer); } }

控制檯打印:由於我發送了兩次

 

再次查看Web管理頁面,沒有消息了:

消費者的消息確認機制

通過剛剛的小Demo,我能發現一旦消息從隊列中被消費者拉取消費後,隊列中的消息就會刪除,

這裏就涉及到一個MQ是經過消息確認機制知道消息什麼時候被消費,當消費者獲取到信息後,回想MQ返回一個ACK回執告知已被接受,能夠刪除。不過ACK回執分問兩種狀況:

  • 手動ACK:消息接收後,通常在消費者消費掉該消息後手動發送ACK

  • 自動ACK:消息接受後當即就會自動發送ACK

至於如何選擇:根據信息的重要程度區分

  • 消息不過重要,即便丟失影響也不大,自動ACK比較巴適

  • 消息很重要,不容許丟失,那就等咱們消費者消費完這個信息後手動發送回執

java實現:部分實現

DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { String msg = new String(body); System.out.println(" [服務消費者] get1 : " + msg + "!"); //在消息消費完後,手動發送ACK回執給MQ
                channel.basicAck(envelope.getDeliveryTag(), false); } }; // 監聽隊列,第二個參數:是否自動進行消息確認。
        channel.basicConsume(QUEUE_NAME, false, consumer); }

消息模型—work消息模型 [ 任務模型 ]

當消息處理比較耗時的時候,可呢個生產消息的速度回遠遠大於消息的消費速度,隨着時間的推移,隊列中的消息就會堆積如山沒法及時的處理,此時work模型橫空出世,讓多個消費者綁定到一個隊列上,共同消費同一個隊列中的消息

  • 一個生產者,一個隊列,兩個或者更多的消費者

消息的生產者:連續發送50個消息去隊列

package com.mq.start.work模型; ​ import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; ​ public class send {
private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 獲取到鏈接 Connection connection = connectionUtils.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 循環發佈任務 for (int i = 0; i < 50; i++) { // 消息內容 String message = "task .. " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); ​ Thread.sleep(i * 2); } // 關閉通道和鏈接 channel.close(); connection.close(); } }

兩個消費者:一次只能處理接收一個消息處理:

package com.mq.start.work模型; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; /** * get1消費者有Thread.sleep(1000),模擬更耗時 */
public class get1 { private final static String QUEUE_NAME = "test_work_queue"; ​ public static void main(String[] argv) throws Exception { // 獲取到鏈接
        Connection connection = connectionUtils.getConnection(); // 獲取通道
        final Channel channe1 = connection.createChannel(); // 聲明隊列
        channe1.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channe1) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體
                String msg = new String(body); System.out.println(" [消費者1] get : " + msg + "!"); // 手動ACK
                channe1.basicAck(envelope.getDeliveryTag(), false); try { //模擬這個消費者消費一個消息很耗時
                    Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }; // 監聽隊列。
        channe1.basicConsume(QUEUE_NAME, false, consumer); }
package com.mq.start.work模型; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; /** * get2處理比價快 */
public class get2 { private final static String QUEUE_NAME = "test_work_queue"; ​ public static void main(String[] argv) throws Exception { // 獲取到鏈接
        Connection connection = connectionUtils.getConnection(); // 獲取通道
        final Channel channe2 = connection.createChannel(); // 聲明隊列
        channe2.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channe2) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體
                String msg = new String(body); System.out.println(" [消費者2] get : " + msg + "!"); // 手動ACK
                channe2.basicAck(envelope.getDeliveryTag(), false); } }; // 監聽隊列。
        channe2.basicConsume(QUEUE_NAME, false, consumer); } }

舒適提示:優先啓動兩個消費者,隨後再啓動消息發佈者

而後咱們看下面的控制檯:get1慢吞吞的在消費,get2快速的消費完便在休息了,一人消費一半

  • 在上面這種狀況下,消費者get1的消費效率是要比消費者get2的效率要低的

  • 但是兩個消費者最終的消費消費的信息數量確實同樣的,是任務均分的;

  • 消費者get1一直在忙碌於消費,消費者get2處理完分配的一半後便處於空閒狀態

能者多勞

消費者同一時間只會接受一條消息,在處理完以前不會接新的消息,讓處理快的人接受更多的消息:

兩個消費者都修改設置以下:

// 設置每一個消費者同時只能處理一條消息
  channel.basicQos(1);

讓咱們看看效果如何:

消息模型—訂閱模型

示意圖:

  

  • P:生產者,發送消息給X(交換機)

  • Exchange:交換機,圖中的X。接收生產者發送的消息。知道如何處理消息,例如遞交給某個特別隊列、遞交給全部隊列、或是將消息丟棄。到底如何操做,取決於Exchange的類型。Exchange有如下3種類型:

    • Fanout:廣播,將消息交給全部綁定到交換機的隊列

    • Direct:定向,把消息交給符合指定routing key 的隊列

    • Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列

  • Queue:消息隊列,接收消息、緩存消息。

  • C:消費者,消息的消費者,會一直等待消息到來。

注意:交換機只負責轉發消息,不具有消息儲存的能力,若是沒有隊列與其進行對接,消息會丟失

消息模型—訂閱模型—廣播 [ Fanout ]

流程圖:

  

在廣播模式下,消息發送流程是這樣的:

  • 1) 能夠有多個消費者

  • 2) 每一個消費者有本身對接的queue(隊列)

  • 3) 每一個隊列都要對接到Exchange(交換機)

  • 4) 生產者發送的消息,只能發送到交換機,交換機來決定要發給哪一個隊列,生產者沒法決定。

  • 5) 交換機把消息發送給綁定過的全部隊列

  • 6) 隊列的消費者都能拿到消息。實現一條消息被多個消費者消費

生產者:

package com.mq.start.訂閱模型_廣播; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; ​ public class send { private final static String EXCHANGE_NAME = "fanout_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 獲取到鏈接
        Connection connection = connectionUtils.getConnection(); // 獲取通道
        Channel channel = connection.createChannel(); ​ // 聲明exchange,指定類型爲fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); ​ // 消息內容
        String message = "四川新聞廣播電視臺爲你播報:今天..."; // 發佈消息到Exchange
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [生產者] Send '" + message + "'"); ​ channel.close(); connection.close(); } }

消費者1:

package com.mq.start.訂閱模型_廣播; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; ​ public class get1 { private final static String QUEUE_NAME = "fanout_queue_1"; private final static String EXCHANGE_NAME = "fanout_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 獲取到鏈接
        Connection connection = connectionUtils.getConnection(); // 獲取通道
        Channel channel = connection.createChannel(); // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); ​ // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); ​ // 定義隊列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用
 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體
                String msg = new String(body); System.out.println(" [消費者1] received : " + msg + "!"); } }; // 監聽隊列,自動返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer); } }

消費者2:

package com.mq.start.訂閱模型_廣播; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; ​ public class get2 { private final static String QUEUE_NAME = "fanout_queue_2"; private final static String EXCHANGE_NAME = "fanout_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 獲取到鏈接
        Connection connection = connectionUtils.getConnection(); // 獲取通道
        Channel channel = connection.createChannel(); // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); ​ // 綁定隊列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); ​ // 定義隊列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用
 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體
                String msg = new String(body); System.out.println(" [消費者2] received : " + msg + "!"); } }; // 監聽隊列,手動返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer); } }

而後查看控制檯:一條消息被全部訂閱的隊列都消費

消息模型—訂閱模型— [ Direct ]

廣播是一條消息被全部與交換機對接的隊列都消費,但有時候,咱們想不一樣的信息被不一樣的隊列說消費,這是就要用到Direct類型的交換機

  

  • P:生產者,向Exchange發送消息,發送消息時,會指定一個routing key。

  • X:Exchange(交換機),接收生產者的消息,而後把消息遞交給 與routing key徹底匹配的隊列

  • C1:消費者,其所在隊列指定了須要routing key 爲 error 的消息

  • C2:消費者,其所在隊列指定了須要routing key 爲 info、error、warning 的消息

在Direct模式下:

  • 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)

  • 消息的發送方在 向 Exchange發送消息時,也必須指定消息的 RoutingKey

  • Exchange再也不把消息交給每個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routingkey與消息的 Routing key徹底一致,纔會接收到消息

消息生產者:

package com.mq.start.訂閱模型_Direct; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 咱們模擬商品的增刪改,發送消息的RoutingKey分別是:insert、update、delete */
public class send { private final static String EXCHANGE_NAME = "direct_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 獲取到鏈接
        Connection connection = connectionUtils.getConnection(); // 獲取通道
        Channel channel = connection.createChannel(); // 聲明exchange,指定類型爲direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 消息內容
        String message = "商品新增, id = 1001"; // 發送消息,而且指定routing key 爲:insert ,表明新增商品
        channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes()); System.out.println(" [商品服務:] Send '" + message + "'"); channel.close(); connection.close(); } }

記住咱們的roting key 是insert噢!

消息消費者1:get1 ,他能接受routing key 爲 "update"、"delete"的消息

package com.mq.start.訂閱模型_Direct; ​import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; ​import java.io.IOException; ​ public class get1 {
private final static String QUEUE_NAME = "direct_exchange_queue_1"; private final static String EXCHANGE_NAME = "direct_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 獲取到鏈接 Connection connection = connectionUtils.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); ​ // 綁定隊列到交換機,同時指定須要訂閱的routing key。假設此處須要update和delete消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); ​ // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [消費者1] git : " + msg + "!"); } }; // 監聽隊列,自動ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }

消息消費者2:get2,他能接受routing key 爲 "insert"、"update"、"delete" 的消息

package com.mq.start.訂閱模型_Direct; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; ​ public class get2 { private final static String QUEUE_NAME = "direct_exchange_queue_2"; private final static String EXCHANGE_NAME = "direct_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 獲取到鏈接
        Connection connection = connectionUtils.getConnection(); // 獲取通道
        Channel channel = connection.createChannel(); // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); ​ // 綁定隊列到交換機,同時指定須要訂閱的routing key。訂閱 insert、update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); ​ // 定義隊列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用
 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體
                String msg = new String(body); System.out.println(" [消費者2] get : " + msg + "!"); } }; // 監聽隊列,自動ACK
        channel.basicConsume(QUEUE_NAME, true, consumer); } }

咱們分別設置routing key 爲 insert、update、delete,逐一測試:

其餘咱們就執行測試吧,我就測了routing key爲insert,最終被get2所消費

消息模型—訂閱模型— [ Topic ]

Topic類型yuDirect相比,其實差很少的,都是根據rounting key 把消息路由到不一樣的隊列,就是Topic類型的交換機支在匹配的時候支持rounting key的通配符

通配符規則:

#:匹配一個或多個詞

*:匹配很少很多剛好1個詞

舉例:

audit.#:可以匹配audit.irs.corporate或者 audit.irs

audit.*:只能匹配audit.irs

消息生產者:Rounting key 爲: item.insert / update / delete

package com.mq.start.訂閱模型_Topic; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; ​ public class send {
private final static String EXCHANGE_NAME = "topic_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 獲取到鏈接 Connection connection = connectionUtils.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明exchange,指定類型爲topic channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息內容 String message = "新增商品 : id = 1001"; // 發送消息,而且指定routing key 爲:insert ,表明新增商品 channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes()); System.out.println(" [商品服務:] Send '" + message + "'"); ​ channel.close(); connection.close(); } }

消息消費者1:get1,匹配的Rounting Key爲 item.update / delete

package com.mq.start.訂閱模型_Topic; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; ​ public class get1 { private final static String QUEUE_NAME = "topic_exchange_queue_1"; private final static String EXCHANGE_NAME = "topic_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 獲取到鏈接
        Connection connection = connectionUtils.getConnection(); // 獲取通道
        Channel channel = connection.createChannel(); // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); ​ // 綁定隊列到交換機,同時指定須要訂閱的routing key。須要 update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete"); ​ // 定義隊列的消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用
 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體
                String msg = new String(body); System.out.println(" [Get1] get : " + msg + "!"); } }; // 監聽隊列,自動ACK
        channel.basicConsume(QUEUE_NAME, true, consumer); } }

消息消費者2:get2 ,經過通配符的方式,消費全部item打頭,後拼一個單詞的全部消息

package com.mq.start.訂閱模型_Topic; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; ​import java.io.IOException; ​ public class get2 {
private final static String QUEUE_NAME = "topic_exchange_queue_2"; private final static String EXCHANGE_NAME = "topic_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 獲取到鏈接 Connection connection = connectionUtils.getConnection(); // 獲取通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); ​ // 綁定隊列到交換機,同時指定須要訂閱的routing key。訂閱 insert、update、delete channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*"); ​ // 定義隊列的消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取消息,而且處理,這個方法相似事件監聽,若是有消息的時候,會被自動調用 ​ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息體 String msg = new String(body); System.out.println(" [get2] get : " + msg + "!"); } }; // 監聽隊列,自動ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }

自測,效果一模一樣

如何避免消息丟失

手動ACK

消費者的手動ACK機制,可有效的避免消息的丟失

消息持久化

若想支持消息持久化,隊列和交換機都得持久化

交換機的持久化:

// 聲明exchange,指定類型爲topic,其後跟一個true參數目標是開啓交換機的持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);

隊列的持久化:

// 聲明隊列,第二個參數表示是否開啓隊列持久化
channe1.queueDeclare(QUEUE_NAME, false, false, false, null);

消息持久化:

// 發送消息,而且指定routing key 爲:insert ,第三個參數表示開啓信息持久化
channel.basicPublish(EXCHANGE_NAME, "item.update",MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

Spring AMQP

Spring-amqp是對AMQP協議的抽象實現,而spring-rabbit 是對協議的具體實現,也是目前的惟一實現。底層使用的就是RabbitMQ。

依賴和配置:

pom.xml

 <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

application.yml

spring:
  rabbitmq:
    host: 192.168.159.159
    username: /admin
    password: admin
    virtual-host: /new1
    template:                           #有關Template的配置
      retry:                            #失敗重試
        enabled: true                   #失敗重試_開啓失敗重試
        initial-interval: 10000ms       #失敗重試_第一次重試的間隔時長
        max-interval: 300000ms          #失敗重試_最長重試間隔
        multiplier: 2                   #失敗重試_下次重試間隔的倍速
      exchange: spring.test.exchange    #指定交換機,發送消息若不指定交換機就使用配置的交換機
    publisher-confirms: true            #生產者確認機制,確保消息正確發送,發送失敗會有錯誤回執

服務的監聽者:在SpringAMQP中,普通方法 + 註解,就能夠成爲一個消費者。

package com.mq.start.SpringAMQP; ​import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; ​ /** * 這是一個消費者 / 監聽者 */ @Component @RabbitListener(queues = "spring.test.queue" ) public class Listener { ​ /** * - @Componet`:類上的註解,註冊到Spring容器 * - `@RabbitListener`:方法上的註解,聲明這個方法是一個消費者方法,須要指定下面的屬性: * - `bindings`:指定綁定關係,能夠有多個。值是`@QueueBinding`的數組。`@QueueBinding`包含下面屬性: * - `value`:這個消費者關聯的隊列。值是`@Queue`,表明一個隊列 * - `exchange`:隊列所綁定的交換機,值是`@Exchange`類型 * - `key`:隊列和交換機綁定的`RoutingKey` */ ​ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "spring.test.queue", durable = "true"), exchange = @Exchange( value = "spring.test.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC ), key = {"#.#"})) public void listen(String msg){ System.out.println("接收到消息:" + msg); } }

消息的發送者:AmqpTemplate

Spring爲AMQP提供了統一的消息處理模板:AmqpTemplate,很是方便的發送消息,其發送方法:

package com.mq.start.SpringAMQP; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; ​ @RunWith(SpringRunner.class) @SpringBootTest public class mqSend { ​ @Autowired private AmqpTemplate amqpTemplate; ​ @Test public void testSend() throws InterruptedException { String msg = "hello, Spring boot amqp"; this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg); // 等待10秒後再結束
        Thread.sleep(10000); } }

外加一個SpringBoot項目的啓動類:

package com.mq.start; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; ​ @SpringBootApplication public class Run { public static void main(String[] args) { SpringApplication.run(Run.class, args); } }

內容就這麼多,整合就算完成,咱們首先啓動SpringBoot項目,而後啓動測試類生產消息,消息的監聽者自會監聽到消息後處理:

相關文章
相關標籤/搜索