在上一章第四十一章: 基於SpringBoot & RabbitMQ完成DirectExchange分佈式消息消費咱們講解到了RabbitMQ
消息隊列的DirectExchange
路由鍵消息單個消費者消費,源碼請訪問SpringBoot對應章節源碼下載查看,消息隊列目的是完成消息的分佈式消費,那麼咱們是否能夠爲一個Provider
建立並綁定多個Consumer
呢?node
基於SpringBoot
平臺整合RabbitMQ
消息隊列,完成一個Provider
綁定多個Consumer
進行消息消費。git
專題 | 專題名稱 | 專題描述 |
---|---|---|
001 | Spring Boot 核心技術 | 講解SpringBoot一些企業級層面的核心組件 |
002 | Spring Boot 核心技術章節源碼 | Spring Boot 核心技術簡書每一篇文章碼雲對應源碼 |
003 | Spring Cloud 核心技術 | 對Spring Cloud核心技術全面講解 |
004 | Spring Cloud 核心技術章節源碼 | Spring Cloud 核心技術簡書每一篇文章對應源碼 |
005 | QueryDSL 核心技術 | 全面講解QueryDSL核心技術以及基於SpringBoot整合SpringDataJPA |
006 | SpringDataJPA 核心技術 | 全面講解SpringDataJPA核心技術 |
咱們基於上一章的項目進行升級,咱們先來將Chapter41
項目Copy
一份命名爲Chapter42
。spring
基於咱們複製的Chapter42
項目,建立一個Module
子項目命名爲rabbitmq-consumer-node2
,用於消費者的第二個節點,接下來咱們爲rabbitmq-consumer-node2
項目建立一個入口啓動類RabbitmqConsumerNode2Application
,代碼以下所示:數據庫
/**
* 消息隊列消息消費者節點2入口
* ========================
*
* @author 恆宇少年
* Created with IntelliJ IDEA.
* Date:2017/11/26
* Time:15:15
* 碼雲:http://git.oschina.net/jnyqy
* ========================
*/
@SpringBootApplication
public class RabbitmqConsumerNode2Application
{
static Logger logger = LoggerFactory.getLogger(RabbitmqConsumerNode2Application.class);
/**
* rabbitmq消費者啓動入口
* @param args
*/
public static void main(String[] args)
{
SpringApplication.run(RabbitmqConsumerNode2Application.class,args);
logger.info("【【【【【消息隊列-消息消費者節點2啓動成功.】】】】】");
}
}
複製代碼
爲了區分具體的消費者節點,咱們在項目啓動成功後打印了相關的日誌信息,下面咱們來編寫application.properties
配置文件信息,能夠直接從rabbitmq-consumer
子項目內複製內容,複製後須要修改server.port
以及spring.application.name
,以下所示:bash
#端口號
server.port=1112
#項目名稱
spring.application.name=rabbitmq-consumer-node2
#rabbitmq相關配置
#用戶名
spring.rabbitmq.username=guest
#密碼
spring.rabbitmq.password=guest
#服務器ip
spring.rabbitmq.host=localhost
#虛擬空間地址
spring.rabbitmq.virtual-host=/
#端口號
spring.rabbitmq.port=5672
#配置發佈消息確認回調
spring.rabbitmq.publisher-confirms=true
複製代碼
由於咱們是本地測試項目,因此須要修改對應的端口號,防止端口被佔用。服務器
複製rabbitmq-consumer
子項目內的UserConsumer
類到rabbitmq-consumer-node2
子項目對應的package
內,以下所示:app
/**
* 用戶註冊消息消費者
* 分佈式節點2
* ========================
*
* @author 恆宇少年
* Created with IntelliJ IDEA.
* Date:2017/11/26
* Time:15:20
* 碼雲:http://git.oschina.net/jnyqy
* ========================
*/
@Component
@RabbitListener(queues = "user.register.queue")
public class UserConsumer {
/**
* logback
*/
private Logger logger = LoggerFactory.getLogger(UserConsumer.class);
@RabbitHandler
public void execute(Long userId)
{
logger.info("用戶註冊消費者【節點2】獲取消息,用戶編號:{}",userId);
//...//自行業務邏輯處理
}
}
複製代碼
爲了區分具體的消費者輸出內容,咱們在上面UserConsumer
消費者消費方法內打印了相關日誌輸出,下面咱們一樣把rabbitmq-consumer
子項目內UserConsumer
的消費方法寫入相關日誌,以下所示:框架
@RabbitHandler
public void execute(Long userId)
{
logger.info("用戶註冊消費者【節點1】獲取消息,用戶編號:{}",userId);
//...//自行業務邏輯處理
}
複製代碼
到目前爲止咱們的多節點
RabbitMQ
消費者已經編寫完成,下面咱們來模擬多個用戶註冊的場景,來查看用戶註冊消息是否被轉發並惟一性的分配給不一樣的消費者節點。分佈式
咱們打開上一章編寫的UserTester
測試類,爲了模擬多用戶註冊請求,咱們對應的建立一個內部線程類BatchRabbitTester
,在線程類內編寫註冊請求代碼,以下所示:ide
/**
* 批量添加用戶線程測試類
* run方法發送用戶註冊請求
*/
class BatchRabbitTester implements Runnable
{
private int index;
public BatchRabbitTester() { }
public BatchRabbitTester(int index) {
this.index = index;
}
@Override
public void run() {
try {
mockMvc.perform(MockMvcRequestBuilders.post("/user/save")
.param("userName","yuqiyu" + index)
.param("name","恆宇少年" + index)
.param("age","23")
)
.andDo(MockMvcResultHandlers.log())
.andReturn();
}catch (Exception e){
e.printStackTrace();
}
}
}
複製代碼
爲了區分每個註冊信息是否都已經寫入到數據庫,咱們爲BatchRabbitTester
添加了一個有參的構造方法,將for循環的i
值對應的傳遞爲index
的值。下面咱們來編寫對應的批量註冊的測試方法,以下所示:
/**
* 測試用戶批量添加
* @throws Exception
*/
@Test
public void testBatchUserAdd() throws Exception
{
for (int i = 0 ; i < 10 ; i++) {
//建立用戶註冊線程
Thread thread = new Thread(new BatchRabbitTester(i));
//啓動線程
thread.start();
}
//等待線程執行完成
Thread.sleep(2000);
}
複製代碼
咱們循環10次來測試用戶註冊請求,每一次都會建立一個線程去完成發送註冊請求邏輯,在方法底部添加了sleep
方法,目的是爲了阻塞測試用例的結束,由於咱們測試用戶完成方法後會自動中止,不會去等待其餘線程執行完成,因此這裏咱們阻塞測試主線程來完成發送註冊線程請求邏輯。
咱們在執行測試批量註冊用戶消息以前,先把rabbitmq-consumer
、rabbitmq-consumer-node2
兩個消費者子項目啓動,項目啓動完成後能夠看到控制檯輸出啓動成功日誌,以下所示:
rabbitmq-consumer:
2017-12-10 17:10:36.961 INFO 15644 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 1111 (http)
2017-12-10 17:10:36.964 INFO 15644 --- [ main] c.h.r.c.RabbitmqConsumerApplication : Started RabbitmqConsumerApplication in 2.405 seconds (JVM running for 3.39)
2017-12-10 17:10:36.964 INFO 15644 --- [ main] c.h.r.c.RabbitmqConsumerApplication : 【【【【【消息隊列-消息消費者啓動成功.】】】】】
rabbitmq-consumer-node2:
2017-12-10 17:11:31.679 INFO 13812 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 1112 (http)
2017-12-10 17:11:31.682 INFO 13812 --- [ main] c.h.c.RabbitmqConsumerNode2Application : Started RabbitmqConsumerNode2Application in 2.419 seconds (JVM running for 3.129)
2017-12-10 17:11:31.682 INFO 13812 --- [ main] c.h.c.RabbitmqConsumerNode2Application : 【【【【【消息隊列-消息消費者節點2啓動成功.】】】】】
複製代碼
接下來咱們來運行testBatchUserAdd
方法,查看測試控制檯輸出內容以下所示:
2017-12-10 17:15:02.619 INFO 14456 --- [ Thread-3] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#528df369:0/SimpleConnection@39b6ba57 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 60936]
回調id:194b5e67-6913-474a-b2ac-6e938e1e85e8
消息發送成功
回調id:e88ce59c-3eb9-433c-9e25-9429e7076fbe
消息發送成功
回調id:3e5b8382-6f63-450f-a641-e3d8eee255b2
消息發送成功
回調id:39103357-6c80-4561-acb7-79b32d6171c9
消息發送成功
回調id:9795d227-b54e-4cde-9993-a5b880fcfe39
消息發送成功
回調id:e9b8b828-f069-455f-a366-380bf10a5909
消息發送成功
回調id:6b5b4a9c-5e7f-4c53-9eef-98e06f8be867
消息發送成功
回調id:619a42f3-cb94-4434-9c75-1e28a04ce350
消息發送成功
回調id:6b720465-b64a-4ed9-9d8c-3e4dafa4faed
消息發送成功
回調id:b4296f7f-98cc-423b-a4ef-0fc31d22cb08
消息發送成功
複製代碼
能夠看到確實已經成功的發送了10條用戶註冊消息到RabbitMQ
服務端,那麼是否已經正確的成功的將消息轉發到消費者監聽方法了呢?咱們來打開rabbitmq-consumer
子項目的啓動控制檯查看日誌輸出內容以下所示:
2017-12-10 17:10:36.964 INFO 15644 --- [ main] c.h.r.c.RabbitmqConsumerApplication : 【【【【【消息隊列-消息消費者啓動成功.】】】】】
2017-12-10 17:15:02.695 INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer : 用戶註冊消費者【節點1】獲取消息,用戶編號:20
2017-12-10 17:15:02.718 INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer : 用戶註冊消費者【節點1】獲取消息,用戶編號:22
2017-12-10 17:15:02.726 INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer : 用戶註冊消費者【節點1】獲取消息,用戶編號:26
2017-12-10 17:15:02.729 INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer : 用戶註冊消費者【節點1】獲取消息,用戶編號:21
2017-12-10 17:15:02.789 INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer : 用戶註冊消費者【節點1】獲取消息,用戶編號:28
複製代碼
能夠看到成功的接受了5
條對應用戶註冊消息內容,不過這裏具體接受的條數並非固定的,這也是RabbitMQ
消息轉發權重內部問題。 下面咱們打開rabbitmq-consumer-node2
子項目控制檯查看日誌輸出內容以下所示:
2017-12-10 17:11:31.682 INFO 13812 --- [ main] c.h.c.RabbitmqConsumerNode2Application : 【【【【【消息隊列-消息消費者節點2啓動成功.】】】】】
2017-12-10 17:15:02.708 INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer : 用戶註冊消費者【節點2】獲取消息,用戶編號:25
2017-12-10 17:15:02.717 INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer : 用戶註冊消費者【節點2】獲取消息,用戶編號:23
2017-12-10 17:15:02.719 INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer : 用戶註冊消費者【節點2】獲取消息,用戶編號:24
2017-12-10 17:15:02.727 INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer : 用戶註冊消費者【節點2】獲取消息,用戶編號:27
2017-12-10 17:15:02.790 INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer : 用戶註冊消費者【節點2】獲取消息,用戶編號:29
複製代碼
一樣得到了5
條用戶註冊消息,不過並無任何規律可言,編號也不是順序的。
因此多節點時消息具體分發到哪一個節點並非固定的,徹底是
RabbitMQ
分發機制來控制。
本章完成了基於SpringBoot
平臺整合RabbitMQ
單個Provider
對應綁定多個Consumer
來進行多節點分佈式消費者消息消費,實際生產項目部署時徹底能夠將消費節點分開到不一樣的服務器,只要消費節點能夠訪問到RabbitMQ
服務端,能夠正常通信,就能夠完成消息消費。
本章源碼已經上傳到碼雲: SpringBoot配套源碼地址:gitee.com/hengboy/spr… SpringCloud配套源碼地址:gitee.com/hengboy/spr… SpringBoot相關係列文章請訪問:目錄:SpringBoot學習目錄 QueryDSL相關係列文章請訪問:QueryDSL通用查詢框架學習目錄 SpringDataJPA相關係列文章請訪問:目錄:SpringDataJPA學習目錄,感謝閱讀! 歡迎加入QQ技術交流羣,共同進步。