第四十二章: 基於SpringBoot & RabbitMQ完成DirectExchange分佈式消息多消費者消費

在上一章第四十一章: 基於SpringBoot & RabbitMQ完成DirectExchange分佈式消息消費咱們講解到了RabbitMQ消息隊列的DirectExchange路由鍵消息單個消費者消費,源碼請訪問SpringBoot對應章節源碼下載查看,消息隊列目的是完成消息的分佈式消費,那麼咱們是否能夠爲一個Provider建立並綁定多個Consumer呢?node

本章目標

基於SpringBoot平臺整合RabbitMQ消息隊列,完成一個Provider綁定多個Consumer進行消息消費。git

SpringBoot 企業級核心技術學習專題

專題 專題名稱 專題描述
001 Spring Boot 核心技術 講解SpringBoot一些企業級層面的核心組件
002 Spring Boot 核心技術章節源碼 Spring Boot 核心技術簡書每一篇文章碼雲對應源碼
003 Spring Cloud 核心技術 對Spring Cloud核心技術全面講解
004 Spring Cloud 核心技術章節源碼 Spring Cloud 核心技術簡書每一篇文章對應源碼
005 QueryDSL 核心技術 全面講解QueryDSL核心技術以及基於SpringBoot整合SpringDataJPA
006 SpringDataJPA 核心技術 全面講解SpringDataJPA核心技術

構建項目

咱們基於上一章的項目進行升級,咱們先來將Chapter41項目Copy一份命名爲Chapter42spring

構建 rabbitmq-consumer-node2

基於咱們複製的Chapter42項目,建立一個Module子項目命名爲rabbitmq-consumer-node2,用於消費者的第二個節點,接下來咱們爲rabbitmq-consumer-node2項目建立一個入口啓動類RabbitmqConsumerNode2Application,代碼以下所示:數據庫

/**
 * 消息隊列消息消費者節點2入口
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:15:15
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
public class RabbitmqConsumerNode2Application
{
    static Logger logger = LoggerFactory.getLogger(RabbitmqConsumerNode2Application.class);

    /**
     * rabbitmq消費者啓動入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitmqConsumerNode2Application.class,args);

        logger.info("【【【【【消息隊列-消息消費者節點2啓動成功.】】】】】");
    }
}
複製代碼

爲了區分具體的消費者節點,咱們在項目啓動成功後打印了相關的日誌信息,下面咱們來編寫application.properties配置文件信息,能夠直接從rabbitmq-consumer子項目內複製內容,複製後須要修改server.port以及spring.application.name,以下所示:bash

#端口號
server.port=1112
#項目名稱
spring.application.name=rabbitmq-consumer-node2


#rabbitmq相關配置
#用戶名
spring.rabbitmq.username=guest
#密碼
spring.rabbitmq.password=guest
#服務器ip
spring.rabbitmq.host=localhost
#虛擬空間地址
spring.rabbitmq.virtual-host=/
#端口號
spring.rabbitmq.port=5672
#配置發佈消息確認回調
spring.rabbitmq.publisher-confirms=true
複製代碼

由於咱們是本地測試項目,因此須要修改對應的端口號,防止端口被佔用。服務器

建立用戶註冊消費者

複製rabbitmq-consumer子項目內的UserConsumer類到rabbitmq-consumer-node2子項目對應的package內,以下所示:app

/**
 * 用戶註冊消息消費者
 * 分佈式節點2
 * ========================
 *
 * @author 恆宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:15:20
 * 碼雲:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
@RabbitListener(queues = "user.register.queue")
public class UserConsumer {

    /**
     * logback
     */
    private Logger logger = LoggerFactory.getLogger(UserConsumer.class);

    @RabbitHandler
    public void execute(Long userId)
    {
        logger.info("用戶註冊消費者【節點2】獲取消息,用戶編號:{}",userId);

        //...//自行業務邏輯處理
    }
}
複製代碼

爲了區分具體的消費者輸出內容,咱們在上面UserConsumer消費者消費方法內打印了相關日誌輸出,下面咱們一樣把rabbitmq-consumer子項目內UserConsumer的消費方法寫入相關日誌,以下所示:框架

@RabbitHandler
    public void execute(Long userId)
    {
        logger.info("用戶註冊消費者【節點1】獲取消息,用戶編號:{}",userId);

        //...//自行業務邏輯處理
    }
複製代碼

到目前爲止咱們的多節點RabbitMQ消費者已經編寫完成,下面咱們來模擬多個用戶註冊的場景,來查看用戶註冊消息是否被轉發並惟一性的分配給不一樣的消費者節點。分佈式

運行測試

咱們打開上一章編寫的UserTester測試類,爲了模擬多用戶註冊請求,咱們對應的建立一個內部線程類BatchRabbitTester,在線程類內編寫註冊請求代碼,以下所示:ide

/**
     * 批量添加用戶線程測試類
     * run方法發送用戶註冊請求
     */
    class BatchRabbitTester implements Runnable
    {
        private int index;
        public BatchRabbitTester() { }

        public BatchRabbitTester(int index) {
            this.index = index;
        }


        @Override
        public void run() {
            try {
                mockMvc.perform(MockMvcRequestBuilders.post("/user/save")
                        .param("userName","yuqiyu" + index)
                        .param("name","恆宇少年" + index)
                        .param("age","23")
                )
                        .andDo(MockMvcResultHandlers.log())
                        .andReturn();
            }catch (Exception e){
                e.printStackTrace();
            }

        }
    }
複製代碼

爲了區分每個註冊信息是否都已經寫入到數據庫,咱們爲BatchRabbitTester添加了一個有參的構造方法,將for循環的i值對應的傳遞爲index的值。下面咱們來編寫對應的批量註冊的測試方法,以下所示:

/**
     * 測試用戶批量添加
     * @throws Exception
     */
    @Test
    public void testBatchUserAdd() throws Exception
    {
        for (int i = 0 ; i < 10 ; i++) {
            //建立用戶註冊線程
            Thread thread = new Thread(new BatchRabbitTester(i));
            //啓動線程
            thread.start();
        }
        //等待線程執行完成
        Thread.sleep(2000);
    }
複製代碼

咱們循環10次來測試用戶註冊請求,每一次都會建立一個線程去完成發送註冊請求邏輯,在方法底部添加了sleep方法,目的是爲了阻塞測試用例的結束,由於咱們測試用戶完成方法後會自動中止,不會去等待其餘線程執行完成,因此這裏咱們阻塞測試主線程來完成發送註冊線程請求邏輯。

執行批量註冊測試方法

咱們在執行測試批量註冊用戶消息以前,先把rabbitmq-consumerrabbitmq-consumer-node2兩個消費者子項目啓動,項目啓動完成後能夠看到控制檯輸出啓動成功日誌,以下所示:

rabbitmq-consumer:
2017-12-10 17:10:36.961  INFO 15644 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 1111 (http)
2017-12-10 17:10:36.964  INFO 15644 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : Started RabbitmqConsumerApplication in 2.405 seconds (JVM running for 3.39)
2017-12-10 17:10:36.964  INFO 15644 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : 【【【【【消息隊列-消息消費者啓動成功.】】】】】

rabbitmq-consumer-node2:
2017-12-10 17:11:31.679  INFO 13812 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 1112 (http)
2017-12-10 17:11:31.682  INFO 13812 --- [           main] c.h.c.RabbitmqConsumerNode2Application   : Started RabbitmqConsumerNode2Application in 2.419 seconds (JVM running for 3.129)
2017-12-10 17:11:31.682  INFO 13812 --- [           main] c.h.c.RabbitmqConsumerNode2Application   : 【【【【【消息隊列-消息消費者節點2啓動成功.】】】】】

複製代碼

接下來咱們來運行testBatchUserAdd方法,查看測試控制檯輸出內容以下所示:

2017-12-10 17:15:02.619  INFO 14456 --- [       Thread-3] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#528df369:0/SimpleConnection@39b6ba57 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 60936]
 回調id:194b5e67-6913-474a-b2ac-6e938e1e85e8
消息發送成功
 回調id:e88ce59c-3eb9-433c-9e25-9429e7076fbe
消息發送成功
 回調id:3e5b8382-6f63-450f-a641-e3d8eee255b2
消息發送成功
 回調id:39103357-6c80-4561-acb7-79b32d6171c9
消息發送成功
 回調id:9795d227-b54e-4cde-9993-a5b880fcfe39
消息發送成功
 回調id:e9b8b828-f069-455f-a366-380bf10a5909
消息發送成功
 回調id:6b5b4a9c-5e7f-4c53-9eef-98e06f8be867
消息發送成功
 回調id:619a42f3-cb94-4434-9c75-1e28a04ce350
消息發送成功
 回調id:6b720465-b64a-4ed9-9d8c-3e4dafa4faed
消息發送成功
 回調id:b4296f7f-98cc-423b-a4ef-0fc31d22cb08
消息發送成功
複製代碼

能夠看到確實已經成功的發送了10條用戶註冊消息到RabbitMQ服務端,那麼是否已經正確的成功的將消息轉發到消費者監聽方法了呢?咱們來打開rabbitmq-consumer子項目的啓動控制檯查看日誌輸出內容以下所示:

2017-12-10 17:10:36.964  INFO 15644 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : 【【【【【消息隊列-消息消費者啓動成功.】】】】】
2017-12-10 17:15:02.695  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用戶註冊消費者【節點1】獲取消息,用戶編號:20
2017-12-10 17:15:02.718  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用戶註冊消費者【節點1】獲取消息,用戶編號:22
2017-12-10 17:15:02.726  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用戶註冊消費者【節點1】獲取消息,用戶編號:26
2017-12-10 17:15:02.729  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用戶註冊消費者【節點1】獲取消息,用戶編號:21
2017-12-10 17:15:02.789  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用戶註冊消費者【節點1】獲取消息,用戶編號:28
複製代碼

能夠看到成功的接受了5條對應用戶註冊消息內容,不過這裏具體接受的條數並非固定的,這也是RabbitMQ消息轉發權重內部問題。 下面咱們打開rabbitmq-consumer-node2子項目控制檯查看日誌輸出內容以下所示:

2017-12-10 17:11:31.682  INFO 13812 --- [           main] c.h.c.RabbitmqConsumerNode2Application   : 【【【【【消息隊列-消息消費者節點2啓動成功.】】】】】
2017-12-10 17:15:02.708  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用戶註冊消費者【節點2】獲取消息,用戶編號:25
2017-12-10 17:15:02.717  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用戶註冊消費者【節點2】獲取消息,用戶編號:23
2017-12-10 17:15:02.719  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用戶註冊消費者【節點2】獲取消息,用戶編號:24
2017-12-10 17:15:02.727  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用戶註冊消費者【節點2】獲取消息,用戶編號:27
2017-12-10 17:15:02.790  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用戶註冊消費者【節點2】獲取消息,用戶編號:29
複製代碼

一樣得到了5條用戶註冊消息,不過並無任何規律可言,編號也不是順序的。

因此多節點時消息具體分發到哪一個節點並非固定的,徹底是RabbitMQ分發機制來控制。

總結

本章完成了基於SpringBoot平臺整合RabbitMQ單個Provider對應綁定多個Consumer來進行多節點分佈式消費者消息消費,實際生產項目部署時徹底能夠將消費節點分開到不一樣的服務器,只要消費節點能夠訪問到RabbitMQ服務端,能夠正常通信,就能夠完成消息消費。

本章源碼已經上傳到碼雲: SpringBoot配套源碼地址:gitee.com/hengboy/spr… SpringCloud配套源碼地址:gitee.com/hengboy/spr… SpringBoot相關係列文章請訪問:目錄:SpringBoot學習目錄 QueryDSL相關係列文章請訪問:QueryDSL通用查詢框架學習目錄 SpringDataJPA相關係列文章請訪問:目錄:SpringDataJPA學習目錄,感謝閱讀! 歡迎加入QQ技術交流羣,共同進步。

QQ技術交流羣
相關文章
相關標籤/搜索