java基礎(六):RabbitMQ 入門

建議先了解爲何項目要使用 MQ 消息隊列,MQ 消息隊列有什麼優勢,若是在業務邏輯上沒有此種需求,建議不要使用中間件。中間件對系統的性能作優化的同時,同時增長了系統的複雜性也維護難易度;其次,須要瞭解各類常見的 MQ 消息隊列有什麼區別,以便在相同的成本下選擇一種最合適本系統的技術。html

本文主要討論 RabbitMQ,從3月底接觸一個項目使用了 RabbitMQ,就開始着手學習,主要經過視頻和博客學習了一個月,基本明白了 RabbitMQ 的應用,其它的 MQ 隊列還不清楚,其底層技術還有待學習,如下是我目前的學習心得。java

1.安裝 Erlang

RabbitMQ 是基於 Erlang 語言寫的,因此首先安裝 Erlang,本例是在 Windows 上安裝,也能夠選擇在 Linux 上安裝,機器上沒有虛擬機,直接在 Windows 上操做,建議在 Linux 上安裝。官方下載 Erlang 軟件,我下載最新版本 21.3。安裝過程很簡單,直接 Next 到底。 Linux 安裝自行谷歌。以下圖:spring

安裝結束後,設置環境變量,以下圖

測試是否安裝成功

2.安裝 RabbitMQ

官方下載,選擇最新版本 3.7。安裝過程很簡單,直接 Next 到底。以下圖:數據庫

測試安裝是否成功,進入安裝目錄 sbin,執行 rabbitmq-plugins enable rabbitmq_management 命令,出現下面界面,證實安裝成功(建議以管理員方式打開 dos)。

執行 rabbitmq-server start 命令,啓動服務。本地登錄並建立用戶,以下圖:apache

關於tags標籤的解釋:

一、  超級管理員(administrator)json

可登錄管理控制檯,可查看全部的信息,而且能夠對用戶,策略(policy)進行操做。windows

二、  監控者(monitoring)bash

可登錄管理控制檯,同時能夠查看rabbitmq節點的相關信息(進程數,內存使用狀況,磁盤使用狀況等)服務器

三、  策略制定者(policymaker)網絡

可登錄管理控制檯, 同時能夠對policy進行管理。但沒法查看節點的相關信息(上圖紅框標識的部分)。

四、  普通管理者(management)

僅可登錄管理控制檯,沒法看到節點信息,也沒法對策略進行管理。

五、  其餘

沒法登錄管理控制檯,一般就是普通的生產者和消費者。

4.JAVA 操做RabbitMQ

參考 RabbitMQ 官網,一共分爲6個模式

RabbitMQ 是一個消息代理,實際上,它接收生產者產生的消息,而後將消息傳遞給消費者。在這個過程當中,它能夠路由、緩衝、持久化等,在傳輸過程當中,主要又三部分組成。

生產者:發送消息的一端

隊列:它活動在 RabbitMQ 服務器中,消息存儲的地方,隊列本質上是一個緩衝對象,因此存儲的消息不受限制

消費者:消息接收端

通常狀況下,消息生產者、消費者和隊列不在同一臺服務器上,本地作測試,放在一臺服務器上。 測試項目直接建立一個 maven 格式的項目,不必建立網絡格式。新建一個項目,以下圖:

首先準備操做 MQ 的環境

(1): 準備必要的 Pom 文件,導入相應的 jar 包,

<!--mq客戶端-->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>4.5.0</version>
    </dependency>
    <!--日誌-->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.25</version>
    </dependency>
    <!--工具包-->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.3.2</version>
    </dependency>
    <!--spring集成-->
    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>1.7.6.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>4.3.7.RELEASE</version>
    </dependency>
複製代碼

(2): 創建日誌配置文件,在 resources 下創建 log4j.properties,便於打印精確的日誌信息

log4j.rootLogger=DEBUG,A1
log4j.logger.org.mybatis=DEBUG
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-%m%n
複製代碼

(3): 編寫一個工具類,主要用於鏈接 RabbitMQ

package com.edu.util;


import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @ClassName ConnectionUtil
 * @Deccription 穿件鏈接的工具類
 * @Author DZ
 * @Date 2019/5/4 12:27
 **/
public class ConnectionUtil {
    /**
     * 建立鏈接工具
     *
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");//MQ的服務器
        connectionFactory.setPort(5672);//默認端口號
        connectionFactory.setUsername("test");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("/test");
        return connectionFactory.newConnection();
    }
}

複製代碼

項目整體圖以下:

4.1.Hello World模式

此模式很是簡單,一個生產者對應一個消費者

首先咱們製造一個消息生產者,併發送消息:

package com.edu.hello;

import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @ClassName Sender
 * @Deccription 建立發送者
 * @Author DZ
 * @Date 2019/5/4 12:45
 **/
public class Sender {
    private final static String QUEUE = "testhello"; //隊列的名字

    public static void main(String[] srgs) throws Exception {
        //獲取鏈接
        Connection connection = ConnectionUtil.getConnection();
        //建立鏈接
        Channel channel = connection.createChannel();
        //聲明隊列
        //參數1:隊列的名字
        //參數2:是否持久化隊列,咱們的隊列存在內存中,若是mq重啓則丟失。若是爲ture,則保存在erlang的數據庫中,重啓,依舊保存
        //參數3:是否排外,咱們鏈接關閉後是否自動刪除隊列,是否私有當前隊列,若是私有,其餘隊列不能訪問
        //參數4:是否自動刪除
        //參數5:咱們傳入的其餘參數
        channel.queueDeclare(QUEUE, false, false, false, null);
        //發送內容
        channel.basicPublish("", QUEUE, null, "要發送的消息".getBytes());
        //關閉鏈接
        channel.close();
        connection.close();
    }
}

複製代碼

定義一個消息接受者

package com.edu.hello;

import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

/**
 * @ClassName Recver
 * @Deccription 消息接受者
 * @Author DZ
 * @Date 2019/5/4 12:58
 **/
public class Recver {
    private final static String QUEUE = "testhello";//消息隊列的名稱

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //接受消息,參數2表示自動確認消息
        channel.basicConsume(QUEUE, true, queueingConsumer);
        while (true) {
            //獲取消息
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();//若是沒有消息就等待,有消息就獲取消息,並銷燬,是一次性的
            String message = new String(delivery.getBody());
            System.out.println(message);
        }
    }
}

複製代碼

此種模式屬於「點對點」模式,一個生產者、一個隊列、一個消費者,能夠運用在聊天室(實際上真實的聊天室比這複雜不少,雖然是「點對點」模式,可是並非一個生產者,一個隊列,一個消費者)

4.2.work queues

一個生產者對應多個消費者,可是隻有一個消費者得到消息

定義消息製造者:

package com.edu.work;

import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @ClassName Sender
 * @Deccription 建立發送者
 * @Author DZ
 * @Date 2019/5/4 12:45
 **/
public class Sender {
    private final static String QUEUE = "testhellowork"; //隊列的名字

    public static void main(String[] srgs) throws Exception {
        //獲取鏈接
        Connection connection = ConnectionUtil.getConnection();
        //建立鏈接
        Channel channel = connection.createChannel();
        //聲明隊列
        //參數1:隊列的名字
        //參數2:是否持久化隊列,咱們的隊列存在內存中,若是mq重啓則丟失。若是爲ture,則保存在erlang的數據庫中,重啓,依舊保存
        //參數3:是否排外,咱們鏈接關閉後是否自動刪除隊列,是否私有當前隊列,若是私有,其餘隊列不能訪問
        //參數4:是否自動刪除
        //參數5:咱們傳入的其餘參數
        channel.queueDeclare(QUEUE, false, false, false, null);
        //發送內容
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("", QUEUE, null, ("要發送的消息" + i).getBytes());
        }
        //關閉鏈接
        channel.close();
        connection.close();
    }
}

複製代碼

定義2個消息消費者

package com.edu.work;

import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Queue;

/**
 * @ClassName Recver1
 * @Deccription 消息接受者
 * @Author DZ
 * @Date 2019/5/4 12:58
 **/
public class Recver1 {
    private final static String QUEUE = "testhellowork";//消息隊列的名稱

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);
        //channel.basicQos(1);//告訴服務器,當前消息沒有確認以前,不要發送新消息,合理自動分配資源
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //收到消息時候調用
                System.out.println("消費者1收到的消息:" + new String(body));
                /*super.handleDelivery(consumerTag, envelope, properties, body);*/
                //確認消息
                //參數2:false爲確認收到消息,ture爲拒絕收到消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //註冊消費者
        // 參數2:手動確認,咱們收到消息後,須要手動確認,告訴服務器,咱們收到消息了
        channel.basicConsume(QUEUE, false, defaultConsumer);
    }
}

複製代碼
package com.edu.work;

import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName Recver1
 * @Deccription 消息接受者
 * @Author DZ
 * @Date 2019/5/4 12:58
 **/
public class Recver2 {
    private final static String QUEUE = "testhellowork";//消息隊列的名稱

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);
        //channel.basicQos(1);//告訴服務器,當前消息沒有確認以前,不要發送新消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //收到消息時候調用
                System.out.println("消費者2收到的消息:" + new String(body));
                /*super.handleDelivery(consumerTag, envelope, properties, body);*/
                //確認消息
                //參數2:false爲確認收到消息,ture爲拒絕收到消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //註冊消費者
        // 參數2:手動確認,咱們收到消息後,須要手動確認,告訴服務器,咱們收到消息了
        channel.basicConsume(QUEUE, false, defaultConsumer);
    }
}

複製代碼

這種模式是最簡單的 work 模式,消息發送者,循環發送了100次消息,打印結果以下:

能夠看出,消息消費者消費到的消息是替換的,即 一個消息只被消費了一次,且兩個消費者各消費了50條消息。這裏有個弊端,消息消費者發佈消息的時候, 不管消費者的消費能力如何(電腦的內存等硬件),消息只會均勻分佈給各個消費者(能夠給2個消費者 sleep 下,結果仍是這樣)。有沒有什麼方式可讓消息自動分配(按照電腦的硬件,能者多勞),答案是能夠的,只須要增長 channel.basicQos(1);

此方案能夠用來進行負載均衡,搶紅包等場景

4.3.public模式

一個消費者將消息首先發送到交換器,交換器綁定到多個隊列,而後被監聽該隊列的消費者所接收並消費。X 表示交換器,在 RabbitMQ 中,交換器主要有四種類型: direct、fanout、topic、headers,這裏的交換器是 fanout,其它類型的交換機自行谷歌,主要區別是 交換機的匹配方式發生了變化

定義消息發佈者

package com.edu.publish;

import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @ClassName Sender
 * @Deccription TODO
 * @Author DZ
 * @Date 2019/5/4 14:43
 **/
public class Sender {
    private final static String EXCHANGE_NAME = "testexchange";//定義交換機名字

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //聲明交換機
        //定義一個交換機,類型爲fanout,也就是發佈訂閱者模式
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //發佈訂閱模式,由於消息是先發布到交換機中,而交換機是沒有保存功能的,因此若是沒有消費者,消息會丟失
        channel.basicPublish(EXCHANGE_NAME, "", null, "發佈訂閱模式的消息".getBytes());
        channel.close();
        connection.close();
    }
}

複製代碼

定義2個消息消費者

package com.edu.publish;

import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName Recver1
 * @Deccription TODO
 * @Author DZ
 * @Date 2019/5/4 14:49
 **/
public class Recver1 {
    //定義交換機
    private final static String EXCHANGE_NAME = "testexchange";
    private final static String QUEUE = "testpubqueue1";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);
        //綁定隊列到交換機
        channel.queueBind(QUEUE, EXCHANGE_NAME, "");
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                /* super.handleDelivery(consumerTag, envelope, properties, body);*/
                System.out.println("消費者1:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE, false, defaultConsumer);
    }
}

複製代碼
package com.edu.publish;

import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName Recver1
 * @Deccription TODO
 * @Author DZ
 * @Date 2019/5/4 14:49
 **/
public class Recver2 {
    //定義交換機
    private final static String EXCHANGE_NAME = "testexchange";
    private final static String QUEUE = "testpubqueue2";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);
        //綁定隊列到交換機
        channel.queueBind(QUEUE, EXCHANGE_NAME, "");
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                /* super.handleDelivery(consumerTag, envelope, properties, body);*/
                System.out.println("消費者2:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE, false, defaultConsumer);
    }
}

複製代碼

消費者1 和消費者2 都監聽了被同一個交換器綁定的隊列,所以消息被同時消費到了。若是消息發送到沒有隊列綁定的交換器時,消息將丟失,由於交換器沒有存儲消息的能力,消息只能存儲在隊列中。

應用場景:好比一個商城系統須要在管理員上傳商品新的圖片時,前臺系統必須更新圖片,日誌系統必須記錄相應的日誌,那麼就能夠將兩個隊列綁定到圖片上傳交換器上,一個用於前臺系統更新圖片,另外一個用於日誌系統記錄日誌。

4.4.routing

生產者將消息發送到 direct 交換器,在綁定隊列和交換器的時候有一個路由 key,生產者發送的消息會指定一個路由 key,那麼消息只會發送到相應 key 相同的隊列,接着監聽該隊列的消費者消費消息。

定義消息發佈者

package com.edu.route;

import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @ClassName Sender
 * @Deccription TODO
 * @Author DZ
 * @Date 2019/5/4 15:05
 **/
public class Sender {
    private final static String EXCANGE_NAME = "testroute";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //定義路由格式的交換機
        channel.exchangeDeclare(EXCANGE_NAME, "direct");
        channel.basicPublish(EXCANGE_NAME, "key2", null, "路由模式的消息".getBytes());
        channel.close();
        connection.close();
    }
}

複製代碼
package com.edu.route;

import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName Recver1
 * @Deccription TODO
 * @Author DZ
 * @Date 2019/5/4 14:49
 **/
public class Recver1 {
    //定義交換機
    private final static String EXCHANGE_NAME = "testroute";
    private final static String QUEUE = "testroute1queue";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);
        //綁定隊列到交換機
        //參數3:綁定到交換機指定的路由的名字
        channel.queueBind(QUEUE, EXCHANGE_NAME, "key1");
        //若是須要綁定多個路由,再綁定一次便可
        channel.queueBind(QUEUE, EXCHANGE_NAME, "key2");
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                /* super.handleDelivery(consumerTag, envelope, properties, body);*/
                System.out.println("消費者1:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE, false, defaultConsumer);
    }
}

複製代碼
package com.edu.route;

import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName Recver1
 * @Deccription TODO
 * @Author DZ
 * @Date 2019/5/4 14:49
 **/
public class Recver2 {
    //定義交換機
    private final static String EXCHANGE_NAME = "testroute";
    private final static String QUEUE = "testroute2queue";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);
        //綁定隊列到交換機
        //參數3:綁定到交換機指定的路由的名字
        channel.queueBind(QUEUE, EXCHANGE_NAME, "key1");
        //若是須要綁定多個路由,再綁定一次便可
        channel.queueBind(QUEUE, EXCHANGE_NAME, "key3");
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                /* super.handleDelivery(consumerTag, envelope, properties, body);*/
                System.out.println("消費者2:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE, false, defaultConsumer);
    }
}

複製代碼

應用場景:利用消費者可以有選擇性的接收消息的特性,好比咱們商城系統的後臺管理系統對於商品進行修改、刪除、新增操做都須要更新前臺系統的界面展現,而查詢操做確不須要,那麼這兩個隊列分開接收消息就比較好。

4.5.Topic

上面的路由模式是根據路由key進行完整的匹配(徹底相等才發送消息),這裏的通配符模式通俗的來說就是模糊匹配。符號 「#」 表示匹配一個或多個詞,符號 「*」 表示匹配一個詞。實際上 Topic 模式是 routing 模式的擴展

package com.edu.topic;

import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @ClassName Sender
 * @Deccription TODO
 * @Author DZ
 * @Date 2019/5/4 15:19
 **/
public class Sender {
    private final static String EXCANGE_NAME = "testtopexchange";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCANGE_NAME, "topic");
        channel.basicPublish(EXCANGE_NAME, "abc.adb.1", null, "topic模式消息發送者:".getBytes());
        channel.close();
        connection.close();
    }
}

複製代碼
package com.edu.topic;

import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName Recver1
 * @Deccription TODO
 * @Author DZ
 * @Date 2019/5/4 14:49
 **/
public class Recver1 {
    //定義交換機
    private final static String EXCHANGE_NAME = "testtopexchange";
    private final static String QUEUE = "testtopic1queue";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);
        //綁定隊列到交換機
        //參數3:綁定到交換機指定的路由的名字
        channel.queueBind(QUEUE, EXCHANGE_NAME, "key.*");
        //若是須要綁定多個路由,再綁定一次便可
        channel.queueBind(QUEUE, EXCHANGE_NAME, "abc.*");
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                /* super.handleDelivery(consumerTag, envelope, properties, body);*/
                System.out.println("消費者1:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE, false, defaultConsumer);
    }
}

複製代碼
package com.edu.topic;

import com.edu.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName Recver1
 * @Deccription TODO
 * @Author DZ
 * @Date 2019/5/4 14:49
 **/
public class Recver2 {
    //定義交換機
    private final static String EXCHANGE_NAME = "testtopexchange";
    private final static String QUEUE = "testtopic2queue";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE, false, false, false, null);
        //綁定隊列到交換機
        //參數3:綁定到交換機指定的路由的名字
        channel.queueBind(QUEUE, EXCHANGE_NAME, "key.*");
        //若是須要綁定多個路由,再綁定一次便可
        channel.queueBind(QUEUE, EXCHANGE_NAME, "abc.#");
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                /* super.handleDelivery(consumerTag, envelope, properties, body);*/
                System.out.println("消費者2:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE, false, defaultConsumer);
    }
}

複製代碼

第六種模式是將上述的模式集成其它的框架,進行遠程訪問,這裏咱們將集成 Spring 實現 RCP 遠程模式的使用

5.Spring 集成 RabbitMQ

5.1.自動集成 Spring

編寫spring的配置,此配置文件的目的是將 Spring 與 RabbitMQ 進行整合,實際上就是將 MQ 的相關信息(鏈接,隊列,交換機……)經過XML配置的方式實現

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
    <!--定義鏈接工廠-->
    <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="test" password="test"
                               virtual-host="/test"/>
    <!--
     定義模板
     第三個參數,決定消息發送到哪裏,若是爲exchange,則發送到交換機;若是爲queue,則發送到隊列
    -->
    <rabbit:template id="template" connection-factory="connectionFactory" exchange="fanoutExchange"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <!--定義隊列-->
    <rabbit:queue name="myQueue" auto-declare="true"/>
    <!--定義交換機-->
    <rabbit:fanout-exchange name="fanoutExange" auto-declare="true">
        <!--將消息綁定到交換機-->
        <rabbit:bindings>
            <rabbit:binding queue="myQueue">

            </rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>
    <!--定義監聽器,收到消息會執行-->
    <rabbit:listener-container connection-factory="connectionFactory">
       <!-- 定義監聽的類和方法-->
        <rabbit:listener ref="consumer" method="test" queue-names="myQueue"/>
    </rabbit:listener-container>
    <!--定義消費者-->
    <bean id="consumer" class="com.edu.spring.MyConsumer"/>

</beans>
複製代碼

生產者:

package com.edu.spring;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * @ClassName SpringTest
 * @Deccription TODO
 * @Author DZ
 * @Date 2019/5/4 18:40
 **/
public class SpringTest {
    public static void main(String[] args) throws Exception {
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.convertAndSend("Spring的消息");
        ((ClassPathXmlApplicationContext) applicationContext).destroy();
    }
}

複製代碼

消費者

package com.edu.spring;

/**
 * @ClassName MyConsumer
 * @Deccription TODO
 * @Author DZ
 * @Date 2019/5/4 18:35
 **/
public class MyConsumer {
    /*用於接收消息*/
    public void test(String message) {
        System.err.println(message);
    }
}

複製代碼

集成Spring主要是在xml中實現了隊列和交換機的建立。

最好能理解上面的圖。理解後,之後寫相關的代碼,直接去網上 copy 一份配置文件,而後根據本身項目的狀況進行修改。若是不能理解,就不知道如何修改出現錯誤後不知道錯誤出如今什麼地方。

5.2.手動模式

手動模式,主要增長MQ的回調操做,MQ消息失敗或者成功就有相應的回調信息,加強系統的健壯性,一旦產生異常,很快就能定位到異常的位置,因此在實際開發中,通常都這種方式

建立xml配置文件

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd">
    <context:component-scan base-package="com.edu"/>
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!--
    定義鏈接工廠
    publisher-confirms爲ture,確認失敗等回調纔會執行
    -->
    <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="test" password="test"
                               virtual-host="/test" publisher-confirms="true"/>

    <rabbit:admin connection-factory="connectionFactory"/>
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" confirm-callback="confirmCallBackListener"
                     return-callback="returnCallBackListener"
                     mandatory="true"/>
    <!--定義隊列-->
    <rabbit:queue name="myQueue" auto-declare="true"/>
    <!--定義交換機-->
    <rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX">
        <!--將消息綁定到交換機-->
        <rabbit:bindings>
            <rabbit:binding queue="myQueue">

            </rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!--定義監聽器,收到消息會執行-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
        <!-- 定義監聽的類和方法-->
        <rabbit:listener queues="myQueue" ref="receiveConfirmTestListener"/>
    </rabbit:listener-container>

</beans>
複製代碼

建立回調監聽函數

package com.edu.spring2;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Component;

/**
 * @ClassName ConfirmCallBackListener
 * @Deccription TODO
 * @Author DZ
 * @Date 2019/5/4 22:26
 **/
@Component("confirmCallBackListener")
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("確認回調 ack==" + ack + "回調緣由==" + cause);
    }
}

複製代碼
package com.edu.spring2;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 * @ClassName ReceiveConfirmTestListener
 * @Deccription TODO
 * @Author DZ
 * @Date 2019/5/4 22:24
 **/
@Component("receiveConfirmTestListener")
public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {
    /**
     * 收到消息時,執行的監聽
     *
     * @param message
     * @param channel
     * @throws Exception
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println(("消費者收到了消息" + message));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

複製代碼
package com.edu.spring2;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/**
 * @ClassName ReturnCallBackListener
 * @Deccription TODO
 * @Author DZ
 * @Date 2019/5/4 22:28
 **/
@Component("returnCallBackListener")
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("失敗回調" + message);
    }
}

複製代碼

回調函數的配置來自 XML

建立發送消息的工具類

package com.edu.spring2;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @ClassName PublicUtil
 * @Deccription TODO
 * @Author DZ
 * @Date 2019/5/4 22:30
 **/
@Component("publicUtil")
public class PublicUtil {
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(String excange, String routingkey, Object message) {
        amqpTemplate.convertAndSend(excange, routingkey, message);
    }
}

複製代碼

建立測試類

package com.edu.spring2;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * @ClassName TestMain
 * @Deccription TODO
 * @Author DZ
 * @Date 2019/5/4 22:32
 **/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:applicationContext2.xml"})
public class TestMain {
    @Autowired
    private PublicUtil publicUtil;
    private static String exChange = "DIRECT_EX";//交換機
    private static String queue = "myQueue";

    /**
     * exChange和queue均正確
     * confirm會執行,ack = ture
     * 消息正常接收(接收消息確認方法正常執行)
     * @throws Exception
     */
    @Test
    public void test1() throws Exception {
        publicUtil.send(exChange, queue, "測試1,隊列和交換機均正確");
    }
    /**
     * exChange錯誤,queue正確
     * confirm執行,ack=false
     * 消息沒法接收(接收消息確認方法不能執行)
     * @throws Exception
     */
    @Test
    public void test2() throws Exception {
        publicUtil.send(exChange + "1", queue, "測試2,隊列正確,交換機錯誤");
    }
    /**
     * exChange正常,queue錯誤
     * return執行
     * confirm執行,ack=ture
     * @throws Exception
     */
    @Test
    public void test3() throws Exception {
        publicUtil.send(exChange, queue + "1", "測試2,隊列錯誤,交換機正確");
    }

    /**
     * exChange錯誤,queue錯誤
     * confirm執行,ack=false
     * @throws Exception
     */
    @Test
    public void test4() throws Exception {
        publicUtil.send(exChange + "1", queue + "1", "測試2,隊列錯誤,交換機錯誤");
    }
}


複製代碼

測試結果以下:

  • test1:exChange和queue均正確

    confirm會執行,ack=ture;能正常收到消息(接收消息的方法正常執行)

  • test2:exChange錯誤,queue正確

confirm執行,ack=false;不能正常接收到消息

  • test3:exChange正確,queue錯誤

confirm執行,ack=ture;return執行;不能接收到消息

  • test4:exChange和queue均錯誤

confirm執行,ack=false;不能接收消息

上述結論及代碼以下圖:

根據上述的測試結果,咱們能夠根據回調函數的返回結果,查看MQ的錯誤出如今那裏。根據上述結論,咱們能夠對3個回調函數作以下處理:

  • 類 ReceiveConfirmTestListener 中的onMessage方法主要用於接收從 RabbitMQ 推送過來的消息,並對消息作相應的邏輯處理

  • 類 ConfirmCallBackListener 中的 confirm 方法主要用於檢查交換機(exChange),當 ack=false,交換機可能錯誤

  • 類 ReturnCallBackListener 中的 returnedMessage 方法用於檢查隊列(queue),當此方法執行時,隊列可能錯誤

因此3個相應的方法能夠作以下調整:

實際上,在真實項目中,上面3個方法也是按照這3個邏輯進行設計的。固然這3個方法中還能夠加入更多的日誌消息,和邏輯處理業務。

6.參考

blog.csdn.net/liu911025/a…

blog.csdn.net/lyhkmm/arti…

blog.csdn.net/vbirdbest/a…

blog.csdn.net/cairuojin/a…

www.rabbitmq.com/getstarted.…

相關文章
相關標籤/搜索