springboot集成rabbitmq實戰

實戰前言
RabbitMQ 做爲目前應用至關普遍的消息中間件,在企業級應用、微服務應用中充當着重要的角色。特別是在一些典型的應用場景以及業務模塊中具備重要的做用,好比業務服務模塊解耦、異步通訊、高併發限流、超時業務、數據延遲處理等。html

其中課程的學習連接地址:https://edu.csdn.net/course/detail/9314 spring

RabbitMQ 官網拜讀
首先,讓咱們先拜讀 RabbitMQ 官網的技術開發手冊以及相關的 Features,感興趣的朋友能夠耐心的閱讀其中的相關介紹,相信會有必定的收穫,地址可見:數據庫

http://www.rabbitmq.com/getstarted.html後端

閱讀該手冊過程當中,咱們能夠得知 RabbitMQ 其實核心就是圍繞 「消息模型」 來展開的,其中就包括了組成消息模型的相關組件:生產者,消費者,隊列,交換機,路由,消息等!而咱們在實戰應用中,實際上也是牢牢圍繞着 「消息模型」 來展開擼碼的!緩存

下面,我就介紹一下這一消息模型的演變歷程,固然,這一歷程在 RabbitMQ 官網也是能夠窺覽獲得的!服務器

上面幾個圖就已經概述了幾個要點,並且,這幾個要點的含義能夠說是字如其名!多線程

生產者:發送消息的程序
消費者:監聽接收消費消息的程序
消息:一串二進制數據流
隊列:消息的暫存區/存儲區
交換機:消息的中轉站,用於接收分發消息。其中有 fanout、direct、topic、headers 四種
路由:至關於密鑰/第三者,與交換機綁定便可路由消息到指定的隊列!
正如上圖所展現的消息模型的演變,接下來咱們將以代碼的形式實戰各類典型的業務場景!併發

SpringBoot 整合 RabbitMQ 實戰
工欲善其事,必先利其器。咱們首先須要藉助 IDEA 的 Spring Initializr 用 Maven 構建一個 SpringBoot 的項目,並引入 RabbitMQ、Mybatis、Log4j 等第三方框架的依賴。搭建完成以後,能夠簡單的寫個 RabbitMQController 測試一下項目是否搭建是否成功(能夠暫時用單模塊方式構建)app

緊接着,咱們進入實戰的核心階段,在項目或者服務中使用 RabbitMQ,其實無非是有幾個核心要點要緊緊把握住,這幾個核心要點在擼碼過程當中須要「時刻的遊蕩在本身的腦海裏」,其中包括:負載均衡

我要發送的消息是什麼
我應該須要建立什麼樣的消息模型:DirectExchange+RoutingKey?TopicExchange+RoutingKey?等
我要處理的消息是實時的仍是須要延時/延遲的?
消息的生產者須要在哪裏寫,消息的監聽消費者須要在哪裏寫,各自的處理邏輯是啥
基於這樣的幾個要點,咱們先小試牛刀一番,採用 RabbitMQ 實戰異步寫日誌與異步發郵件。固然啦,在進行實戰前,咱們須要安裝好 RabbitMQ 及其後端控制檯應用,並在項目中配置一下 RabbitMQ 的相關參數以及相關 Bean 組件。

RabbitMQ 安裝完成後,打開後端控制檯應用:http://localhost:15672  輸入guest guest 登陸,看到下圖即表示安裝成功

而後是項目配置文件層面的配置 application.properties

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.concurrency=10
spring.rabbitmq.listener.max-concurrency=20
spring.rabbitmq.listener.prefetch=5
其中,後面三個參數主要是用於「併發量的配置」,表示:併發消費者的初始化值,併發消費者的最大值,每一個消費者每次監聽時可拉取處理的消息數量。

接下來,咱們須要以 Configuration 的方式配置 RabbitMQ 並以 Bean 的方式顯示注入 RabbitMQ 在發送接收處理消息時相關 Bean 組件配置其中典型的配置是 RabbitTemplate 以及 SimpleRabbitListenerContainerFactory,前者是充當消息的發送組件,後者是用於管理  RabbitMQ監聽器listener 的容器工廠,其代碼以下:

@Configuration
    public class RabbitmqConfig {
    private static final Logger log= LoggerFactory.getLogger(RabbitmqConfig.class);
 
    @Autowired
    private Environment env;
 
    @Autowired
    private CachingConnectionFactory connectionFactory;
 
    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
 
    /**
     * 單一消費者
     * @return
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }
 
    /**
     * 多個消費者
     * @return
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class));
        factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class));
        factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class));
        return factory;
    }
 
    @Bean
    public RabbitTemplate rabbitTemplate(){
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息發送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
            }
        });
        return rabbitTemplate;
    }}
RabbitMQ 實戰:業務模塊解耦以及異步通訊
在一些企業級系統中,咱們常常能夠見到一個執行 function 一般是由許多子模塊組成的,這個 function 在執行過程當中,須要 同步的將其代碼從頭開始執行到尾,即執行流程是 module_A -> module_B -> module_C -> module_D,典型的案例能夠參見彙編或者 C 語言等面向過程語言開發的應用,如今的一些 JavaWeb 應用也存在着這樣的寫法。

而咱們知道,這個執行流程其實對於整個 function 來說是有必定的弊端的,主要有幾點:

整個 function 的執行響應時間將好久;
若是某個 module 發生異常而沒有處理得當,可能會影響其餘 module 甚至整個 function 的執行流程與結果;
整個 function 中代碼可能會很冗長,模塊與模塊之間可能須要進行強通訊以及數據的交互,出現問題時難以定位與維護,甚至會陷入 「改一處代碼而動全身」的尷尬境地!
故而,咱們須要想辦法進行優化,咱們須要將強關聯的業務模塊解耦以及某些模塊之間實行異步通訊!下面就以兩個場景來實戰咱們的優化措施!

場景一:異步記錄用戶操做日誌

對於企業級應用系統或者微服務應用中,咱們常常須要追溯跟蹤記錄用戶的操做日誌,而這部分的業務在某種程度上是不該該跟主業務模塊耦合在一塊兒的,故而咱們須要將其單獨抽出並以異步的方式與主模塊進行異步通訊交互數據。

下面咱們就用 RabbitMQ 的 DirectExchange+RoutingKey 消息模型也實現「用戶登陸成功記錄日誌」的場景。如前面所言,咱們須要在腦海裏迴盪着幾個要點:

消息模型:DirectExchange+RoutingKey 消息模型
消息:用戶登陸的實體信息,包括用戶名,登陸事件,來源的IP,所屬日誌模塊等信息
發送接收:在登陸的 Controller 中實現發送,在某個 listener 中實現接收並將監聽消費到的消息入數據表;實時發送接收
首先咱們須要在上面的 RabbitmqConfig 類中建立消息模型:包括 Queue、Exchange、RoutingKey 等的創建,代碼以下:

上圖中 env 獲取的信息,咱們須要在 application.properties 進行配置,其中 mq.env=local

此時,咱們將整個項目/服務跑起來,並打開 RabbitMQ 後端控制檯應用,便可看到隊列以及交換機及其綁定已經創建好了,以下所示:

接下來,咱們須要在 Controller 中執行用戶登陸邏輯,記錄用戶登陸日誌,查詢獲取用戶角色視野資源信息等,因爲篇幅關係,在這裏咱們重點要實現的是用MQ實現 「異步記錄用戶登陸日誌」 的邏輯,即在這裏 Controller 將充當「生產者」的角色,核心代碼以下:

@RestController
    public class UserController {
 
    private static final Logger log= LoggerFactory.getLogger(HelloWorldController.class);
 
    private static final String Prefix="user";
 
    @Autowired
    private ObjectMapper objectMapper;
 
    @Autowired
    private UserMapper userMapper;
 
    @Autowired
    private UserLogMapper userLogMapper;
 
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Autowired
    private Environment env;
 
    @RequestMapping(value = Prefix+"/login",method = RequestMethod.POST,consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public BaseResponse login(@RequestParam("userName") String userName,@RequestParam("password") String password){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            //TODO:執行登陸邏輯
            User user=userMapper.selectByUserNamePassword(userName,password);
            if (user!=null){
                //TODO:異步寫用戶日誌
                try {
                    UserLog userLog=new UserLog(userName,"Login","login",objectMapper.writeValueAsString(user));
                    userLog.setCreateTime(new Date());
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                    rabbitTemplate.setExchange(env.getProperty("log.user.exchange.name"));
                    rabbitTemplate.setRoutingKey(env.getProperty("log.user.routing.key.name"));
 
                    Message message=MessageBuilder.withBody(objectMapper.writeValueAsBytes(userLog)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
                    message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, MessageProperties.CONTENT_TYPE_JSON); 
                    rabbitTemplate.convertAndSend(message);         
                }catch (Exception e){
                    e.printStackTrace();
                }
 
                //TODO:塞權限數據-資源數據-視野數據
            }else{
                response=new BaseResponse(StatusCode.Fail);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        return response;
    }}
在上面的「發送邏輯」代碼中,其實也體現了咱們最開始介紹的演進中的幾種消息模型,好比咱們是將消息發送到 Exchange 的而不是 Queue,消息是以二進制流的形式進行傳輸等等。當用 postman 請求到這個 controller 的方法時,咱們能夠在 RabbitMQ 的後端控制檯應用看到一條未確認的消息,經過 GetMessage 便可看到其中的詳情,以下:

最後,咱們將開發消費端的業務代碼,以下:

 @Component
    public class CommonMqListener {
 
    private static final Logger log= LoggerFactory.getLogger(CommonMqListener.class);
 
    @Autowired
    private ObjectMapper objectMapper;
 
    @Autowired
    private UserLogMapper userLogMapper;
 
    @Autowired
    private MailService mailService;
 
    /**
     * 監聽消費用戶日誌
     * @param message
     */
    @RabbitListener(queues = "${log.user.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeUserLogQueue(@Payload byte[] message){
        try {
            UserLog userLog=objectMapper.readValue(message, UserLog.class);
            log.info("監聽消費用戶日誌 監聽到消息: {} ",userLog);
            //TODO:記錄日誌入數據表
            userLogMapper.insertSelective(userLog);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
將服務跑起來以後,咱們便可監聽消費到上面 Queue 中的消息,即當前用戶登陸的信息,並且,咱們也能夠看到「記錄用戶登陸日誌」的邏輯是由一條異於主業務線程的異步線程去執行的:

「異步記錄用戶操做日誌」的案例我想足以用於詮釋上面所講的相關理論知識點了,在後續篇章中,因爲篇幅限制,我將重點介紹其核心的業務邏輯!

場景二:異步發送郵件

發送郵件的場景,其實也是比較常見的,好比用戶註冊須要郵箱驗證,用戶異地登陸發送郵件通知等等,在這裏我以 RabbitMQ 實現異步發送郵件。實現的步驟跟場景一幾乎一致!

1. 消息模型的建立

2. 配置信息的建立

3. 生產端

4. 消費端

RabbitMQ 實戰:併發量配置與消息確認機制
實戰背景

對於消息模型中的 listener 而言,默認狀況下是「單消費實例」的配置,即「一個 listener 對應一個消費者」,這種配置對於上面所講的「異步記錄用戶操做日誌」、「異步發送郵件」等併發量不高的場景下是適用的。可是在對於秒殺系統、商城搶單等場景下可能會顯得很吃力!

咱們都知道,秒殺系統跟商城搶單均有一個共同的明顯的特徵,即在某個時刻會有成百上千萬的請求到達咱們的接口,即瞬間這股巨大的流量將涌入咱們的系統,咱們能夠採用下面一圖來大體體現這一現象:

當到了「開始秒殺」、「開始搶單」的時刻,此時系統可能會出現這樣的幾種現象:

應用系統配置承載不了這股瞬間流量,致使系統直接掛掉,即傳說中的「宕機」現象;
接口邏輯沒有考慮併發狀況,數據庫讀寫鎖發生衝突,致使最終處理結果跟理論上的結果數據不一致(如商品存庫量只有 100,可是高併發狀況下,實際表記錄的搶到的用戶記錄數據量卻遠遠大於 100);
應用佔據服務器的資源直接飆高,如 CPU、內存、寬帶等瞬間直接飆升,致使同庫同表甚至可能同 host 的其餘服務或者系統出現卡頓或者掛掉的現象;
因而乎,咱們須要尋找解決方案!對於目前來說,網上均有諸多比較不錯的解決方案,在此我順便提一下咱們的應用系統採用的經常使用解決方案,包括:

咱們會將處理搶單的總體業務邏輯獨立、服務化並作集羣部署;
咱們會將那股巨大的流量拒在系統的上層,即將其轉移至 MQ 而不直接涌入咱們的接口,從而減小數據庫讀寫鎖衝突的發生以及因爲接口邏輯的複雜出現線程堵塞而致使應用佔據服務器資源飆升;
咱們會將搶單業務所在系統的其餘同數據源甚至同表的業務拆分獨立出去服務化,並基於某種 RPC 協議走 HTTP 通訊進行數據交互、服務通訊等等;
採用分佈式鎖解決同一時間同個手機號、同一時間同個 IP 刷單的現象;
下面,咱們用 RabbitMQ 來實戰上述的第二點!即咱們會在「請求」 -> "處理搶單業務的接口" 中間架一層消息中間件作「緩衝」、「緩壓」處理,以下圖所示:

併發量配置與消息確認機制

正如上面所講的,對於搶單、秒殺等高併發系統而言,若是咱們須要用 RabbitMQ 在 「請求」 - 「接口」 之間充當限流緩壓的角色,那便須要咱們對 RabbitMQ 提出更高的要求,即支持高併發的配置,在這裏咱們須要明確一點,「併發消費者」的配置實際上是針對 listener 而言,當配置成功後,咱們能夠在 MQ 的後端控制檯應用看到 consumers 的數量,以下所示:

其中,這個 listener 在這裏有 10 個 consumer 實例的配置,每一個 consumer 能夠預監聽消費拉取的消息數量爲 5 個(若是同一時間處理不完,會將其緩存在 mq 的客戶端等待處理!)

另外,對於某些消息而言,咱們有時候須要嚴格的知道消息是否已經被 consumer 監聽消費處理了,即咱們有一種消息確認機制來保證咱們的消息是否已經真正的被消費處理。在 RabbitMQ 中,消息確認處理機制有三種:Auto - 自動、Manual - 手動、None - 無需確認,而確認機制須要 listener 實現 ChannelAwareMessageListener 接口,並重寫其中的確認消費邏輯。在這裏咱們將用 「手動確認」 的機制來實戰用戶商城搶單場景。

1.在 RabbitMQConfig 中配置確認消費機制以及併發量的配置

2.消息模型的配置信息

3.RabbitMQ 後端控制檯應用查看此隊列的併發量配置

4.listener 確認消費處理邏輯:在這裏咱們須要開發搶單的業務邏輯,即「只有當該商品的庫存 >0 時,搶單成功,扣減庫存量,並將該搶單的用戶信息記錄入表,異步通知用戶搶單成功!」

緊接着咱們採用 CountDownLatch 模擬產生高併發時的多線程請求(或者採用 jmeter 實施壓測也能夠!),每一個請求將攜帶產生的隨機數:充當手機號 -> 充當消息,最終入搶單隊列!在這裏,我模擬了 50000 個請求,至關於 50000 手機號同一時間發生搶單的請求,而設置的產品庫存量爲 100,這在 product 數據庫表便可設置

6.將搶單請求的手機號信息壓入隊列,等待排隊處理

7.在最後咱們寫個 Junit 或者寫個 Controller,進行 initService.generateMultiThread(); 調用模擬產生高併發的搶單請求便可

@RestController
    public class ConcurrencyController {
 
    private static final Logger log= LoggerFactory.getLogger(HelloWorldController.class);
 
    private static final String Prefix="concurrency";
 
    @Autowired
    private InitService initService;
 
    @RequestMapping(value = Prefix+"/robbing/thread",method = RequestMethod.GET)
    public BaseResponse robbingThread(){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        initService.generateMultiThread();
        return response;
    }}
8.最後,咱們固然是跑起來,在控制檯咱們能夠觀察到系統不斷的在產生新的請求(線程)– 至關於不斷的有搶單的手機號涌入咱們的系統,而後入隊列,listener 監聽到請求以後消費處理搶單邏輯!最後咱們能夠觀察兩張數據庫表:商品庫存表、商品成功搶單的用戶記錄表 - 只有當庫存表中商品對應的庫存量爲 0、商品成功搶單的用戶記錄恰好 100 時 即表示咱們的實戰目的以及效果已經達到了!!

總結:如此一來,咱們便將 request 轉移到咱們的 mq,在必定程度緩解了咱們的應用以及接口的壓力!固然,實際狀況下,咱們的配置可能遠遠不僅代碼層次上的配置,好比咱們的 mq 可能會作集羣配置、負載均衡、商品庫存的更新可能會考慮分庫分表、庫存更新可能會考慮獨立爲庫存 Dubbo 服務並經過 Rest Api 異步通訊交互並獨立部署等等。這些優化以及改進的目的其實無非是爲了能限流、緩壓、保證系統穩定、數據的一致等!而咱們的 MQ,在其中能夠起到不可磨滅的做用,其字如其名:「消息隊列」,而隊列具備 「先進先出」 的特色,故而全部進入 MQ 的消息都將 「乖巧」 的在 MQ 上排好隊,先來先排隊,先來先被處理消費,由此一來至少能夠避免 「瞬間時刻一窩蜂的 request 涌入咱們的接口」 的狀況!

附註:在用 RabbitMQ 實戰上述高併發搶單解決方案,其實我也在數據庫層面進行了優化,即在讀寫存庫時採用了「相似樂觀鎖」的寫法,保證:搶單的請求到來時有庫存,更新存庫時保證有庫存能夠被更新!

相關文章
相關標籤/搜索