這篇是SpringBoot整合消息隊列的第一篇文章,咱們詳細介紹下消息隊列的相關內容。java
MQ
(Message Quene):經過典型的生產者和消費者模型,生產者不斷向消息隊列中產生消息,消費者不斷的從隊列中獲取消息。由於生產者和消費者都是異步的,並且生產者只關心消息的發送,消費者只關心消息的接收,沒有業務邏輯的侵入,輕鬆實現業務解耦。python
場景描述:某商場具備註冊功能,註冊的時候須要發送短信驗證碼。git
傳統的作法是用戶提交信息到用戶服務,用戶服務調用短信服務發送短信,而後給用戶返回響應,這種是同步的處理方式,耗時較長。加入消息隊列後,用戶直接提交信息到用戶服務,將信息寫入消息隊列,直接給用戶返回響應,短信服務從消息隊列中讀取消息進行發送短信。github
場景描述:某商場下單流程。spring
傳統作法是用戶下單,訂單系統去查詢庫存系統,若是庫存系統宕機了,則下單失敗,損失訂單量。加入消息隊列後,用戶下單,訂單系統記錄訂單,將訂單信息寫入消息隊列,下單成功,而後庫存系統恢復正常後去操做數據庫庫存(不考慮庫存爲0的狀況)。這樣訂單系統和庫存系統就達到鬆耦合的目的了數據庫
場景描述:秒殺活動。服務器
流量過大確定會致使響應超時或系統宕機,加入消息隊列,用戶秒殺請求寫入消息隊列,設置消息隊列的長度等屬性,達到消息隊列最大長度後,直接返回秒殺失敗,而後再去消費消息隊列的數據,完成秒殺。微信
RabbitMQ是用Erlang語言編寫的,實現了高級消息隊列協議(AMQP)的消息中間件。網絡
AMQP
:AMQP
是一種連接協議,直接定義網絡交換的數據格式,這使得實現了AMQP
的provider
自己就是跨平臺的。如下是AMQP
協議模型:app
在上圖中:
在上圖中:
當消息處理比較耗時時,就會出現生產消息的速度遠遠大於消費消息的速度,這樣就會出現消息堆積,沒法及時處理。這時就可讓多個消費者綁定一個隊列,去消費消息,隊列中的消息一旦消費就會丟失,所以任務不會重複執行。
這種模型中生產者發送的消息全部消費者均可以消費。
在上圖中:
這種模型消費者發送的消息,不一樣類型的消息能夠由不一樣的消費者去消費。
在上圖中:
這種模型和direct模型同樣,都是能夠根據routing key將消息路由到不一樣的隊列,只不過這種模型可讓隊列綁定routing key 的時候使用通配符。這種類型的routing key都是由一個或多個單詞組成,多個單詞之間用.
分割。
通配符介紹:
*
:只匹配一個單詞
#
:匹配一個或多個單詞
這種模式須要通知遠程計算機運行功能並等待返回運行結果。這個過程是阻塞的。
當客戶端啓動時,它建立一個匿名獨佔回調隊列。並提供名字爲call的函數,這個call會發送RPC請求而且阻塞直到收到RPC運算的結果。
第一步:引入pom依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
第二步:增長RabbitMQ服務配置信息
spring: rabbitmq: virtual-host: javatrip port: 5672 host: 127.0.0.1 username: guest password: guest
這裏咱們用廣播模型來舉例使用,廣播模型(fanout)比較好理解,就像公衆號同樣,我天天推文章後,會推送給每一個關注用戶,他們均可以看到這條消息。
廣播模型注意點:
@Configuration public class RabbitConfig { final static String queueNameA = "first-queue"; final static String queueNameB = "second-queue"; /*** * 定義一個隊列,設置隊列屬性 * @return */ @Bean("queueA") public Queue queueA(){ Map<String,Object> map = new HashMap<>(); // 消息過時時長,10秒過時 map.put("x-message-ttl",10000); // 隊列中最大消息條數,10條 map.put("x-max-length",10); // 第一個參數,隊列名稱 // 第二個參數,durable:持久化 // 第三個參數,exclusive:排外的, // 第四個參數,autoDelete:自動刪除 Queue queue = new Queue(queueNameA,true,false,false,map); return queue; } @Bean("queueB") public Queue queueB(){ Map<String,Object> map = new HashMap<>(); // 消息過時時長,10秒過時 map.put("x-message-ttl",10000); // 隊列中最大消息條數,10條 map.put("x-max-length",10); // 第一個參數,隊列名稱 // 第二個參數,durable:持久化 // 第三個參數,exclusive:排外的, // 第四個參數,autoDelete:自動刪除 Queue queue = new Queue(queueNameB,true,false,false,map); return queue; } }
@Bean public FanoutExchange fanoutExchange(){ // 第一個參數,交換機名稱 // 第二個參數,durable,是否持久化 // 第三個參數,autoDelete,是否自動刪除 FanoutExchange fanoutExchange = new FanoutExchange(exchangeName,true,false); return fanoutExchange; }
@Bean public Binding bindingA(@Qualifier("queueA") Queue queueA, FanoutExchange fanoutExchange){ Binding binding = BindingBuilder.bind(queueA).to(fanoutExchange); return binding; } @Bean public Binding bindingB(@Qualifier("queueB") Queue queueB,FanoutExchange fanoutExchange){ Binding binding = BindingBuilder.bind(queueB).to(fanoutExchange); return binding; }
@RabbitListener(queues = RabbitConfig.queueNameA) @Component @Slf4j public class ConsumerA { @RabbitHandler public void receive(String message){ log.info("消費者A接收到的消息:"+message); } }
@RabbitListener(queues = RabbitConfig.queueNameB) @Component @Slf4j public class ConsumerB { @RabbitHandler public void receive(String message){ log.info("消費者B接收到的消息:"+message); } }
@RestController public class provider { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("send") public void sendMessage(){ String message = "你好,我是Java旅途"; rabbitTemplate.convertAndSend(RabbitConfig.exchangeName,null,message); } }
這樣生產者發送一條消息後,兩個消費者就能同時消費到消息了。
此是spring-boot-route系列的第十三篇文章,這個系列的文章都比較簡單,主要目的就是爲了幫助初次接觸Spring Boot 的同窗有一個系統的認識。本文已收錄至個人github,歡迎各位小夥伴star
!
github:https://github.com/binzh303/s...
若是以爲文章不錯,歡迎關注、點贊、收藏,大家的支持是我創做的動力,感謝你們。
若是文章寫的有問題,請不要吝嗇,歡迎留言指出,我會及時覈查修改。
若是你還想更加深刻的瞭解我,能夠微信搜索「Java旅途」進行關注。回覆「1024」便可得到學習視頻及精美電子書。天天7:30準時推送技術文章,讓你的上班路不在孤獨,並且每個月還有送書活動,助你提高硬實力!